TableStoreから連携する方法

作成日:2021-03-23

TableStoreからMaxComputeへ連携する方法

本記事では、TableStoreからMaxComputeへ連携する方法について説明します。

前書き

MaxCompute (旧プロダクト名 ODPS) は、大規模データウェアハウジングのためのフルマネージドかつマルチテナント形式のデータ処理プラットフォームです。さまざまなデータインポートソリューションと分散計算モデルにより、大規模データの効率的な照会、運用コストの削減、データセキュリティを実現します。

img

少し前になりますが、MaxComputeについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

https://www.slideshare.net/sbcloud/alibaba-cloud-maxcompute

今回はAlibaba Cloud TableStore からMaxComputeへデータを連携します(フルデータ同期、増分データ同期、列/行モード)。構成図で、こんな感じです。

img



共通作業(MaxCompute全体で共通事項)

RAM ユーザー作成&権限付与

もしMaxComputeを操作するユーザがRAMユーザーの場合は以下を実施してください。

RAMより対象のユーザーを選定します。ユーザーが無い場合は新規作成します。 このときにAccessKey IDとAccessKey Secretをメモとして残してください。AccessKey IDとAccessKey SecretはDataWorks DataIntegrationの処理に必要となります。

img

対象のユーザーには権限ロールとしてAliyunDataWorksFullAccessをアタッチします。 これはDataWorksを操作するためのFull権限です。
DataWorks側にてユーザーごとに読み取り専用や一部プロジェクト・テーブルなどのきめ細かい権限付与ができますが、ここでは割愛します。

img

Workspace作成

MaxComputeを操作するためにはワークスペースおよびプロジェクトが必要なので新たに作成します。DataWorksコンソールから 「Create Project」を選択し、起動します。
Modeは「Basic Mode(基本モード)」「と「Standard Mode(標準モード)」の2種類があります。ここは「Basic Mode(基本モード)」として選定します。

img

続けて、MaxCompute を選定します。料金は初めて操作するなら Pay-As-You-Go(使った分だけ課金) が良いと思います。

img

MaxComputeに関する必要な情報を設定し、Workspaceを作成します。

img

同期タスクをサブミットする方法(基本モード(basic mode))

作業の途中で 同期タスクをサブミットする旨のアクションが発生しますが、こちらの手順を参考にいただければ幸いです。    

DataWorks DataStdioで、操作が終わったら [Commit to Production Environment] をクリックし開発環境から本番環境へ直接コミットします。

img

同期タスクをサブミットする方法(標準モード(standard mode))

DataWorks DataStdioの右側にあるProperitiesをクリックします。

img

プロパティRerunを設定して、[Use Root Node]ボタンをクリックします。

img

開発環境にサブミットします。

img

あとは開発環境から本番環境にデプロイします。

Maxcomputeのデータをクエリする方法

DataWorks DataStdioのAd-Hoc Query画面に入って、[ODPS SQL]のノードを作成します。

img

SQLクエリ文を作成したら、上書き保存してから、Run SQLボタンを押します。
その後、SQLクエリの実行コストらお金が出ますが、ここも考慮のうえ、Run、で実行します。
実行結果としてレコード数が無事表示されます。

img



(事前準備)TableStore の準備

TableStoreのインスタンスを作成します。

img

TableStoreのテーブルを作成します。
img

Python SDKでTableStoreにデータを作成します。

