MaxComputeでJobを設定する
本記事では、DataWorksのジョブ処理を使って、MaxCompute Tableで定期的なデータ格納処理をする方法について説明します。
前書き
MaxCompute (旧プロダクト名 ODPS) は、大規模データウェアハウジングのためのフルマネージドかつマルチテナント形式のデータ処理プラットフォームです。さまざまなデータインポートソリューションと分散計算モデルにより、大規模データの効率的な照会、運用コストの削減、データセキュリティを実現します。
少し前になりますが、MaxComputeについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。
今回はDataWorksを使って、MaxComputeで定期的にデータを格納・処理するジョブを設定してみましょう。
- 基本モードと標準モードどちらでも良いですが、今回は標準モードとして説明しています。
- Table作成や本番環境へのコミットとか基本的なことは過去エントリにて説明していることや、今回説明する事項が多いので、基本的な操作部分を一部省略しています。
デプロイ後実施のサイクルジョブの設定
ここは5分おきにテーブルのインポート、テーブルのインサート処理を行いたいという例で実施します。
STEP1: ワークフローの作成
DataWorks WorkStdio画面から、新規でワークフローを作成します。
STEP2: ソーステーブルを作成し、ソースデータをインポートする
ワークフローの中身(Node)は現在空白状態と思うので、ソーステーブルを作成します。
上にある[ DDL Statement] からSQLでソーステーブルのフィールドを作ります。下段にある[Create Field]などのボタンで手動でフィールド作成もできます。
CREATE TABLE table_demo1 (shop_name string,customer_id string,total_price double,comments string,sale_date string,region string);
ソーステーブルは現在レコードらデータがない状態なので、データをインポートします。
インポート元のファイルのフィールドと、ソーステーブルのフィールドを合わせます。
display名を入力後、本番環境(production environment)へコミットします。
STEP3: ターゲットテーブルの作成
同じ要領で、今度はMaxComputeのメニューバーからCreate Tableでターゲットテーブルを作成します。
上にある[ DDL Statement] からSQLでターゲットテーブルのフィールドを作ります。下段にある[Create Field]などのボタンで手動でフィールド作成もできます。
CREATE TABLE result_demo1 (region string);
ターゲットテーブル作成後、コミットします。(標準モードのみ)
STEP4: 開始ノードの作成、SQLノードのインポート、SQLノードの挿入
ソーステーブル、ターゲットテーブルを作成したら、ジョブらワークフローを成立させるために、開始ノードやSQLノードなどを挿入します。まずは開始ノードを作成します。
開始ノード(Zero-Load Node)でノードの名前を「Start_R」としています。
次はテーブルのインポートを行うノードを作成します。ODPS SQLノードでノードの名前を「import_R」としています。
最後にテーブルのインサートを行うノードを作成します。ODPS SQLノードでノードの名前を「imsert_R」としています。
ここまで、ノードの設定が出来ていれば、以下の図のようになります。もしワークフローとしてリンク(図で灰色の矢印)が繋がっていない場合は新たに接続します。
STEP5: ノードプロパティの設定(5分ごとに繰り返し)
Start_Rノードにて、右側のPropertiesメニューバーから、ノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。
続いて、import_R ノードで 以下のSQLクエリを入力します。このSQLクエリはジョブ実行時に実行されるSQL文です。
INSERT INTO table_demo1(customer_id,shop_name,region,total_price,sale_date,comments ) VALUES ('ibp16rdks1akepepb63wv' , 'jd', 'hangzhou',101, '20190111', 'test' );INSERT INTO table_demo1(customer_id,shop_name,region,total_price,sale_date,comments ) VALUES ('ibp16rdks1akepepb61wv' , 'jd', 'hangzhou',102, '20190111', 'test' );INSERT INTO table_demo1(customer_id,shop_name,region,total_price,sale_date,comments ) VALUES ('ibp16rdks1akepepb13wv' , 'jd', 'hangzhou',103, '20190111', 'test' );INSERT INTO table_demo1(customer_id,shop_name,region,total_price,sale_date,comments ) VALUES ('ibp16rdks1akepepb63wv' , 'jd', 'JP',101, '20190111', 'test' );
同じく、import_R ノードでノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。
import_R ノードのノードプロパティで下画面側に、依存関係の設定(Dependencies)がありますので、そこへ移動します。
そこで出力先として「table_demo1」を追加します。 するとOutput Nodeリスト一覧で、出力先にtable_demo1が追加されてることがわかります。
設定が終わったらコミットします。
続いて、Insert_R ノードへ遷移し、 以下のSQLクエリを入力します。
INSERT OVERWRITE TABLE result_demo1SELECT region from table_demo1 where region='hangzhou';
同じく、Insert_R ノードでノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。
Insert_R ノードのノードプロパティで下画面側に、依存関係の設定(Dependencies)がありますので、そこへ移動します。
画像の図のようにtable_demo1を親ノードの出力を入力として設定します。
設定が終わったらコミットします。
STEP6: タスクの実行
ここまで問題なく設定できていれば、今度はタスクを実行します。まずはDataWorks DataStdio画面の「Deploy」バーから「Create Deploy Task」画面へ遷移します。
タスク一覧が表示されてるので、対象のタスクを選定し、「Deploy Selected」でDeployタスクをセットします。
STEP7: サイクルインスタンスと実行ログのチェック
タスクの実行結果として、タスクのサイクルをチェックします。 メニューバーから「Operation Center」で、「Cycle Instance」を選定します。
Import Nodeのタスクのサイクルログが表示されます。時間帯から、タスクが5分おきに実施されていることがわかります。
Import Nodeのサイクルインスタンスの詳細を確認します。右側にて、図のようなConfigがありますので、「More」をクリックします。
すると、Import Nodeのタスクのサイクルログの詳細が確認できます。
今度は、Insert Nodeのタスク結果を確認してみます。
Insert Nodeのタスクのサイクルログが表示されます。時間帯から、タスクが5分おきに実施されていることがわかります。
Insert Nodeのサイクルインスタンスの詳細を確認します。右側にて、図のようなConfigがありますので、「More」をクリックします。
Insert Nodeのタスクのサイクルログの詳細が確認できます。
STEP8: ターゲットテーブルをチェック
タスクらジョブは無事実行されたけど、それがターゲットテーブルにどのように反映しているか、正常性を含め確認します。DataMapから確認します。
これでデータがターゲットテーブルに正しく挿入されたことが確認できました。
条件分岐ジョブの設定仕方
ここは今日が月初めの日なら初日専用のshell、それ以外の日なら、初日以外のshellを実行したいという例で実施します。
STEP1: ワークフローの作成
DataWorks WorkStdio画面から、新規でワークフローを作成します。
STEP2: 割り当てノード、ブランチノード、およびシェルノードの作成
割り当てノード(Assignment Node)を作成します。「Assign_IfFirst」という名前にしています。
ブランチノード(Branch Node)を作成します。「Branch_judgeDownRun」という名前にしています。
Shellノード(Shell Node)を2つ作成します。「RunFirst」と「RunExceptFirst」という名前にしています。
ノード作成後、ワークフローで図のように矢印のコネクトを設定します。
ここの流れとしては、以下の通りになります。
- Assign_IfFirst 割り当てノード(Assignment Node)にて、Pythonスクリプトを作成して、今日が月の最初の日かどうかをチェックします
- Branch_judgeDownRun ブランチノード(Branch Node)にて、ノードはAssign_ifFirst割り当てノードからのパラメータを受け取ります
- 2の結果で、今日が初日の場合はRunFirstシェルノードを実行し、今日が初日でない場合はRunExceptFirstシェルノードを実行します。
STEP3: 構成割り当てノードにてPythonスクリプトを作成
Assign_IfFirst こと割り当てノード(Assignment Node)にて、Pythonスクリプトを作成します。
# encoding: utf-8from datetime import datetime as dtimeimport datetimedef firstDayOfMonth(dt):return (dt + datetime.timedelta(days=-dt.day + 1)).replace(hour=0, minute=0, second=0, microsecond=0)if firstDayOfMonth(dtime.today()).day == dtime.today().day:print(0) #first dayelse:print(1) #not first day
Assign_IfFirst ノードにて、右側のPropertiesメニューバーから、ノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。
設定が終わったらコミットします。
STEP4: ブランチノードの設定
Branch_judgeDownRun ブランチノード(Branch Node)にて、引数パラメータからの条件に応じたOutput先を設定します。
Condition:${isFirst}==0
Associated Node OutputNode:_demo.fisrt_cond.is_first
Condition:${isFirst}==1
Associated Node OutputNode:_demo.fisrt_cond.not_first
Branch_judgeDownRun ノードでノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。
Branch_judgeDownRun ノードプロパティで下画面側に、依存関係の設定(Dependencies)がありますので、そこへ移動します。
そこのOutput Nodeリスト一覧で、出力先にis_first、not_first が自動で追加されてると思います。
Branch_judgeDownRun ノードプロパティで日付を検証するパラメータを設定します。
STEP5: Shellノードの設定
Branch_judgeDownRun ノードの分岐処理結果をキャッチし、それぞれ処理するシェルノードを設定します。
まずはRunFirstシェルノードから設定します。 シェルノードを開いたら、以下コマンドを入力します。
echo "today is first day"
引き続き、RunFirstシェルノードでノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。
RunFirstシェルノードプロパティで下画面側に、依存関係の設定(Dependencies)がありますので、そこへ移動します。
そこのOutput Nodeリスト一覧で、出力先にis_firstを追加します。
設定が終わったらコミットします。
続いて、RunExceptFirstシェルノードを設定します。 シェルノードを開いたら、以下コマンドを入力します。
echo "today is not first day"
引き続き、RunExceptFirstシェルノードでノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。
RunExceptFirstシェルノードプロパティで下画面側に、依存関係の設定(Dependencies)がありますので、そこへ移動します。
そこのOutput Nodeリスト一覧で、出力先にRunExceptFirstを追加します。
設定が終わったらコミットします。
ここまで問題なく設定できていれば、今度はタスクを実行します。まずはDataWorks DataStdio画面の「Deploy」バーから「Create Deploy Task」画面へ遷移します。
タスク一覧が表示されてるので、対象のタスクを選定し、「Deploy Selected」でDeployタスクをセットします。
タスクの実行結果として、タスクのサイクルをチェックします。 メニューバーから「Operation Center」で、「Cycle Task」を選定します。
タスクの実行結果として、タスクのサイクルをチェックします。 メニューバーから「Operation Center」で、「Cycle Instance」を選定します。
まずはAssign_IfFirst 割り当てノードのサイクルインスタンスを確認します。
ここで図のようにタスクのサイクルログが表示されます。時間帯から、タスクが5分おきに実施されていることがわかります。
Assign_IfFirst 割り当てノードのサイクルインスタンスの詳細を確認します。右側にて、図のようなConfigがありますので、「More」をクリックします。
すると、Assign_IfFirst 割り当てノードのタスクのサイクルログの詳細が確認できます。
次に、Branch_judgeDownRun ブランチノードのサイクルインスタンスを確認します。
図のようにタスクのサイクルログが表示されます。時間帯から、タスクが5分おきに実施されていることがわかります。
Branch_judgeDownRun ブランチノードのサイクルログの詳細はこの通りになります。
RunFirstシェルノードのサイクルインスタンスを確認します。
図のようにタスクのサイクルログが表示されます。時間帯から、タスクが5分おきに実施されていることがわかります。
ここも図のようなConfigがありますので、「More」をクリックします。
RunFirstシェルノードのサイクルログの詳細はこの通りになります。
結果として、今回今日の日付が月初めではないため、RunFirstシェルノードがスキップされました。
RunExceptFirstシェルノードのサイクルインスタンスを確認します。
図のようにタスクのサイクルログが表示されます。時間帯から、タスクが5分おきに実施されていることがわかります。
ここも図のようなConfigがありますので、「More」をクリックします。
RunExceptFirstシェルノードのサイクルログの詳細はこの通りになります。
最後に
本記事では、DataWorksのジョブ処理を使って、MaxCompute Tableで定期的なデータ格納処理をする方法を簡単に説明しました。