编译并调测Flink应用
操作场景
在程序代码完成开发后,编译jar包并上传至Linux客户端环境中运行应用。使用Scala或Java语言开发的应用程序在Flink客户端的运行步骤是相同的。
基于YARN集群的Flink应用程序不支持在Windows环境下运行,只支持在Linux环境下运行。
操作步骤
- 在IntelliJ IDEA中,单击IDEA右边Maven窗口的“Reload All Maven Projects”,进行Maven项目依赖import。
图1 Reload projects
- 编译运行程序。
编译方式有以下两种:
- 方法一:
- 选择“Maven > 样例工程名称 > Lifecycle > clean”,双击“clean”运行maven的clean命令。
- 选择“Maven > 样例工程名称 > Lifecycle > install”,双击“install”运行maven的install命令。
图2 maven工具clean和install
- 方法二:在IDEA的下方Terminal窗口进入“pom.xml”所在目录,手动输入mvn clean install命令进行编译。
图3 idea terminal输入"mvn clean install"
- 方法一:
- 编译完成,打印“BUILD SUCCESS”,生成target目录,获取target目录下的jar包。
图4 编译完成
- 将3中生成的Jar包(如FlinkStreamJavaExample.jar复制到Flink客户端节点相关目录下,例如“/opt/hadoopclient”。然后在该目录下创建“conf”目录,将需要的配置文件复制至“conf”目录,具体操作请参考准备运行环境,运行Flink应用程序。
在Linux环境中运行Flink应用程序,需要先启动Flink集群。在Flink客户端下通过yarn session命令启动flink集群。
- 执行yarn-session.sh之前,应预先将Flink应用程序的运行依赖包复制到客户端目录“#{客户端安装目录}/Flink/flink/lib”下,应用程序运行依赖包请参考样例工程运行依赖包参考信息。
- 在Flink任务运行过程中禁止重启HDFS服务或者重启所有DataNode实例,否则可能会导致任务失败,并可能导致应用部分临时数据无法清空。
- 示例中的“ssl/”是Flink客户端目录下自定义的子目录,用来存放SSL keystore、truststore相关配置文件。
- MRS 3.2.1及以后版本使用-tm指定taskmanager内存不能小于4096MB。
例如:
cd /opt/hadoopclient/Flink/flink
bin/yarn-session.sh -jm 1024 -tm 4096 -t conf/ssl/
- 运行DataStream(Scala和Java)样例程序。
在终端另开一个窗口,进入Flink客户端目录,调用bin/flink run脚本运行代码。
- Java
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/hadoopclient/FlinkStreamJavaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
- Scala
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/hadoopclient/FlinkStreamScalaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
“log1.txt”、“log2.txt”放置在本地时,需放在每个部署了Yarn NodeManager实例的节点上,权限为755。
表1 参数说明 参数名称
说明
<filePath>
指本地文件系统中文件路径,每个节点都需要放一份/opt/log1.txt和/opt/log2.txt。可以默认,也可以设置。
<windowTime>
指窗口时间大小,以分钟为单位。可以默认,也可以设置。
- Java
- 运行向Kafka生产并消费数据样例程序(Scala和Java语言)。
bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/hadoopclient/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password] [kerberos.domain.name]
消费数据的执行命令启动程序。bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/hadoopclient/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password]
表2 参数说明 参数名称
说明
是否必须配置
topic
表示Kafka主题名。
是
bootstrap.server
表示broker集群ip/port列表。
是
security.protocol
运行参数可以配置为PLAINTEXT(可不配置)/SASL_PLAINTEXT/SSL/SASL_SSL四种协议,分别对应FusionInsight Kafka集群的21005/21007/21008/21009端口。
- 如果配置了SASL,则必须配置sasl.kerberos.service.name为kafka,配置kerberos.domain.name为hadoop.系统域名,并在conf/flink-conf.yaml中配置security.kerberos.login相关配置项。
说明:
登录FusionInsight Manager页面,选择“系统 > 权限 > 域和互信 > 本端域”,即可查看系统域名,系统域名所有字母需转换为小写。
- 如果配置了SSL,则必须配置ssl.truststore.location和ssl.truststore.password,前者表示truststore的位置,后者表示truststore密码。
否
说明:- 该参数未配置时为非安全Kafka。
- 如果需要配置SSL,truststore.jks文件生成方式可参考“Kafka开发指南 > 客户端SSL加密功能使用说明”章节。
- 执行本样例工程,需配置“allow.everyone.if.no.acl.found”为“true”,详情请参考配置对接Kafka。
- Kafka应用需要添加如下所示的jar文件:
- Flink服务端安装路径的lib目录下“flink-dist_*.jar”。
- Flink服务端安装路径的opt目录下的“flink-connector-kafka_*.jar”。
- Kafka客户端或Kafka服务端安装路径中的lib目录下“kafka-clients-*.jar”。
- truststore.jks配置若配置为绝对路径,应将该truststore.jks文件放置于每一个Yarn nodemanager指定目录;若配置为相对路径,应在yarn-session.sh脚本执行时添加上传目录,例如在执行▪命令4:之前,应首先执行yarn-session.sh -t config/ ***。
四种类型实际命令示例,以ReadFromKafka为例,集群域名为“HADOOP.COM”:
- 命令1:
bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/hadoopclient/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:9092
- 命令2:
bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/hadoopclient/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.com
- 命令3:
bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/hadoopclient/conf/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password xxx
- 命令4:
bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/hadoopclient/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location config/truststore.jks --ssl.truststore.password xxx --kerberos.domain.name hadoop.hadoop.com
- 如果配置了SASL,则必须配置sasl.kerberos.service.name为kafka,配置kerberos.domain.name为hadoop.系统域名,并在conf/flink-conf.yaml中配置security.kerberos.login相关配置项。
- 运行异步Checkpoint机制样例程序(Scala和Java语言)。
为了丰富样例代码,Java版本使用了Processing Time作为数据流的时间戳,而Scala版本使用Event Time作为数据流的时间戳。具体执行命令参考如下:
- 将Checkpoint的快照信息保存到HDFS。
- Java
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkProcessingTimeAPIMain /opt/hadoopclient/FlinkCheckpointJavaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
- Scala
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkEventTimeAPIChkMain /opt/hadoopclient/conf/FlinkCheckpointScalaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
- Java
- 将Checkpoint的快照信息保存到本地文件。
- Java
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkProcessingTimeAPIMain /opt/hadoopclient/FlinkCheckpointJavaExample.jar --chkPath file:///home/zzz/flink-checkpoint/
- Scala
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkEventTimeAPIChkMain /opt/hadoopclient/FlinkCheckpointScalaExample.jar --ckkPath file:///home/zzz/flink-checkpoint/
- Java
- Checkpoint源文件路径:flink/checkpoint/fd5f5b3d08628d83038a30302b611/chk-X/4f854bf4-ea54-4595-a9d9-9b9080779ffe
fd5f5b3d08628d83038a30302b611 //以jobID命名的第二层目录。
chk-X // "X"为checkpoint编号,第三层目录。
4f854bf4-ea54-4595-a9d9-9b9080779ffe //checkpoint源文件。
- Flink在集群模式下checkpoint将文件放到HDFS,本地路径只支持Flink的local模式,便于调测。
- 将Checkpoint的快照信息保存到HDFS。
- 运行Pipeline样例程序。
- Java
- 启动发布者Job
bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipeline_NettySink /opt/hadoopclient/FlinkPipelineJavaExample.jar
- 启动订阅者Job1
bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 /opt/hadoopclient/FlinkPipelineJavaExample.jar
- 启动订阅者Job2
bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 /opt/hadoopclient/FlinkPipelineJavaExample.jar
- 启动发布者Job
- Scala
- 启动发布者Job
bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipeline_NettySink /opt/hadoopclient/FlinkPipelineScalaExample.jar
- 启动订阅者Job1
bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 /opt/hadoopclient/FlinkPipelineScalaExample.jar
- 启动订阅者Job2
bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 /opt/hadoopclient/FlinkPipelineScalaExample.jar
- 启动发布者Job
- Java
- 运行Stream SQL Join样例程序
- Java
- 启动程序向Kafka生产。Kafka配置可参考运行向Kafka生产并消费数据样例程序(Scal...
bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/hadoopclient/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092
- 在集群内任一节点启动netcat命令,等待应用程序连接。
netcat -l -p 9000
- 启动程序接受Socket数据,并执行联合查询。
bin/flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/hadoopclient/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port 9000
- 启动程序向Kafka生产。Kafka配置可参考运行向Kafka生产并消费数据样例程序(Scal...
- Scala(适用于MRS 3.3.0及以后版本)
- 启动程序向Kafka生产。Kafka配置可参考运行向Kafka生产并消费数据样例程序(Scal...。
bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/FlinkStreamSqlJoinScalaExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092
- 在集群内任一节点启动netcat命令,等待应用程序连接。
netcat -l -p 9000
- 启动程序接受Socket数据,并执行联合查询。
bin/flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/client/FlinkStreamSqlJoinScalaExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port 9000
- 启动程序向Kafka生产。Kafka配置可参考运行向Kafka生产并消费数据样例程序(Scal...。
- Java
- 运行Flink HBase样例程序(MRS 3.2.0及以后版本)
- yarn-session方式
- 启动Flink集群。
./bin/yarn-session.sh -t config -jm 1024 -tm 1024
- 运行Flink程序,并输入参数。
bin/flink run --class com.huawei.bigdata.flink.examples.WriteHBase /opt/client1/Flink/flink/FlinkHBaseJavaExample-xxx.jar --tableName xxx --confDir xxx bin/flink run --class com.huawei.bigdata.flink.examples.ReadHBase /opt/client1/Flink/flink/FlinkHBaseJavaExample-xxx.jar --tableName xxx --confDir xxx
- 启动Flink集群。
- yarn-cluster方式
bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.WriteHBase /opt/client1/Flink/flink/FlinkHBaseJavaExample-xxx.jar --tableName xxx --confDir xxx bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.ReadHBase /opt/client1/Flink/flink/FlinkHBaseJavaExample-xxx.jar --tableName xxx --confDir xxx
- yarn-session方式
- 运行Flink Hudi样例程序(MRS 3.2.1及以后版本)
- yarn-session方式
- 启动Flink集群。
./bin/yarn-session.sh -t config -jm 1024 -tm 4096
- 运行Flink程序,并输入参数。
./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoHudi /opt/test.jar --hudiTableName hudiSinkTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable ./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromHudi /opt/test.jar --hudiTableName hudiSourceTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable --read.start-commit xxx
- 启动Flink集群。
- yarn-cluster方式
./bin/flink run -m yarn-cluster -yt config --class com.huawei.bigdata.flink.examples.WriteIntoHudi /opt/test.jar --hudiTableName hudiSinkTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable ./bin/flink run -m yarn-cluster -yt config --class com.huawei.bigdata.flink.examples.ReadFromHudi /opt/test.jar --hudiTableName hudiSourceTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable --read.start-commit xxx
- yarn-session方式
- 运行RestAPI创建租户样例程序,以TestCreateTenants程序为例。
- yarn-session方式
- 启动Flink集群。
./bin/yarn-session.sh -t config -jm 1024 -tm 4096
- 运行Flink程序,并输入参数。
./bin/flink run --class com.huawei.bigdata.flink.examples.TestCreateTenants /opt/client/FlinkRESTAPIJavaExample-xxx.jar --hostName xx-xx-xx-xx --keytab xx/xx/user.keytab --krb5 xx/xx/krb5.conf --principal username
- 启动Flink集群。
- yarn-cluster方式
./bin/flink run -m yarn-cluster -yt config -yjm 1024 -ytm 4096 --class com.huawei.bigdata.flink.examples.TestCreateTenants /opt//opt/client/FlinkRESTAPIJavaExample-xxx.jar --hostName xx-xx-xx-xx --keytab xx/xx/user.keytab --krb5 xx/xx/krb5.conf --principal username
- yarn-session方式
- 运行Flink Jar作业提交SQL任务(适用于MRS 3.2.1及以后版本)
- yarn-session方式
- 启动Flink集群。
./bin/yarn-session.sh -t config -jm 1024 -tm 4096
- 运行Flink程序,并输入参数。
bin/flink run -d --class com.huawei.mrs.FlinkSQLExecutor /opt/flink-sql-xxx.jar --sql ./sql/datagen2kafka.sql
- 启动Flink集群。
- yarn-cluster方式
bin/flink run -m yarn-cluster -yt config -yjm 1024 -ytm 4096 -d --class com.huawei.mrs.FlinkSQLExecutor /opt/flink-sql-xxx.jar --sql ./sql/datagen2kafka.sql
- yarn-session方式
针对Flink提供的几个样例工程,其对应的运行依赖包请参考样例工程运行依赖包参考信息。
如果Flink开启了HA(MRS 3.2.0及以后版本),则RestAPI创建租户样例的“--hostName”参数值为FlinkServer的浮动IP。