from tablestore import *
import time
MachineIpList = ["7552_10.10.10.2", "7552_10.10.10.2", "8d9c_10.10.10.3", "8d9c_10.10.10.3", "e5a3_10.10.10.1"]
MetricsList = [{"cpu": "1", "net_in": "10.0"}, {"cpu": "2", "net_in": "11.0"}, {"cpu": "3", "net_in": "12.0"},{"cpu": "4", "net_in": "13.0"}, {"cpu": "5", "net_in": "14.0"}]
def batch_write_row(client):
# batch put 10 rows and update 10 rows on exist table, delete 10 rows on a not-exist table.
put_row_items = []
all_primary_key = []
for i in range(0, 5):
now = int(time.time_ns())
primary_key = [('MachineIp', MachineIpList[i]), ('Timestamp', now)]
attribute_columns = [('Metrics', str(MetricsList[i]))]
row = Row(primary_key, attribute_columns)
condition = Condition(RowExistenceExpectation.IGNORE)
item = PutRowItem(row, condition)
put_row_items.append(item)
all_primary_key.append((now, MachineIpList[i]))
time.sleep(1)
request = BatchWriteRowRequest()
request.add(TableInBatchWriteRowItem(table_name, put_row_items))
result = client.batch_write_row(request)
print('Result status: %s' % (result.is_all_succeed()))
print('check table\'s put results:')
succ, fail = result.get_put()
for item in succ:
print('Put succeed, consume %s write cu.' % item.consumed.write)
for item in fail:
print('Put failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
return all_primary_key
if __name__ == '__main__':
OTS_ENDPOINT = 'https://dw02.ap-northeast-1.ots.aliyuncs.com'
OTS_ID = 'LTA**************RKwQ' #実際のIDを入力
OTS_SECRET = 'eB6K54**********fVU' #実際のSecretを入力
OTS_INSTANCE = 'dw01'
table_name = 'Monitor'
client = OTSClient(OTS_ENDPOINT, OTS_ID, OTS_SECRET, OTS_INSTANCE)
time.sleep(3) # wait for table ready
primary_key_list = batch_write_row(client)
print('push data')

上記Python SDKで作成したデータを確認します。
img

今度はDataWorks側での作業に移ります。
TableStore をDataWorks DataIntegrationデータソースに追加します。

img

データソース追加をクリックして、TableStore を選択します。

img

データソースとしてTableStore の情報を入力し、接続テストを実行します。
img

接続テストで問題なければ、Completeボタンをクリックすることで、TableStore のデータソースが追加されます。
これでTableStore 側の設定は完了です。次はMaxCompute Tableの準備を進めます。

img

(事前準備)MaxCompute Tableの準備

DataWorks DataIntegrationから、新規オフライン同期タスクをクリックし、DataStdio画面へ遷移します。
DataStdio画面にて、「Create Node」らダイアログが表示されますが、ここではクローズします。

img

Workspace Tables画面に入って、テーブルを作成します。

img

DDL Statementボタンをクリックして、OTS Tableに対応するDDL Statementを入力します。

CREATE TABLE IF NOT EXISTS ots_to_odps (
`MachineIp` string COMMENT '',
`ots_timestamp` BIGINT COMMENT '',
`Metrics` string COMMENT ''
);

img

Display Nameを入力し、テーブルをコミットします。その後はテーブルが作成されてることが確認できます。
※標準モードプロジェクトの場合は本番環境にもコミットする必要があります。

img

この準備が終わり次第、データを移行してみます。データ移行にはGUIモードとスクリプトモードの2つのパターンがあります。まずはGUIモードで移行します。
スクリプトモードはtemplateな扱いができるため、後日この作業の自動化したい場合、活用できればと思います。

TableStore - MaxCompute の同期(フルデータ、スクリプトモード)

STEP1: workflow作成

DataWorks DataIntegrationから、新規オフライン同期タスクをクリックし、DataStdio画面へ遷移します。
DataStdio画面にて、「Create Node」らダイアログが表示されますが、ここではクローズします。

img

DataStdio画面にてWorkflowを作成します。

img

STEP2: DI 同期タスクを作成

同期タスクを作成します。
img

ソースをOTS、ターゲットをODPSに選択します。
img

ターゲット・MaxComputeテーブルを選定します。
img

Switch to Code Editorボタンをクリックし、スクリプトモードにスイッチします。
img

スクリプトが表示されますが、スクリプトを編集します。

{
"type": "job",
"steps": [
{
"stepType": "ots",
"parameter": {
"datasource": "ots_first",
"column": [
{
"name": "MachineIp"
},
{
"name": "Timestamp"
},
{
"name": "Metrics"
}
],
"range": {
"split": [
{
"type": "INF_MIN"
},
{
"type": "STRING",
"value": "7552_10.10.10.2"
},
{
"type": "STRING",
"value": "8d9c_10.10.10.3"
},
{
"type": "STRING",
"value": "e5a3_10.10.10.1"
},
{
"type": "INF_MAX"
}
],
"end": [
{
"type": "INF_MAX"
},
{
"type": "INF_MAX"
}
],
"begin": [
{
"type": "INF_MIN"
},
{
"type": "INF_MIN"
}
]
},
"table": "Monitor"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "odps",
"parameter": {
"partition": "",
"truncate": true,
"datasource": "odps_first",
"column": [
"*"
],
"emptyAsNull": false,
"table": "ots_to_odps"
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": ""
},
"speed": {
"concurrent": 2,
"throttle": false
}
}
}

img

STEP3: DI 同期タスクを実行

タスクを実行します。
img

タスクが成功すると、Logが表示されます。
img

あとはAd-Hoc クエリで確認します(手順は上記の共通手順にて記載しています)

select * from nelly01_dev.ots_to_odps;

img

これにより、TableStore - MaxCompute の同期(フルデータ、スクリプトモード)が確認できました。

TableStoreに増分データを用意

今度はTableStoreで増分データ分だけ同期する方法を試します。準備としてTableStore側で増分データを用意します。
まずはTableStoreのStream機能を有効にします。
img

Python SDKでTableStoreにデータを追加で書き込みします。

from tablestore import *
import time
MachineIpList = ["7552_10.10.10.2", "7552_10.10.10.2", "8d9c_10.10.10.3", "8d9c_10.10.10.3", "e5a3_10.10.10.1"]
MetricsList = [{"cpu": "1", "net_in": "10.0"}, {"cpu": "2", "net_in": "11.0"}, {"cpu": "3", "net_in": "12.0"},{"cpu": "4", "net_in": "13.0"}, {"cpu": "5", "net_in": "14.0"}]
def batch_write_row(client):
# batch put 10 rows and update 10 rows on exist table, delete 10 rows on a not-exist table.
put_row_items = []
all_primary_key = []
for i in range(0, 5):
now = int(time.time_ns())
primary_key = [('MachineIp', MachineIpList[i]), ('Timestamp', now)]
attribute_columns = [('Metrics', str(MetricsList[i]))]
row = Row(primary_key, attribute_columns)
condition = Condition(RowExistenceExpectation.IGNORE)
item = PutRowItem(row, condition)
put_row_items.append(item)
all_primary_key.append((now, MachineIpList[i]))
time.sleep(1)
request = BatchWriteRowRequest()
request.add(TableInBatchWriteRowItem(table_name, put_row_items))
result = client.batch_write_row(request)
print('Result status: %s' % (result.is_all_succeed()))
print('check table\'s put results:')
succ, fail = result.get_put()
for item in succ:
print('Put succeed, consume %s write cu.' % item.consumed.write)
for item in fail:
print('Put failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
return all_primary_key
if __name__ == '__main__':
OTS_ENDPOINT = 'https://dw02.ap-northeast-1.ots.aliyuncs.com'
OTS_ID = 'LTA**************RKwQ' #実際のIDを入力
OTS_SECRET = 'eB6K54**********fVU' #実際のSecretを入力
OTS_INSTANCE = 'dw01'
table_name = 'Monitor'
client = OTSClient(OTS_ENDPOINT, OTS_ID, OTS_SECRET, OTS_INSTANCE)
time.sleep(3) # wait for table ready
primary_key_list = batch_write_row(client)
print('push data')

上記Python SDKで作成したデータを確認します。
img

この時点で、先ほどMaxComputeに格納したデータには含まれていないデータがあることがわかります。これを使って、増分データの同期を試してみます。

TableStoreをMaxCompute Tableへ同期(増分データ、GUIモード、列モード)

STEP1: DI 同期タスクを作成

同期タスクを作成します。
img

ソースをOTS Stream、テーブルを選択します。
img

ターゲットをODPS、テーブル作成ボタンをクリックします。
img

テーブル名とカラムのタイプを編集して、MaxComputeテーブルを作成します。
img

STEP2: DI 同期タスクを実行

タスクを実行します。
引数Argumentsがありますが、適当なstartTimeとendTime、bizdateを入力します。
img

タスクが成功すると、Logが表示されます。
img

あとはAd-Hoc クエリで確認します(手順は上記の共通手順にて記載しています)

select * from nelly01_dev.monitor_add_one where ds=20200913;

img

これにより、TableStoreをMaxCompute Tableへ同期(増分データ、GUIモード、列モード)が確認できました。

TableStoreをMaxCompute Tableへ同期(増分データ、スクリプトモード、列モード)

STEP1: DI 同期タスクを作成

同期タスクを作成します。
img

ソースをOTS Stream、テーブルを選択します。
img

ターゲットをODPS、テーブル作成ボタンをクリックします。
img

テーブル名とカラムのタイプを編集して、MaxComputeテーブルを作成します。
img

STEP2: スクリプトモードにスイッチ

Switch to Code Editorボタンをクリックし、スクリプトモードにスイッチします。
img

するとスクリプトが表示されます。これは先述、GUIモードで選択した設定が自動でスクリプトに反映されます。
img

STEP3: DI 同期タスクを実行

タスクを実行します。
引数Argumentsがありますが、適当なstartTimeとendTime、bizdateを入力します。
img

タスクが成功すると、Logが表示されます。
img

あとはAd-Hoc クエリで確認します(手順は上記の共通手順にて記載しています)

select * from nelly01_dev.monitor_add_one where ds=20200913;

img

これにより、TableStoreをMaxCompute Tableへ同期(増分データ、スクリプトモード、列モード) が確認できました。

TableStoreをMaxCompute Tableへ同期(増分データ、スクリプトモード、行モード)

最後に、行モードで増分データの同期を試します。

STEP1: DI 同期タスクを作成

同期タスクを作成します。
img

ソースをOTS Stream、テーブルを選択します。
img

ターゲットをODPS、テーブル作成ボタンをクリックします。
img

テーブル名とカラムのタイプを編集して、MaxComputeテーブルを作成します。
img

STEP2: スクリプトモードにスイッチ

Switch to Code Editorボタンをクリックし、スクリプトモードにスイッチします。
img

スクリプトが表示されますが、スクリプトを編集します。
modeとcolumnを編集します。

"parameter": {
"mode": "single_version_and_update_only",
"statusTable": "TableStoreStreamReaderStatusTable",
"maxRetries": 30,
"isExportSequenceInfo": false,
"datasource": "ots_first",
"column": [
{
"name": "MachineIp"
},
{
"name": "Timestamp"
},
{
"name": "Metrics"
}
],
"startTimeString": "${startTime}",
"table": "Monitor",
"endTimeString": "${endTime}"
}

img

STEP3: DI 同期タスクを実行

タスクを実行します。
引数Argumentsがありますが、適当なstartTimeとendTime、bizdateを入力します。
img

タスクが成功すると、Logが表示されます。
img

あとはAd-Hoc クエリで確認します(手順は上記の共通手順にて記載しています)

select * from nelly01_dev.ots_to_odps;

img

これにより、TableStoreをMaxCompute Tableへ同期(増分データ、スクリプトモード、行モード) が確認できました。


最後に

本記事では、TableStoreからMaxComputeへ連携する方法を簡単に説明しました。
TableStoreでデータ量が肥大化した場合は、この方法でMaxComputeへデータ移植、コスト削減およびDWHとしての運用ができれば幸いです。

Hironobu Ohara
この記事を書いた人
Hironobu Ohara
Github Icon
2019年にAlibaba Cloudを担当。Databaseや収集、分散処理、ETL、検索、分析、機械学習基盤の構築、運用等を経て、現在分散系をメインとしたビッグデータとデータベースを得意・専門とするデータエンジニア。 AlibabaCloud MVP。
Close

Alibaba Cloudを始めてみましょう

ソフトバンクは、Alibaba Cloudのアカウント開設から、サービス展開までをお手伝いします。
Hatena