Apache Spark on k8sからHologresへデータ連携方法
本記事では、Hologres を使って、AzureのKubernetesサービスであるAKSからApache Sparkによるマルチクラウドーリアルタイムデータ連携する方法をご紹介します。
Hologresとは
Hologres はリアルタイムのインタラクティブ分析サービスです。高い同時実行性と低いレイテンシーでTB、PBクラスのデータの移動や分析を短時間で処理できます。PostgreSQL11と互換性があり、データを多次元で分析し、ビジネスインサイトを素早くキャッチすることができます。
少し前になりますが、Hologresについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。
Apache Sparkとは
Apache Sparkは、大規模データ処理のための統合分析エンジンです。複数のプログラム言語でハイレベルなAPIを提供しており、ビッグデータソリューションで人気があります。
Kubernetes(k8s)とは
Kubernetes は、K8sとも呼ばれ、コンテナ化されたアプリケーションのデプロイ、スケーリング、管理を自動化するためのオープンソースのシステムで、アプリケーションを構成するコンテナを論理的な単位にまとめ、管理と発見を容易にします。
Azure Kubernetes上でApache Sparkを使ってHologresでデータ処理をする
このガイドラインでは、Kubernetes上のApache Sparkを使って、Hologresによるデータ処理を段階的に作成します。
マルチクラウドによるデータ分析サービスを構築するため、Apache SparkはAKS(Azure Kubernetes Service) にてデプロイします。
このチュートリアルについて
対象者:
本ガイドラインは、以下のような方を対象としています
- docker、Kubernetes,、Alibaba Cloud、Hologres,、Azure、AKS(Azure Kubernetes Service) 、Azure Storage Accountに関する基本的な知識を持っている
前提条件:
- Alibaba Cloud、Azure のアカウントを所持している
- Alibaba Cloud Hologresが使用可能な状態になっている
- 少なくとも1つのHologresインスタンスを持っている
- Azure AKS(Azure Kubernetes Service) と Azure Storage Account が使用可能な状態になっている
- 作業環境にAzure CLI、およびdocker(17.05以降)をインストールしている
Hologresテーブルの準備
既存のHologresインスタンスに、関連データを格納するためのテーブルを作成します。
以下は生成するDDL文です
BEGIN;CREATE TABLE public.book_order ("order_id" text NOT NULL,"user_id" int8 NOT NULL,"book_id" int8 NOT NULL,"book_name" text NOT NULL,"order_cnt" int8 NOT NULL,"order_amt" int8 NOT NULL,PRIMARY KEY (order_id));CALL SET_TABLE_PROPERTY('public.book_order', 'orientation', 'column');CALL SET_TABLE_PROPERTY('public.book_order', 'bitmap_columns', 'order_id,book_name');CALL SET_TABLE_PROPERTY('public.book_order', 'dictionary_encoding_columns', 'order_id:auto,book_name:auto');CALL SET_TABLE_PROPERTY('public.book_order', 'time_to_live_in_seconds', '3153600000');CALL SET_TABLE_PROPERTY('public.book_order', 'distribution_key', 'order_id');CALL SET_TABLE_PROPERTY('public.book_order', 'storage_format', 'orc');COMMIT;
特定データファイルおよびjarファイルを含むApache SparkのImageファイルのbuild
AKS(Azure Kubernetes Service) にApache Sparkをデプロイするためには、Sparkの実行環境、python 3環境下のPySpark、ソースデータファイル、依存関係にあるJARを含む、適切なSparkのImageを構築する必要があります。
以下の手順で独自のイメージを構築することもできますし、あるいはDocker Image libraryにある bwbw723/spark-k8s-hologres-demo:2.4.7
をそのまま使うこともできます。
上記のパブリックイメージを使用する場合は、次のセクションから始めてください。独自のイメージを構築するには、作業環境でdockerサービスが実行されていることを確認する必要があります。
Apache Spark用Imageファイルの準備
Alibaba Cloud Log Service SDK は spark 2.x.x を必要とするので、ここでは spark-2.4.7-bin-hadoop2.7
を使用します。
$ wget https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz$ tar -xzvf spark-2.4.7-bin-hadoop2.7.tgz
ImageのJAR依存関係とデータファイルの準備
上記のHologresテーブルのテーブルスキーマに従って、以下のスクリプトで関連するデータファイルを生成してテストすることができます。
import csvimport randomimport uuidoutput_date = "20210712"book_info = ["嫌われる勇気", "ノルウェイの森", "海辺のカフカ", "色彩を持たない多崎つくると彼の巡礼の年","容疑者Ⅹの献身", "人間失格", "こころ", "天声人语", "幸せになる勇気"]book_price = [500, 600, 700, 300, 200, 100, 350, 550, 650]csv_writer = csv.writer(open("test_data_{0}.csv".format(output_date), "w+", newline='', encoding='UTF-8'))for i in range(10000):book_id = random.randint(1, 9)order_count = random.randint(1, 5)row = [uuid.uuid1(), random.randint(1, 50), book_id, book_info[book_id-1], order_count,book_price[book_id-1] * order_count] # order_id, user_id, book_id, book_name, order_count, order_amountcsv_writer.writerow(row)
以下のスクリプトを使って、関連するデータファイルを生成し、テストすることができます。
order_id,user_id,book_id,book_name,order_cnt,order_amt5761158b-f044-11eb-8be8-706655fcbfda,39,8,天声人语,1,55057613ca0-f044-11eb-a396-706655fcbfda,46,8,天声人语,5,275057613ca1-f044-11eb-82e5-706655fcbfda,13,5,容疑者Ⅹの献身,1,20057613ca2-f044-11eb-b395-706655fcbfda,35,5,容疑者Ⅹの献身,3,60057613ca3-f044-11eb-a123-706655fcbfda,42,4,色彩を持たない多崎つくると彼の巡礼の年,1,30057613ca4-f044-11eb-923c-706655fcbfda,18,8,天声人语,5,275057613ca5-f044-11eb-a533-706655fcbfda,29,9,幸せになる勇気,3,195057613ca6-f044-11eb-9ca4-706655fcbfda,35,7,こころ,3,105057613ca7-f044-11eb-bd76-706655fcbfda,25,1,嫌われる勇気,1,500......
PostgreSQL JDBC Driver の依存関係にあるJARをMaven Repository からダウンロードします。
https://mvnrepository.com/artifact/org.postgresql/postgresql
spark root folder(spark-2.4.7-bin-hadoop2.7 here) と入力し、demo
という名前の新しいフォルダ を作成し、準備したデータファイル (data.csv) 、PostgreSQLドライバ (postgresql-42.2 .6.jar) 、ログサービスのspark sdk (emr-logservice_2.11-2.2 .0.jar) とその依存関係 (fastjson-1.2 .45.jar、commons-validator-1.4 .0.jar、ezmorf-1.0 .6.jar、loghub-client-lib-0.6 .13.jar、aliyun-log-0.6 .10.jar、json-lib-2.4-jdk 15.jar、zkclient-0.10.jar、emr-common_2.11-2.2 .0.jarなど) を入れます。
Apache spark imageをビルドおよびプッシュ
以下の2つの点について /kubernetes/dockerfiles/spark/Dockerfile
を更新します。
- python 3実行環境を追加します。
- データファイルと依存関係JARを作業ディレクトリに置く
以下はDockerfileのサンプルです。
FROM openjdk:8-jdk-slimARG spark_jars=jarsARG img_path=kubernetes/dockerfilesARG k8s_tests=kubernetes/tests# Before building the docker image, first build and make a Spark distribution following# the instructions in http://spark.apache.org/docs/latest/building-spark.html.# If this docker file is being used in the context of building your images from a Spark# distribution, the docker build command should be invoked from the top level directory# of the Spark distribution. E.g.:# docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .RUN set -ex && \apt-get update && \ln -s /lib /lib64 && \apt install -y bash tini libc6 libpam-modules libnss3 && \mkdir -p /opt/spark && \mkdir -p /opt/spark/work-dir && \touch /opt/spark/RELEASE && \rm /bin/sh && \ln -sv /bin/bash /bin/sh && \echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \rm -rf /var/cache/apt/*COPY ${spark_jars} /opt/spark/jarsCOPY bin /opt/spark/binCOPY sbin /opt/spark/sbinCOPY ${img_path}/spark/entrypoint.sh /opt/COPY examples /opt/spark/examplesCOPY ${k8s_tests} /opt/spark/testsCOPY data /opt/spark/dataCOPY python /opt/spark/pythonENV SPARK_HOME /opt/spark# Add python3 running environmentENV PYTHON_VERSION=3.7.3RUN apt-get install -y \build-essential \libreadline-gplv2-dev \libncursesw5-dev \libssl-dev \libsqlite3-dev \tk-dev \libgdbm-dev \libc6-dev \libbz2-dev \libffi-dev \zlib1g-dev \liblzma-dev \wget \&& apt-get autoremove -y \&& apt-get cleanWORKDIR /usr/srcRUN wget https://www.python.org/ftp/python/$PYTHON_VERSION/Python-$PYTHON_VERSION.tgzRUN tar xzf Python-$PYTHON_VERSION.tgzWORKDIR /usr/src/Python-$PYTHON_VERSIONRUN ./configure --enable-optimizations --with-ensurepip=installRUN make altinstallRUN ln -s /usr/local/bin/python3.7 /usr/local/bin/python3RUN ln -s /usr/local/bin/python3.7 /usr/local/bin/pythonRUN ln -s /usr/local/bin/pip3.7 /usr/local/bin/pip3RUN ln -s /usr/local/bin/pip3.7 /usr/local/bin/pipRUN pip install findsparkWORKDIR /opt/spark/work-dirRUN rm -rf /usr/src/Python-$PYTHON_VERSIONRUN rm -rf /usr/src/Python-$PYTHON_VERSION.tgz# Put data file and dependency JARs into work dirADD demo/aliyun-log-0.6.10.jar aliyun-log-0.6.10.jarADD demo/fastjson-1.2.45.jar fastjson-1.2.45.jarADD demo/loghub-client-lib-0.6.13.jar loghub-client-lib-0.6.13.jarADD demo/ezmorph-1.0.6.jar ezmorph-1.0.6.jarADD demo/emr-common_2.11-2.2.0.jar emr-common_2.11-2.2.0.jarADD demo/emr-logservice_2.11-2.2.0.jar emr-logservice_2.11-2.2.0.jarADD demo/commons-validator-1.4.0.jar commons-validator-1.4.0.jarADD demo/postgresql-42.2.6.jar postgresql-42.2.6.jarADD demo/zkclient-0.10.jar zkclient-0.10.jarADD demo/json-lib-2.4-jdk15.jar json-lib-2.4-jdk15.jarADD demo/data.csv data.csvENTRYPOINT [ "/opt/entrypoint.sh" ]
Docker Hub、 ACR(Azure Container Registry)、 Alibaba Cloud Container Registry などのコンテナイメージレジストリにログインします。ここではDocker Hubを例に説明します。
<spark root dir>
に移動し、関連するコマンドを実行してイメージをビルドし、プッシュします。必要に応じて、イメージのレジストリ名やタグを変更します。
$ docker build -t bwbw723/spark-k8s-hologres-demo:2.4.7 -f ./kubernetes/dockerfiles/spark/Dockerfile .$ docker push bwbw723/spark-k8s-hologres-demo:2.4.7
AKS(Azure Kubernetes Service) clusterの準備
AKS(Azure Kubernetes Service) クラスタの作成は、Azure official help documentの手順に従って、Azure CLIで行うことができます。
Azure CLIの準備ができていない場合は、Install Azure CLIとSign inを参照してください。
https://docs.microsoft.com/en-us/azure/aks/spark-job#create-an-aks-cluster
https://docs.microsoft.com/en-us/cli/azure/install-azure-cli
https://docs.microsoft.com/en-us/cli/azure/authenticate-azure-cli
コマンドは次の通りです。
# Install Azure CLI$ curl -L https://aka.ms/InstallAzureCli | bash# Sign in$ az login# Create a resource group for the cluster$ az group create --name <resource group name> --location <group location># Create a Service Principal for the cluster$ az ad sp create-for-rbac --name <service principal name># Create the AKS cluster with nodes that are of size Standard_DS2_v2$ az aks create --resource-group <above resource group name> --name <aks cluster name> --node-vm-size <node vm size such as Standard_DS2_v2> --generate-ssh-keys --service-principal <APPID generated above> --client-secret <PASSWORD generated above># Get credentials for cluster connection$ az aks get-credentials --resource-group <above resource group name> --name <above aks cluster name>
PySparkでスパークジョブの準備
Sparkは複数のプログラム言語をサポートしており、ここではpythonとPySparkを例にしています。タスクが完了すると、スパークイメージで用意したCSVファイルに含まれる全てのレコードが対象のHologresテーブルに保存されます。
DataFrameReaderのcsv("path")
またはformat("csv").load("path")
を使うと、CSVファイルをPySparkのDataFrameに読み込むことができます。また、JdbcRDDを使えば、特定のドライバを使って簡単にデータベースにDataFrameを書き込むことができます。
この状態ではHologresインスタンスはPostgreSQLとして接続されています。詳しい情報は、spark JDBC data sourceやspark generic file sourceを参照してください。
以下のスクリプトで hologres_spark.py
を作成します。
import findsparkfindspark.init()from pyspark.sql import SparkSessionfrom pyspark.sql.types import StructType, IntegerType, StringTypeif __name__ == '__main__':spark = SparkSession.builder.appName('HologresSparkDemo').getOrCreate()schema = StructType() \.add("order_id", StringType(), True) \.add("user_id", IntegerType(), True) \.add("book_id", IntegerType(), True) \.add("book_name", StringType(), True) \.add("order_cnt", IntegerType(), True) \.add("order_amt", IntegerType(), True)df_with_schema = spark.read.format("csv") \.option("header", True) \.schema(schema) \.load("<your file path in spark image, such as /opt/spark/work-dir/data.csv>")df_with_schema.write.mode("append").format("jdbc") \.option("url", "<JDBC connection url of your instance, e.g. jdbc:postgresql://<endpoint>:<port>/<database name>") \.option("dbtable", "<your target table name>") \.option("user", "<your accessKeyId>") \.option("password", "<your accessKeySecret>") \.option("driver", "org.postgresql.Driver") \.save()
Submit spark job
pythonスクリプトをAzure blob storageに置くには、この手順に従って配置する必要があります。
https://docs.microsoft.com/en-us/azure/aks/spark-job#copy-job-to-storage
コマンドは次の通りです。
# Create storage account$ az storage account create --resource-group <above resource group name> --name <storage account name> --sku Standard_LRS# Export Azure storage connection string$ export AZURE_STORAGE_CONNECTION_STRING=`az storage account show-connection-string --resource-group <above resource group name> --name <above storage account name> -o tsv`# Create storage container$ az storage container create --name <container name># Set container permission as public$ az storage container set-permission --name <above container name> --public-access blob# Upload python script into container$ az storage blob upload --container-name <above container name> --file <local file path> --name <file name in container># Get access url for uploaded python script$ scriptUrl=$(az storage blob url --container-name <above container name> --name <above file name in container> | tr -d '"')
ここではSpark Jobをsubmitする方法を説明します。
kubectl proxy
を実行して、kube-proxyを別のコマンドラインで起動します。
https://docs.microsoft.com/en-us/azure/aks/spark-job#submit-a-spark-job
先ほど前のコマンドラインに戻り、以下のようにサービスアカウントとクラスターを作成します。
$ kubectl create serviceaccount spark$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default
PySparkタスクをAKS (Azure Kubernetes Service) クラスターにSubmit します。必要に応じて、image 情報とFile pathを更新します。
./bin/spark-submit \--master k8s://http://127.0.0.1:8001 \--deploy-mode cluster \--name hologres-spark \--conf spark.executor.instances=3 \--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \--conf spark.kubernetes.container.image.pullPolicy=Always \--conf spark.kubernetes.container.image=bwbw723/spark-k8s-hologres-demo:2.4.7 \--driver-class-path local:///opt/spark/work-dir/postgresql-42.2.6.jar \--jars local:///opt/spark/work-dir/postgresql-42.2.6.jar \<above scriptUrl>
kubectl get pods
で実際のPod情報を確認します。
Hologresテーブルのレコードをチェックします。
##PySparkでspark streaming jobを準備する
Alibaba Cloudが提供するSpark SDK を使用すると、LogServiceでReceiver(受信モード)もしくはDirect(直接モード)でログデータを受け取りすることが出来ます。関連情報およびソースとして Git aliyun-emapreduce-datasources および、Git aliyun-emapreduce-demo にて詳しい説明があります。
Spark SDKを使って、DataStreamReaderはログサービスを loghub
フォーマットとして認識することができました。
Pythonの使い方については、こちらを参照してください。
以下のスクリプトでholres_spark_streaming.py
を作成します。
import findsparkfindspark.init()from pyspark.sql import SparkSessionfrom pyspark.sql.types import StructType, IntegerType, StringTypedef foreach_batch_function(df, epoch_id):df.write.mode("append").format("jdbc") \.option("url", "<public JDBC connection url of your instance, e.g. jdbc:postgresql://<endpoint>:<port>/<database name>") \.option("dbtable", "<your target table name>") \.option("user", "<your accessKeyId>") \.option("password", "<your accessKeySecret>") \.option("driver", "org.postgresql.Driver") \.save()passif __name__ == '__main__':spark = SparkSession.builder.appName('HologresSparkStreamingDemo').getOrCreate()schema = StructType() \.add("order_id", StringType(), True) \.add("user_id", IntegerType(), True) \.add("book_id", IntegerType(), True) \.add("book_name", StringType(), True) \.add("order_cnt", IntegerType(), True) \.add("order_amt", IntegerType(), True)line_with_schema = spark.readStream.format("loghub") \.schema(schema) \.option("sls.project", "<your log service project name>") \.option("sls.store", "<your log service logstore name>") \.option("access.key.id", "<your accessKeyId>") \.option("access.key.secret", "<your accessKeySecret>") \.option("endpoint", "<your endpoint such as ap-northeast-1.log.aliyuncs.com>") \.option("startingoffsets", "latest") \.load()stream = line_with_schema.writeStream.foreachBatch(foreach_batch_function).start()stream.awaitTermination()
データ転送のためにAlibaba Cloud Log Serviceのリソース準備
Alibaba Cloudコンソールに入り、Log ServiceプロジェクトとLogStoreを準備します。LogStoreを作成するときは、WebTrackingが有効になっていることを確認します。
logstore dataを入力し、インデックス属性を有効にします。
Spark streaming jobの実行
まずはHologresのテーブルデータをDELETEらSQLクエリでクリアします。
Azure CLI (az storage blob upload ...
) またはAzure Portalで、事前に作成したSpark streamingスクリプトをAzure blobストレージにアップロードします。
アップロードされたスクリプトのアクセスURLを取得します。
作業環境のコマンドラインに戻り、AKS(Azure Kubernetes Service)クラスタにSpark streaming Taskを投入します。
./bin/spark-submit \--master k8s://http://127.0.0.1:8001 \--deploy-mode cluster \--name hologres-spark-streaming \--conf spark.executor.instances=3 \--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \--conf spark.kubernetes.container.image.pullPolicy=Always \--conf spark.kubernetes.container.image=bwbw723/spark-k8s-hologres-demo:2.4.7 \--driver-class-path local:///opt/spark/work-dir/postgresql-42.2.6.jar \--jars local:///opt/spark/work-dir/postgresql-42.2.6.jar,local:///opt/spark/work-dir/fastjson-1.2.45.jar,local:///opt/spark/work-dir/commons-validator-1.4.0.jar,local:///opt/spark/work-dir/ezmorph-1.0.6.jar,local:///opt/spark/work-dir/emr-logservice_2.11-2.2.0.jar,local:///opt/spark/work-dir/loghub-client-lib-0.6.13.jar,local:///opt/spark/work-dir/aliyun-log-0.6.10.jar,local:///opt/spark/work-dir/json-lib-2.4-jdk15.jar,local:///opt/spark/work-dir/zkclient-0.10.jar,local:///opt/spark/work-dir/emr-common_2.11-2.2.0.jar \<access url of your spark streaming script>
kubectl get pods
で realted pod 情報を確認します。
Spark streaming job の実行中に、以下のスクリプトで新しいLog Serviceのレコードを送信し、関連するログを確認します。
import randomimport requestsimport uuidproject = "<your log service project name>"endpoint = "<your endpoint such as ap-northeast-1.log.aliyuncs.com>"logstore = "<your log service logstore name>"topic = "<your logs topic>"# urlencoded book name in Japanesebook_info = ["%E5%AB%8C%E3%82%8F%E3%82%8C%E3%82%8B%E5%8B%87%E6%B0%97","%E3%83%8E%E3%83%AB%E3%82%A6%E3%82%A7%E3%82%A4%E3%81%AE%E6%A3%AE","%E6%B5%B7%E8%BE%BA%E3%81%AE%E3%82%AB%E3%83%95%E3%82%AB","%E8%89%B2%E5%BD%A9%E3%82%92%E6%8C%81%E3%81%9F%E3%81%AA%E3%81%84%E5%A4%9A%E5%B4%8E%E3%81%A4%E3%81%8F%E3%82%8B%E3%81%A8%E5%BD%BC%E3%81%AE%E5%B7%A1%E7%A4%BC%E3%81%AE%E5%B9%B4","%E5%AE%B9%E7%96%91%E8%80%85%E2%85%A9%E3%81%AE%E7%8C%AE%E8%BA%AB", "%E4%BA%BA%E9%96%93%E5%A4%B1%E6%A0%BC","%E3%81%93%E3%81%93%E3%82%8D", "%E5%A4%A9%E5%A3%B0%E4%BA%BA%E8%AF%AD","%E5%B9%B8%E3%81%9B%E3%81%AB%E3%81%AA%E3%82%8B%E5%8B%87%E6%B0%97"]book_price = [500, 600, 700, 300, 200, 100, 350, 550, 650]url = 'http://{0}.{1}/logstores/{2}/track?APIVersion=0.6.0&__topic__={3}&order_id={4}&user_id={5}&book_id={6}&book_name={7}&order_cnt={8}&order_amt={9}'for i in range(5):book_id = random.randint(1, 9)order_count = random.randint(1, 5)res = requests.get(url.format(project, endpoint, logstore, topic, uuid.uuid1(), random.randint(1, 50), book_id,book_info[book_id - 1], order_count,book_price[book_id - 1] * order_count))print(res)
今度はHologresでレコードを確認します。
最後に
ここまで、Azureのk8s(AKS)にApache Sparkを配置し、Hologresへデータ連携する方法を紹介しました。
この方法を生かすことで、Alibaba Cloudでない、他社クラウドサービスからApache Spark on k8sを使ってk8sのサービス基盤からリアルタイムでHologresへ連携、Hologresでリアルタイム可視化を実現することができます。