Elasticsearchから連携する方法

作成日:2021-03-22

ElasticsearchからMaxComputeへ連携する方法

本記事では、Elasticsearch からMaxComputeへ連携する方法について説明します。

前書き

MaxCompute (旧プロダクト名 ODPS) は、大規模データウェアハウジングのためのフルマネージドかつマルチテナント形式のデータ処理プラットフォームです。さまざまなデータインポートソリューションと分散計算モデルにより、大規模データの効率的な照会、運用コストの削減、データセキュリティを実現します。

img

少し前になりますが、MaxComputeについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

https://www.slideshare.net/sbcloud/alibaba-cloud-maxcompute

今回はMaxComputeからAlibaba Cloud Elasticsearchへデータを連携します。構成図で、こんな感じです。

img



共通作業(MaxCompute全体で共通事項)

RAM ユーザー作成&権限付与

もしMaxComputeを操作するユーザがRAMユーザーの場合は以下を実施してください。

RAMより対象のユーザーを選定します。ユーザーが無い場合は新規作成します。 このときにAccessKey IDとAccessKey Secretをメモとして残してください。AccessKey IDとAccessKey SecretはDataWorks DataIntegrationの処理に必要となります。

img

対象のユーザーには権限ロールとしてAliyunDataWorksFullAccessをアタッチします。 これはDataWorksを操作するためのFull権限です。
DataWorks側にてユーザーごとに読み取り専用や一部プロジェクト・テーブルなどのきめ細かい権限付与ができますが、ここでは割愛します。

img

今回は、RDSからMaxComputeへ連携する方法なので、AliyunRDSFullAccessもアタッチします。

Workspace作成

MaxComputeを操作するためにはワークスペースおよびプロジェクトが必要なので新たに作成します。DataWorksコンソールから 「Create Project」を選択し、起動します。
Modeは「Basic Mode(基本モード)」「と「Standard Mode(標準モード)」の2種類があります。ここは「Basic Mode(基本モード)」として選定します。

img

続けて、MaxCompute を選定します。料金は初めて操作するなら Pay-As-You-Go(使った分だけ課金) が良いと思います。

img

MaxComputeに関する必要な情報を設定し、Workspaceを作成します。

img

同期タスクをサブミットする方法(基本モード(basic mode))

作業の途中で 同期タスクをサブミットする旨のアクションが発生しますが、こちらの手順を参考にいただければ幸いです。    

DataWorks DataStdioで、操作が終わったら [Commit to Production Environment] をクリックし開発環境から本番環境へ直接コミットします。

img

同期タスクをサブミットする方法(標準モード(standard mode))

DataWorks DataStdioの右側にあるProperitiesをクリックします。

img

プロパティRerunを設定して、[Use Root Node]ボタンをクリックします。

img

開発環境にサブミットします。

img

あとは開発環境から本番環境にデプロイします。

Maxcomputeのデータをクエリする方法

DataWorks DataStdioのAd-Hoc Query画面に入って、[ODPS SQL]のノードを作成します。

img

SQLクエリ文を作成したら、上書き保存してから、Run SQLボタンを押します。
その後、SQLクエリの実行コストらお金が出ますが、ここも考慮のうえ、Run、で実行します。
実行結果としてレコード数が無事表示されます。

img



(事前準備)MaxCompute Tableの準備

DataWorks DataIntegrationから、新規オフライン同期タスクをクリックし、DataStdio画面へ遷移します。
DataStdio画面にて、「Create Node」らダイアログが表示されますが、ここではクローズします。

img

Workspace Tables画面に入って、テーブルを作成します。

img

DDL Statementボタンをクリックして、MySQL Tableに対応するDDL Statementを入力します。

CREATE TABLE IF NOT EXISTS odps_to_es (
`create_time` string COMMENT '',
`category` string COMMENT '',
`brand` string COMMENT '',
`buyer_id` string COMMENT '',
`trans_num` bigint COMMENT '',
`trans_amount` double COMMENT '',
`click_cnt` bigint COMMENT ''
)
PARTITIONED BY (pt bigint);

img

Display Nameを入力し、テーブルをコミットします。その後はテーブルが作成されてることが確認できます。
※標準モードプロジェクトの場合は本番環境にもコミットする必要があります。

img

このテーブルはまだ何も入っていない状態なので、データを追加します。インポートボタンをクリックします。

img

パーティションが存在するかどうかを確認します。

img

txtファイルをアップロードします。

img

データのインポートが無事成功したら、Ad-Hocクエリなり、DataMapなりでレコードが確認できます。

img

これでMaxCompute側の設定は完了です。次はElasticsearchの準備を進めます。

(事前準備)Elasticsearchの準備

Elasticsearchコンソールに入って、クラスターを作成してから、詳細画面に入ります。
img

Auto Indexingを有効に設定します。
img

Auto Indexingが有効になると、このようなステータスになります。
img

Public Networksを有効にして、EndPointをメモします。
img

Public Networksのホワイトリストを変更します。
img

DataworksのデフォルトリソースグループのIPをPublic Networksアクセスのホワイトリストに追加します。
東京リジョンなら以下の通りです。

100.105.55.0/24,11.192.147.0/24,11.192.148.0/24,11.192.149.0/24,100.64.0.0/10,47.91.12.0/24,47.91.13.0/24,47.91.9.0/24,11.199.250.0/24,47.91.27.0/24,11.59.59.0/24,47.245.51.128/26,47.245.51.192/26,47.91.0.128/26,47.91.0.192/26

