Apache Flink on k8sからHologresへデータ連携方法
本記事では、Hologres を使って、AWSのKubernetesサービスであるECSからApache Flinkによるマルチクラウドーリアルタイムデータ連携する方法をご紹介します。
Hologresとは
Hologres はリアルタイムのインタラクティブ分析サービスです。高い同時実行性と低いレイテンシーでTB、PBクラスのデータの移動や分析を短時間で処理できます。PostgreSQL11と互換性があり、データを多次元で分析し、ビジネスインサイトを素早くキャッチすることができます。
少し前になりますが、Hologresについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。
Apache Flinkとは
Apache Flink は、大規模データを分散ストリームおよびバッチデータ処理のためのオープンソースプラットフォームです。Apache Sparkと同様、複数のプログラム言語でハイレベルなAPIを提供しており、ビッグデータソリューションのストリーミング分野で人気があります。
Kubernetes(k8s)とは
Kubernetes は、K8sとも呼ばれ、コンテナ化されたアプリケーションのデプロイ、スケーリング、管理を自動化するためのオープンソースのシステムで、アプリケーションを構成するコンテナを論理的な単位にまとめ、管理と発見を容易にします。
Kubernetes上でApache Flinkを使ってHologresでデータ処理をする
このガイドラインでは、Kubernetes上のApache Flinkを使って、Hologresによるデータ処理を段階的に作成します。Flinkの公式イメージはACK(Alibaba Cloud Container Service for Kubernetes)にデプロイされます。
このチュートリアルについて
対象者:
本ガイドラインは、以下のような方を対象としています
- git、docker、Kubernetes,、Alibaba Cloud、Hologres,、ACK(Alibaba Cloud Container Service for Kubernetes)に関する基本的な知識を持っている
- 基本的なデプロイメントの知識があり、Java実行ができる
前提条件:
- Alibaba Cloudのアカウントを所持している
- Alibaba Cloud HologresとACK(Alibaba Cloud Container Service for Kubernetes)が使用可能な状態になっている
- 少なくとも1つのHologresインスタンスを持っている
- 作業環境にmaven、Java、gitが用意されている
Kubernetesクラスタの準備
ACK(Alibaba Cloud Container Service for Kubernetes)にアクセスし、Flinkデプロイ用のサーバーレスKubernetesクラスタを作成します。
新しいサーバーレスKubernetesクラスターの作成には、約3分ほどかかります。作成ログを確認しながら、クラスターの準備が整うまで待機します。
スタンドアロンのFlinkクラスターをKubernetes上にデプロイ
Flink Session clusterは、長時間実行されるKubernetes Deploymentとして実行されます。1つのSessionクラスター上で複数のFlinkジョブを実行することができます。各ジョブはクラスターがデプロイされた後にクラスターにサブミットする必要があります。
KubernetesにおけるFlink Sessionクラスターのデプロイには、少なくとも3つのコンポーネントがあります。
- JobManagerを実行するデプロイメント
- TaskManagersのプールのためのデプロイメント
- JobManagerのRESTおよびUIポートを公開するサービス
以下のステップでは、これらを一つずつ準備していきます。より詳細な情報については、Flink official document を参照してください。yaml設定ファイルの記載方法は、Flink official example configuration yamlにあります。
ConfigMap の新規作成
Kubernetes クラスタ管理画面に移動し、ConfigMap メニューから以下の yaml をベースにした ConfigMap を新規作成します。
apiVersion: v1kind: ConfigMapmetadata:name: flink-configlabels:app: flinkdata:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122queryable-state.proxy.ports: 6125jobmanager.memory.process.size: 1600mtaskmanager.memory.process.size: 1728mparallelism.default: 2log4j-console.properties: |+# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO# Log all infos to the consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# Log all infos in the given rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF
job managerのデプロイ
Kubernetesのクラスタ管理画面に移動し、Deploymentメニューから以下のyamlに基づいてjob managerとして新しいDeploymentを作成します。
apiVersion: apps/v1kind: Deploymentmetadata:name: flink-jobmanagerspec:replicas: 1selector:matchLabels:app: flinkcomponent: jobmanagertemplate:metadata:labels:app: flinkcomponent: jobmanagerspec:containers:- name: jobmanagerimage: apache/flink:1.13.0-scala_2.11args: ["jobmanager"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/confsecurityContext:runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties
task managerのデプロイ
同じプロセスの下で、以下の yaml に基づいてTask managerとして新しいDeploymentを作成します。
apiVersion: apps/v1kind: Deploymentmetadata:name: flink-taskmanagerspec:replicas: 2selector:matchLabels:app: flinkcomponent: taskmanagertemplate:metadata:labels:app: flinkcomponent: taskmanagerspec:containers:- name: taskmanagerimage: apache/flink:1.13.0-scala_2.11args: ["taskmanager"]ports:- containerPort: 6122name: rpc- containerPort: 6125name: query-statelivenessProbe:tcpSocket:port: 6122initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/conf/securityContext:runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties
これにより、Kubernetesクラスタ上にFlink job managerとtask managerがデプロイされました。これらを元に、関連するサービスを作成する必要があります。
job manager UI サービスの作成
Kubernetes クラスタ管理画面の「サービス」メニューから、以下の yaml に基づいて、job manager UI サービスを作成します。これは非HAモードの場合のみ必要で、job manager のWeb UIポートを公開します。
apiVersion: v1kind: Servicemetadata:name: flink-jobmanagerspec:type: ClusterIPports:- name: rpcport: 6123- name: blob-serverport: 6124- name: webuiport: 8081selector:app: flinkcomponent: jobmanager
job manager restサービスの作成
同じプロセスの下で、以下の yaml に基づいて job manager rest サービスとして新しいサービスを作成します。 job manager用の rest ポートを公開します。
apiVersion: v1kind: Servicemetadata:name: flink-jobmanager-restspec:type: NodePortports:- name: restport: 8081targetPort: 8081nodePort: 30081selector:app: flinkcomponent: jobmanager
task manager queryステートサービスの作成
同じプロセスの下で、以下のyamlに基づいて、task manager queryステートサービスとして新しいサービスを作成します。照会可能な状態にアクセスするためのtask managerのポートを公開します。
apiVersion: v1kind: Servicemetadata:name: flink-taskmanager-query-statespec:type: NodePortports:- name: query-stateport: 6125targetPort: 6125nodePort: 30025selector:app: flinkcomponent: taskmanager
オプション:job managerの公開用アクセスUIポートの設定
インターネット経由でサービスにアクセスする場合は、 job managerのUIポートで公開用の別のサービスが必要です。イントラネット経由でジョブマネージャUIサービスにアクセスする場合、このステップは無視してください。
サーバー管理ページの作成ボタンをクリックし、SLB (Server Load Balancer) を利用した公開用サービスを構築します。
データを生成してHologresに送信するタスクを準備
Alibaba Cloudでは、Flink用のHologresコネクタが提供されており、Github hologres-flink-examplesでサンプルコードも提供されています。このサンプルコードを使って、データを生成し、Hologresに送信するタスクを作成します。
まず、コマンドラインで git clone https://github.com/hologres/hologres-flink-examples.git
でプロジェクトをクローンします。
サンプルコードではユーザの入力からHologresの接続情報を取得していますが、Kubernetesクラスタでは環境変数を使用する必要があります。
<project root>/src/main/java/io/hologres/flink/example/HologresSinkExample.java
を以下のように更新します。
......public static void main(String[] args) throws Exception {/*Options options = new Options();options.addOption("e", "endpoint", true, "Hologres endpoint");options.addOption("u", "username", true, "Username");options.addOption("p", "password", true, "Password");options.addOption("d", "database", true, "Database");options.addOption("t", "tablename", true, "Table name");CommandLineParser parser = new DefaultParser();CommandLine commandLine = parser.parse(options, args);String endPoint = commandLine.getOptionValue("endpoint");String userName = commandLine.getOptionValue("username");String password = commandLine.getOptionValue("password");String database = commandLine.getOptionValue("database");String tableName = commandLine.getOptionValue("tablename");*/String database = System.getenv("HOLO_TEST_DB");String userName = System.getenv("HOLO_ACCESS_ID");String password = System.getenv("HOLO_ACCESS_KEY");String endPoint = System.getenv("HOLO_ENDPOINT");String tableName = System.getenv("HOLO_TABLE_NAME");......
ビルドプロセス中にエラーメッセージが表示された場合は、pom.xml内のholo-client
のバージョンを更新します。正しいバージョンは、maven repository から入手できます。
https://mvnrepository.com/artifact/com.alibaba.hologres/holo-client
ビルドが完了すると、
<project root>/target/hologress-flink-examples-1.0.0-jar-with-dependencies
が入手出来ますので、これを利用します。
Kubernetes 上の Flink クラスタにタスクを投入しテスト
Hologresテーブルの作成
ソースコードによれば、生成されるテストデータは以下のスキーマ構成になっています。そのため、送信データを保存するには、同じスキーマ配下に新しいHologresテーブルを作成する必要があります。
......TableSchema schema = TableSchema.builder().field("user_id", DataTypes.BIGINT()).field("user_name", DataTypes.STRING()).field("item_id", DataTypes.BIGINT()).field("item_name", DataTypes.STRING()).field("price", DataTypes.DECIMAL(38, 2)).field("province", DataTypes.STRING()).field("city", DataTypes.STRING()).field("longitude", DataTypes.STRING()).field("latitude", DataTypes.STRING()).field("ip", DataTypes.STRING()).field("sale_timestamp", Types.SQL_TIMESTAMP).build();......
Hologres接続情報のための新しいSecretsの作成
更新されたソースコードによると、スクリプトは環境変数からHologres接続情報を取得します。そのため、新しいSecretsを作成し、Flinkクラスタに設定する必要があります。
Hologres 接続情報のための有効な環境変数
job manager、task managerともに編集ボタンをクリックし、作成したSecretsに基づいて環境変数を追加します。Variable Key
が更新されたソースコードと一致することを確認します。
job manager、task managerの両方を再デプロイして、更新された環境変数をアクティベーションします。
UIサービスからのタスク投入
外部のエンドポイントからjob managerのUIサービスにアクセスし、ビルドしたJARを使ってタスクを投入します。
新しいタスクが作成され、生成されたデータが対象のHologresテーブルに送信されます(running
ステータスになります)
実行中のタスクを停止したい場合は、「cancel job」リンクをクリックします。
最後に
ここまで、Apache FlinkからHologresへリアルタイムデータ連携する方法を紹介しました。
この方法を生かすことで、例えばAWSやGCPなどのKubernetes(k8s)を使ったサービス基盤からApache FlinkでリアルタイムでHologresへ連携、Hologresでリアルタイム可視化を実現することができます。