CSVファイルをMaxComputeの内部テーブルに格納する
本記事では、MaxComputeでCSVファイルを格納、SQL処理する方法を説明します。
前書き
MaxCompute (旧プロダクト名 ODPS) は、大規模データウェアハウジングのためのフルマネージドかつマルチテナント形式のデータ処理プラットフォームです。さまざまなデータインポートソリューションと分散計算モデルにより、大規模データの効率的な照会、運用コストの削減、データセキュリティを実現します。
少し前になりますが、MaxComputeについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。
今回はAlibaba Cloud MaxComputeでCSVファイルを格納、SQL処理してみましょう。構成図で、こんな感じです。
今回はMovielensというオープンデータを使いました。
OSSはbucket名bigdata-dwh
、ディレクトリ名(Object Name Prefix)はCSV/
、CSV配下に以下のCSVファイルを格納しています。
共通作業(MaxCompute全体で共通事項)
RAM ユーザー作成&権限付与
もしMaxComputeを操作するユーザがRAMユーザーの場合は以下を実施してください。
RAMより対象のユーザーを選定します。ユーザーが無い場合は新規作成します。 このときにAccessKey IDとAccessKey Secretをメモとして残してください。AccessKey IDとAccessKey SecretはDataWorks DataIntegrationの処理に必要となります。
対象のユーザーには権限ロールとしてAliyunDataWorksFullAccessをアタッチします。 これはDataWorksを操作するためのFull権限です。
DataWorks側にてユーザーごとに読み取り専用や一部プロジェクト・テーブルなどのきめ細かい権限付与ができますが、ここでは割愛します。
Workspace作成
MaxComputeを操作するためにはワークスペースおよびプロジェクトが必要なので新たに作成します。DataWorksコンソールから 「Create Project」を選択し、起動します。
Modeは「Basic Mode(基本モード)」「と「Standard Mode(標準モード)」の2種類があります。ここは「Basic Mode(基本モード)」として選定します。
続けて、MaxCompute を選定します。料金は初めて操作するなら Pay-As-You-Go(使った分だけ課金) が良いと思います。
MaxComputeに関する必要な情報を設定し、Workspaceを作成します。
CSVファイルを MaxCompute Tableへ格納
ここのチュートリアルは、CSVファイルをMaxComputeの内部テーブルへ格納する方法です。
STEP1:データ格納
Workspace、Project作成直後は何もない状態と思います。
なので、まずはDataWorks のHomepageへ移動し、データを格納する準備を進めます。
DataWorksのコンソール画面です。まずは「Data Integration」をクリックします。
DataWorks DataIntegration画面に遷移します。今回、OSSにCSVファイルがあるため、それをソースとして設定するためにData Stores をクリックします。
「New Data Source」をクリックし、データソースを選定します。
今回はOSSがデータソースなので、OSSを選定します。
ここで、OSSに関する必要な情報入力画面が出ます。
End Pointはここから選定できます。今回は日本リージョンなので、https://oss-ap-northeast-1.aliyuncs.com
を入力します。リージョン情報はここから選定できます。
Bucket情報を入力します。OSS接続先が目的なのでBucket名だけで問題ないです。
AccessKey IDとAccessKey Secretを入力します。
色々入力が終わったら「Test connectivity」をクリックして、接続が出来ていることを確認します。接続が出来ていれば、Completeをクリックし、接続設定ウィザードを完了します。
その後はData Source にて、OSSが無事登録できていることを確認できたらOKです。
次はDataStdio画面へ遷移します。
DataStdio画面です。最初は何もない状態です。そこからプラレールのように色々構築することが出来ます。
STEP2:CSVファイルを認識し、MaxCompute Tableに格納
メニューバーからworkflowを作成します。
workflow名を入力し、「Create」ボタンで作成します。ここは「csv2maxcompute」というタイトルで登録します。
workflow画面が出たら、オレンジ色の「Batch Synchronization」をDrag & Drop で移動させます。
Drag &Drop後、Create Node作成ウィザードが表示されます。ここで新たに作成したいNodeの名前を入力します。
オレンジ色のNodeで名前を登録後、右クリックで「Open Node」を選定します。
このような編集画面が出てきます。
諸元となるデータソースを設定します。connectionタブで「OSS」を選定すると、先ほどDataIntegrationで登録した「csv_sample」が表示されます。
Object Name Prefixにてcsv/ratings.csv
を入力します。
今回のcsvファイルには1行目にフィールド名があるため、「Include Header」でYes
を選定します。
csvファイル自体圧縮していないので、Compression Formatは「None」と選定します。
「Preview」をクリックし、CSVファイルの構成を確認することができます。
ここで入力情報で間違ってなかったら「OK」ボタンをクリックすることで、フィールド一覧が自動認識されます。
今はターゲットソースの情報がないので、何も表示されていませんが、コンソール内部では認識されている状態になっています。
今度はTarget でソース設定情報を登録します。今回はMaxComputeなので、「ODPS」を選定します。ODPSはMaxComputeの昔の名称です。
今回新しくMaxCompute Projectを作成したため、当然データがない状態です。 なので、SQLを使ってデータを登録します。
CREATE TABLE IF NOT EXISTS ratings (userid INT,movieid INT,rating INT,tstamp STRING)COMMENT 'movielen ratings table'lifecycle 36500;
COMMENT は、名前通りコメントです。不要ならスキップ(空白)しても問題ないです。
lifecycle は、名前通り、tableのライフサイクルです。この数値(日)を経過したテーブルは自動削除されます。これをうまく使って一時テーブルの削除などストレージコストを節約できればと思います。
SQLでテーブル作成が無事完了すると、「02 Mappings」の画面でフィールド選定が出ます。ここで諸元ソースと、ターゲットで同じフィールドであるかMappingsの設定を合わせます。
これでデータ同期の設定完了です。メニューバーのところにある上書き保存のアイコンをクリックして上書き保存します。
参考として、ここにある「03 Channel」で数点補足します。
Expected Maximum Concurrency・・・最大同時実行数。ソースからターゲットへデータを転送するスレッド最大数です。
Bandwidth Throttling・・・帯域数。諸元データの容量次第では帯域を増やしたほうがベストです。
STEP3:実行
先ほどの上書き保存のアイコンの隣にある、実行ボタンをクリックします。以降、データ同期処理としてRuntiume Log画面で 長いLogが出ますが、このタスクが完了するのを待ちます。
ここでResource Groupについて説明します。
Resource GroupはDataWorksで今回のようなタスク処理に必要なリソースを選定することが出来ます。
Resource は共有リソースグループ(shared resource groups)、専用リソースグループ(exclusive resource groups)、カスタムリソースグループ(custom resource groups)の3種類があります。共有リソースグループは無料枠ですが、同リージョン配下で他のアカウント:DataWorksを使っているユーザリソースを含め処理されるため、少ないタスク・あるいは少数のノードが実行されるシナリオに向いています。処理を最大化したい場合は、有料となる専用リソースグループか、カスタムリソースグループを選定すると良いです。カスタムリソースグループはData IntegrationとShell Nodeのみに特化したリソースグループです。
STEP4:完了
先ほどのworkflowのTabをクリックし、画面を切り替えます。 workflowで、オレンジ色のDI(DataIntegration)で先ほど設定完了したので、今度は緑色のODPS SQL をDrag &DropでNodeを作成します。
緑色のところを右クリックで、Open Nodeを選定し、SQLクエリ編集画面を開きます。
レコード件数を集計するSQLクエリを作成し、上書き保存したら、実行ボタンをクリックしてSQLクエリを実行します。
SELECT Count(*) as record from ratings;
SQLクエリの実行コストらお金が出ます。(PAYGのみ)
実行結果としてレコード数が無事表示されました。これで以上です。
補足
上記 DataIntegratinとしての「実行」およびSQLクエリとしての「実行」。これらを連続で実行処理するのが面倒な場合は、workflowを使います。
workflow画面にて、Drag &Dropで処理順ごとにラインを作成します。
※注意として、MaxComputeは「DELETE」をサポートしません。なので、全てのレコードを削除したい場合は、DELETEの代わりにTRUNCATE TABLEを入れて全てのレコードを削除します。
TRUNCATE TABLE ratings;
ワークフロー、処理の順番が完成したら、Runボタンをクリックして実行します。
処理の過程はNodeを右クリック → View Logを選定するとLogが見れます。
今度はRun_SQL Node を右クリックし、View Logを選定すると、SQLクエリ結果が出力されます。
このようにして、DataIntegrationやSQL、Shell、Python、Job、レポート作成などなどの処理が多い場合はworkflowを使って、プラレールのように好きにワークフローを作成すると良いです。
最後に
本記事では、OSSにあるCSVファイルをMaxCompute Tableとして格納する方法を簡単に説明しました。
この作業はノーコード・ローコードであり非常にシンプルなので、MaxComputeに対してクイックスタートしやすいです。