img

IPがホワイトリストに追加されたらOKです。
img

Kibanaのコンソールに入ります。
img

Kibanaにログインし、DevToolからIndexを作成します。

PUT odps_index?include_type_name=false
{
"mappings": {
"properties": {
"category": {
"type": "text"
},
"brand": {
"type": "text"
},
"buyer_id": {
"type": "text"
},
"trans_num": {
"type": "integer"
},
"trans_amount": {
"type": "double"
},
"click_cnt": {
"type": "integer"
}
}
}
}

img

DataWorks DataIntegrationに切り替えて、、データソースにElasticsearchを追加します。
img

img

データソースとしてElasticsearchのPublic Endpoint、ユーザー名、パスワードなどの情報を入力し、接続テストを実行します。

img

接続テストで問題なければ、Completeボタンをクリックすることで、Elasticsearchのデータソースが追加されます。

img

この準備が終わり次第、データを同期してみます。データ同期にはGUIモードとスクリプトモードの2つのパターンがあります。まずはGUIモードで同期します。
スクリプトモードはtemplateな扱いができるため、後日この作業の自動化したい場合、活用できればと思います。

MaxCompute TableをElasticsearchへ移行(GUIモード)

STEP1: workflow作成

DataWorks DataIntegrationから、新規オフライン同期タスクをクリックし、DataStdio画面へ遷移します。
DataStdio画面にて、「Create Node」らダイアログが表示されますが、ここではクローズします。

img

DataStdio画面にてWorkflowを作成します。

img

STEP2: DI 同期タスクを作成

同期タスクを作成します。
img

ソースをODPSに選択して、テーブルを選択します。そのあとはPreviewボタンをクリックします。
img

Previewでデータが表示されます。
img

ターゲットをElasticsearchに選択します。
Elasticsearchの場合、IndexとIndex Typeが必須なので入力し、Advanced Settingsをクリックします。
img

Advanced Settings画面にて、Auto Mappingを有効に設定します。 (Elasticsearchがver7.xの場合は必要です)
設定後はTarget Field ボタンをクリックすることで編集します。

img

odpsのデータに対応するフィールドを入力します。

{"name":"create_time","type":“id"}
{"name": "category","type": "text"}
{"name": "brand","type": "text"}
{"name": "buyer_id","type": "text"}
{"name": "trans_num","type": "integer"}
{"name": "trans_amount","type": "double"}
{"name": "click_cnt","type": "integer"}

img

STEP3: DI 同期タスクを実行

タスクを保存して、実行します。
img

タスクが成功すると、Logが表示されます。
img

今度はElasticsearch - kibana画面に遷移し、データが届いてるかを確認します。
KibanaコンソールのDevToolより、データを検索します。

POST /odps_index/_search?pretty
{
"query": { "match_all": {} }
}

img

その結果、GUIモードでMaxComputeのデータをElasticsearchへ同期したことが確認できました。

img

MaxCompute TableをElasticsearchへ移行(スクリプトモード)

STEP1: workflow作成

DataWorks DataIntegrationから、新規オフライン同期タスクをクリックし、DataStdio画面へ遷移します。
DataStdio画面にて、「Create Node」らダイアログが表示されますが、ここではクローズします。

img

DataStdio画面にてWorkflowを作成します。

img

STEP2: DI 同期タスクを作成

同期タスクを作成します。
img

ソースをODPSに選択して、テーブルを選択します。そのあとはPreviewボタンをクリックします。
img

Previewでデータが表示されます。
img

ターゲットをElasticsearchに選択します。
Elasticsearchの場合、IndexとIndex Typeが必須なので入力し、Advanced Settingsをクリックします。
img

Advanced Settings画面にて、Auto Mappingを有効に設定します。 (Elasticsearchがver7.xの場合は必要です)
設定後はTarget Field ボタンをクリックすることで編集します。

img

odpsのデータに対応するフィールドを入力します。

{"name":"create_time","type":“id"}
{"name": "category","type": "text"}
{"name": "brand","type": "text"}
{"name": "buyer_id","type": "text"}
{"name": "trans_num","type": "integer"}
{"name": "trans_amount","type": "double"}
{"name": "click_cnt","type": "integer"}

img

STEP3: スクリプトモードにスイッチ

Switch to Code Editorボタンをクリックし、スクリプトモードにスイッチします。

img

するとスクリプトが表示されます。これは先述、GUIモードで選択した設定が自動でスクリプトに反映されます。
img

STEP4: DI 同期タスクを実行

スクリプト(タスク)を実行します。 スクリプト(タスク)が成功すると、タスクとしてLogが表示されます。
img

あとは上記通り、Elasticsearch - kibanaでcheck、可視化できます。


最後に

本記事では、MaxComputeからElasticsearchへ連携する方法を簡単に説明しました。
MaxComputeのデータをElasticsearch - kibanaダッシュボードでリアルタイム可視化したい場合、参考に頂ければ幸いです。

Hironobu Ohara
この記事を書いた人
Hironobu Ohara
Github Icon
2019年にAlibaba Cloudを担当。Databaseや収集、分散処理、ETL、検索、分析、機械学習基盤の構築、運用等を経て、現在分散系をメインとしたビッグデータとデータベースを得意・専門とするデータエンジニア。 AlibabaCloud MVP。
Close

Alibaba Cloudを始めてみましょう

ソフトバンクは、Alibaba Cloudのアカウント開設から、サービス展開までをお手伝いします。
Hatena