Apache SparkからHologresへデータ連携方法
本記事では、Hologres を使って、Apache SparkからHologresへリアルタイムデータ連携する方法をご紹介します。
Hologresとは
Hologres はリアルタイムのインタラクティブ分析サービスです。高い同時実行性と低いレイテンシーでTB、PBクラスのデータの移動や分析を短時間で処理できます。PostgreSQL11と互換性があり、データを多次元で分析し、ビジネスインサイトを素早くキャッチすることができます。
少し前になりますが、Hologresについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。
Apache Sparkとは
Apache Sparkは、大規模データ処理のための統合分析エンジンです。複数のプログラム言語でハイレベルなAPIを提供しており、ビッグデータソリューションで人気があります。
また、今回のチュートリアルとしては、Alibaba Cloud EMR(E-MapReduce) によるApache sparkを使用します。EMR(E-MapReduce) については、こちらのSlideShareで紹介しています。
https://www.slideshare.net/sbcloud/alibabacloudemapreduce-231725148
Apache SparkからHologresへデータ連携について
このガイドラインでは、Sparkを使ってHologresでデータ処理を行う方法を順を追って説明します。
このチュートリアルについて
対象者:
本ガイドラインは、以下のような方を対象としています
- Alibaba Cloud、Hologres、LogService、OSS(Object Storage Service)、EMR(E-MapReduce) に関する基本的な知識を持っている
前提条件:
- Alibaba Cloud のアカウントを所持している
- Hologres、LogService、OSS(Object Storage Service)EMR(E-MapReduce) が使用可能な状態になっている
- 使用するHologres、LogService、OSS(Object Storage Service)EMR(E-MapReduce) は同一Region配下にある
- 少なくとも1つのHologresインスタンス、1つのOSS(Object Storage Service)bucketを持っている
EMR(E-MapReduce)クラスタをspark実行環境を準備
まず最初に、sparkの実行環境を準備します。Apache Sparkを導入するにはいくつかの方法 があります。
http://spark.apache.org/docs/latest/index.html#launching-on-a-cluster
Alibaba Cloud EMR(E-MapReduce)では、Hadoop、Spark、Flink、Kafka、HBaseなどのオープンソースのビッグデータサービスを数分で簡単に導入することができます。このガイドラインでは、すべてのSparkタスクをEMR(E-MapReduce)クラスタ上で実行します。
EMRのウィザードを使って、EMR-3.36.1とspark 2.4.7でEMR(E-MapReduce)クラスタを作成します。Alibaba CloudのEMRはAWS EMRなど他社EMRとは異なってほぼマネージドサービスなので、コンソール上の操作だけで5分で出来ます。
EMR(E-MapReduce)コンソールのData Platformにプロジェクトを作成し、タスクを管理します。
Hologresテーブルの準備
既存のHologresインスタンスに、関連データを格納するためのテーブルを作成します。
以下は生成するDDL文です。
BEGIN;CREATE TABLE public.book_order ("order_id" text NOT NULL,"user_id" int8 NOT NULL,"book_id" int8 NOT NULL,"book_name" text NOT NULL,"order_cnt" int8 NOT NULL,"order_amt" int8 NOT NULL,PRIMARY KEY (order_id));CALL SET_TABLE_PROPERTY('public.book_order', 'orientation', 'column');CALL SET_TABLE_PROPERTY('public.book_order', 'bitmap_columns', 'order_id,book_name');CALL SET_TABLE_PROPERTY('public.book_order', 'dictionary_encoding_columns', 'order_id:auto,book_name:auto');CALL SET_TABLE_PROPERTY('public.book_order', 'time_to_live_in_seconds', '3153600000');CALL SET_TABLE_PROPERTY('public.book_order', 'distribution_key', 'order_id');CALL SET_TABLE_PROPERTY('public.book_order', 'storage_format', 'orc');COMMIT;
OSS(Object Storage Service)バケツにスクリプトとデータのフォルダを用意する
既存のOSS(Object Storage Service)バケットに、pythonスクリプトやデータファイルを保存するためのフォルダを新規に作成します。
Apache SparkでCSVデータファイルをHologresテーブルに取り込む
前述の通り、sparkは複数のプログラム言語をサポートしていますが、ここではpythonとPySparkを例にしています。タスクが完了すると、OSS(Object Storage Service)バケット内のCSVファイルを選択し、その中の全てのレコードが対象のHologresテーブルに保存されます。
DataFrameReaderのcsv("path")
またはformat("csv").load("path")
を使うと、CSVファイルをPySparkのDataFrameに読み込むことができます。また、JdbcRDDを使えば、特定のドライバを使って簡単にデータベースにDataFrameを書き込むことができます。
http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/JdbcRDD.html
この状態ではHologresインスタンスはPostgreSQLとして接続されています。詳しい情報は、spark JDBC data sourceやspark generic file sourceを参照してください。
http://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
http://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
以下のスクリプトでhologres_spark.py
を作成します。
from pyspark.sql import SparkSessionfrom pyspark.sql.types import StructType, IntegerType, StringTypeif __name__ == '__main__':spark = SparkSession.builder.appName('HologresSparkDemo').getOrCreate()schema = StructType() \.add("order_id", StringType(), True) \.add("user_id", IntegerType(), True) \.add("book_id", IntegerType(), True) \.add("book_name", StringType(), True) \.add("order_cnt", IntegerType(), True) \.add("order_amt", IntegerType(), True)df_with_schema = spark.read.format("csv") \.option("header", True) \.schema(schema) \.load("<your file path in OSS bucket>")df_with_schema.write.mode("append").format("jdbc") \.option("url", "<jDBC connection url of your instance, e.g. jdbc:postgresql://<endpoint>:<port>/<database name>") \.option("dbtable", "<your target table name>") \.option("user", "<your accessKeyId>") \.option("password", "<your accessKeySecret>") \.option("driver", "org.postgresql.Driver") \.save()
以下のスクリプトを使って、関連するデータファイルを生成し、テストすることができます。
import csvimport randomimport uuidoutput_date = "20210712"book_info = ["嫌われる勇気", "ノルウェイの森", "海辺のカフカ", "色彩を持たない多崎つくると彼の巡礼の年","容疑者Ⅹの献身", "人間失格", "こころ", "天声人语", "幸せになる勇気"]book_price = [500, 600, 700, 300, 200, 100, 350, 550, 650]csv_writer = csv.writer(open("test_data_{0}.csv".format(output_date), "w+", newline='', encoding='UTF-8'))for i in range(10000):book_id = random.randint(1, 9)order_count = random.randint(1, 5)row = [uuid.uuid1(), random.randint(1, 50), book_id, book_info[book_id-1], order_count,book_price[book_id-1] * order_count] # order_id, user_id, book_id, book_name, order_count, order_amountcsv_writer.writerow(row)
PostgreSQL JDBC Driverをダウンロードして、pythonスクリプト、生成されたデータファイル、ドライバパッケージを上記の用意したフォルダにアップロードします。
https://mvnrepository.com/artifact/org.postgresql/postgresql
EMR(E-MapReduce)プロジェクトでApache Spark Jobを作成します。
以下のフォーマットに従って、実行コマンドを入力します。
--driver-class-path <your PostgreSQL driver class path in OSS> --jars <your PostgreSQL driver class path in OSS> <your python scripts path in OSS>
ウィザードを使ってOSS(Object Storage Service)のパスを生成するには、Enter an OSS pathリンクをクリックします。
自作のSpark環境で作業する場合は、代わりに以下のコマンドを使用してください。
spark-submit --driver-class-path <your PostgreSQL driver class path> --jars <your PostgreSQL driver class path> <your python scripts path>
Apache spark job を保存し、EMR(E-MapReduce)クラスタに投入して実行します。
Apache Spark Jobのインスタンス情報やログは、詳細ページで確認できます。
HoloWebにアクセスして、ターゲットテーブルのデータを照会すると、CSVファイルのすべてのレコードがHologresテーブルに格納されているのがわかると思います。
Spark streamingでLog ServiceからHologresへリアルタイムデータ転送
ここまできたら、Spark streaming動作確認のためにHologresのテーブルデータをDELETE文でクリアします。
LogServiceプロジェクトとlogstoreを準備します。logstore作成時にWebTrackingが有効になっていることを確認してください。
logstore のデータページに入り、インデックス属性を有効にします。
Alibaba Cloudが提供するSpark SDK](https://www.alibabacloud.com/help/doc-detail/51075.htm)では、ReceiverまたはDirectモードでLog ServiceからLog dataを消費することができます。
関連する紹介やデモコードはGit aliyun-emapreduce-datasourcesやGit aliyun-emapreduce-demoで入手できます。
Apache Spark SDKにより、DataStreamReaderがログサービスを loghub
というフォーマットで認識するようになりました。Pythonの使い方についてはhttps://github.com/aliyun/aliyun-emapreduce-datasources/blob/main/docs/how_to_run_spark_with_python_sdk.mdを参照してください。
関連するSDKを使用して、まずテストのためにコンソールにストリーミングデータを表示します。以下のスクリプトで hologres_spark_streaming.py
を作成します。
from pyspark.sql import SparkSessionfrom pyspark.sql.types import StructType, IntegerType, StringTypeif __name__ == '__main__':spark = SparkSession.builder.appName('HologresSparkStreamingDemo').getOrCreate()schema = StructType() \.add("order_id", StringType(), True) \.add("user_id", IntegerType(), True) \.add("book_id", IntegerType(), True) \.add("book_name", StringType(), True) \.add("order_cnt", IntegerType(), True) \.add("order_amt", IntegerType(), True)line_with_schema = spark.readStream.format("loghub") \.schema(schema) \.option("sls.project", "<your log service project name>") \.option("sls.store", "<your log service logstore name>") \.option("access.key.id", "<your accessKeyId>") \.option("access.key.secret", "<your accessKeySecret>") \.option("endpoint", "<your endpoint such as ap-northeast-1-intranet.log.aliyuncs.com>") \.option("startingoffsets", "latest") \.load()stream = line_with_schema.writeStream.format("console") \.outputMode("append") \.trigger(processingTime='25 seconds') \.start()stream.awaitTermination()
Pythonスクリプト、PostgreSQLドライバ、log service spark sdk (emr-logservice_2.11-2.2 .0.jar) 、およびfastjson-1.2 .45.jar、commons-validator-1.4 .0.jar、ezmorf-1.0 .6.jar、loghub-client-lib-0.6 .13.jar、aliyun-log-0.6 .10.jar、json-lib-2.4-jdk 15.jar、zkclient-0.10.jar、emr-common_2.11-2.2 .0.jarをアップロードします。
EMR(E-MapReduce)プロジェクトの下に、新しいspark streaming jobを作成し、ジョブ実行コマンドを更新します。
OSS(Object Storage Service)のパスを指定して、以下のフォーマットに沿って実行コマンドを入力してください。
--master yarn-client --driver-memory 7G --executor-memory 5G --executor-cores 1 --num-executors 32 --driver-class-path ossref://xxx/postgresql-42.2.6.jar --jars ossref://xxx/postgresql-42.2.6.jar,ossref://xxx/fastjson-1.2.45.jar,ossref://xxx/commons-validator-1.4.0.jar,ossref://xxx/ezmorph-1.0.6.jar,ossref://xxx/emr-logservice_2.11-2.2.0.jar,ossref://xxx/loghub-client-lib-0.6.13.jar,ossref://xxx/aliyun-log-0.6.10.jar,ossref://xxx/json-lib-2.4-jdk15.jar,ossref://xxx/zkclient-0.10.jar,ossref://xxx/emr-common_2.11-2.2.0.jar <your python scripts path in OSS>
自分で構築したspark環境で作業している場合は、関連ファイルのパスも更新してください。
sparkのジョブを保存し、EMR(E-MapReduce)クラスタに投入して実行します。
spark streamingジョブの実行中に、以下のスクリプトで新しいログサービスレコードを送信し、関連するログを確認します。
import randomimport requestsimport uuidproject = "<your log service project name>"endpoint = "<your endpoint such as ap-northeast-1.log.aliyuncs.com>"logstore = "<your log service logstore name>"topic = "<your logs topic>"# urlencoded book name in Japanesebook_info = ["%E5%AB%8C%E3%82%8F%E3%82%8C%E3%82%8B%E5%8B%87%E6%B0%97","%E3%83%8E%E3%83%AB%E3%82%A6%E3%82%A7%E3%82%A4%E3%81%AE%E6%A3%AE","%E6%B5%B7%E8%BE%BA%E3%81%AE%E3%82%AB%E3%83%95%E3%82%AB","%E8%89%B2%E5%BD%A9%E3%82%92%E6%8C%81%E3%81%9F%E3%81%AA%E3%81%84%E5%A4%9A%E5%B4%8E%E3%81%A4%E3%81%8F%E3%82%8B%E3%81%A8%E5%BD%BC%E3%81%AE%E5%B7%A1%E7%A4%BC%E3%81%AE%E5%B9%B4","%E5%AE%B9%E7%96%91%E8%80%85%E2%85%A9%E3%81%AE%E7%8C%AE%E8%BA%AB", "%E4%BA%BA%E9%96%93%E5%A4%B1%E6%A0%BC","%E3%81%93%E3%81%93%E3%82%8D", "%E5%A4%A9%E5%A3%B0%E4%BA%BA%E8%AF%AD","%E5%B9%B8%E3%81%9B%E3%81%AB%E3%81%AA%E3%82%8B%E5%8B%87%E6%B0%97"]book_price = [500, 600, 700, 300, 200, 100, 350, 550, 650]url = 'http://{0}.{1}/logstores/{2}/track?APIVersion=0.6.0&__topic__={3}&order_id={4}&user_id={5}&book_id={6}&book_name={7}&order_cnt={8}&order_amt={9}'for i in range(5):book_id = random.randint(1, 9)order_count = random.randint(1, 5)res = requests.get(url.format(project, endpoint, logstore, topic, uuid.uuid1(), random.randint(1, 50), book_id,book_info[book_id - 1], order_count,book_price[book_id - 1] * order_count))print(res)
ここでストリームデータをターゲットのHologresテーブルに保存するために、formatconsole
をjdbc
に置き換えると、java.lang.UnsupportedOperationException: Data source jdbc does not support streamed writing
という関連エラーが発生します。つまり、データソースをjdbcと設定では、ストリーミング処理をサポートすることができないことがわかります。
......# Error Data source jdbc does not support streamed writingstream = line_with_schema.writeStream.format("jdbc") \.outputMode("append") \.option("url", "<jDBC connection url of your instance, e.g. jdbc:postgresql://<endpoint>:<port>/<database name>") \.option("dbtable", "<your target table name>") \.option("user", "<your accessKeyId>") \.option("password", "<your accessKeySecret>") \.option("driver", "org.postgresql.Driver") \.start()......
foreachBatch(...) でフォーマットコンソールを更新すると、ストリーミングクエリの各マイクロバッチの出力データに対して実行する関数を指定できます。Sparkジョブで以前行ったように、DataFrameをHologresテーブルに保存します。
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch
from pyspark.sql import SparkSessionfrom pyspark.sql.types import StructType, IntegerType, StringTypedef foreach_batch_function(df, epoch_id):df.write.mode("append").format("jdbc") \.option("url", "<jDBC connection url of your instance, e.g. jdbc:postgresql://<endpoint>:<port>/<database name>") \.option("dbtable", "<your target table name>") \.option("user", "<your accessKeyId>") \.option("password", "<your accessKeySecret>") \.option("driver", "org.postgresql.Driver") \.save()passif __name__ == '__main__':spark = SparkSession.builder.appName('HologresSparkStreamingDemo').getOrCreate()schema = StructType() \.add("order_id", StringType(), True) \.add("user_id", IntegerType(), True) \.add("book_id", IntegerType(), True) \.add("book_name", StringType(), True) \.add("order_cnt", IntegerType(), True) \.add("order_amt", IntegerType(), True)line_with_schema = spark.readStream.format("loghub") \.schema(schema) \.option("sls.project", "<your log service project name>") \.option("sls.store", "<your log service logstore name>") \.option("access.key.id", "<your accessKeyId>") \.option("access.key.secret", "<your accessKeySecret>") \.option("endpoint", "<your endpoint such as ap-northeast-1-intranet.log.aliyuncs.com>") \.option("startingoffsets", "latest") \.load()stream = line_with_schema.writeStream.foreachBatch(foreach_batch_function).start()# stream = line_with_schema.writeStream.format("console") \# .outputMode("append") \# .trigger(processingTime='25 seconds') \# .start()stream.awaitTermination()
新しいPythonスクリプトをアップロードし、sparkストリーミングジョブを再実行します。新しいログサービステストレコードを送信し、Hologresテーブルを確認します。
最後に
ここまで、Apache SparkからHologresへリアルタイムデータ連携する方法を紹介しました。
この方法を生かすことで、Apache Sparkがあるサービス基盤からリアルタイムでHologresへ連携、Hologresでリアルタイム可視化を実現することができます。