编译并运行Flink应用
在程序代码完成开发后,建议您上传至Linux客户端环境中运行应用。使用Scala或Java语言开发的应用程序在Flink客户端的运行步骤是一样的。
基于YARN集群的Flink应用程序不支持在Windows环境下运行,只支持在Linux环境下运行。
操作步骤
- 在IntelliJ IDEA中,在生成Jar包之前配置工程的Artifacts信息。
- 在IDEA主页面,选择“File > Project Structures...”进入“Project Structure”页面。
- 在“Project Structure”页面,选择“Artifacts”,单击“+”并选择“JAR > Empty”。
图1 添加Artifacts
- 您可以根据实际情况设置Jar包的名称、类型以及输出路径。
图2 设置基本信息
- 选中“'FlinkStreamJavaExample' compile output”,右键选择“Put into Output Root”。然后单击“Apply”。
图3 Put into Output Root
- 最后单击“OK”完成配置。
- 生成Jar包。
- 在IDEA主页面,选择“Build > Build Artifacts...”。
图4 Build Artifacts
- 在弹出的菜单中,选择“FlinkStreamJavaExample > Build”开始生成Jar包。
图5 Build
- 当Event log中出现如下类似日志时,表示Jar包生成成功。您可以从1.c中配置的路径下获取到Jar包。
21:25:43 Compilation completed successfully in 36 sec
- 在IDEA主页面,选择“Build > Build Artifacts...”。
- 将2中生成的Jar包(如FlinkStreamJavaExample.jar)拷贝到Linux环境的Flink运行环境下(即Flink客户端),如“/opt/Flink_test”。运行Flink应用程序。
在Linux环境中运行Flink应用程序,需要先启动Flink集群。在Flink客户端下执行yarn session命令,启动Flink集群。执行命令例如:
bin/yarn-session.sh -n 3 -jm 1024 -tm 1024
在Flink任务运行过程中禁止重启HDFS服务或者重启所有DataNode实例,否则可能会导致任务失败,并可能导致应用部分临时数据无法清空。
- 运行DataStream样例程序(Scala和Java语言)。
在终端另开一个窗口,进入Flink客户端目录,调用bin/flink run脚本运行代码,例如:
bin/flink run --class com.huawei.flink.example.stream.FlinkStreamJavaExample /opt/Flink_test/flink-examples-1.0.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2 bin/flink run --class com.huawei.flink.example.stream.FlinkStreamScalaExample /opt/Flink_test/flink-examples-1.0.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
表1 参数说明 参数名称
说明
<filePath>
指本地文件系统中文件路径,每个节点都需要放一份/opt/log1.txt和/opt/log2.txt并使用chmod 755 文件名命令为用户赋予读、写、执行权限,而属组用户和其他用户只有读、执行权限。可以默认,也可以自行设置。
<windowTime>
指窗口时间大小,以分钟为单位。可以默认,也可以自行设置。
- 运行向Kafka生产并消费数据样例程序(Scala和Java语言)。
bin/flink run --class com.huawei.flink.example.kafka.WriteIntoKafka /opt/Flink_test/flink-examples-1.0.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [kerberos.domain.name] [ssl.truststore.location] [ssl.truststore.password]
执行命令启动程序消费数据。命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。
bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [kerberos.domain.name] [ssl.truststore.location] [ssl.truststore.password]
表2 参数说明 参数名称
说明
是否必须配置
topic
表示kafka主题名。
是
bootstrap.server
表示broker集群ip/port列表。
是
security.protocol
运行参数可以配置为PLAINTEXT(可不配置)/SASL_PLAINTEXT/SSL/SASL_SSL四种协议,分别对应MRS Kafka集群的21005/21007/21008/21009端口。
- 如果配置了SASL,则必须配置sasl.kerberos.service.name为kafka,并在conf/flink-conf.yaml中配置security.kerberos.login相关配置项。
- 如果配置了SSL,则必须配置ssl.truststore.location和ssl.truststore.password,前者表示truststore的位置,后者表示truststore密码。
否
说明:该参数未配置时为非安全Kafka。
kerberos.domain.name
Kafka Domain名称。
否
说明:security.protocol配置了SASL时必须配置。
四种类型实际命令示例,以ReadFromKafka为例如下:
bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar --topic topic1 --bootstrap.servers 10.96.101.32:21005
bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar --topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.com
bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar --topic topic1 --bootstrap.servers 10.96.101.32:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password xxx
bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar --topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.com --ssl.truststore.location /home/truststore.jks --ssl.truststore.password xxx
- 运行异步Checkpoint机制样例程序(Scala和Java语言)。
为了丰富样例代码,Java版本使用了Processing Time作为数据流的时间戳,而Scala版本使用Event Time作为数据流的时间戳。具体执行命令参考如下:
- 将Checkpoint的快照信息保存到HDFS。
- Java
1
bin/flink run --class com.huawei.flink.example.checkpoint.FlinkProcessingTimeAPIChkMain /opt/Flink_test/flink-examples-1.0.jar --chkPath hdfs://hacluster/flink-checkpoint/
- Scala
1
bin/flink run --class com.huawei.flink.example.checkpoint.FlinkEventTimeAPIChkMain /opt/Flink_test/flink-examples-1.0.jar --chkPath hdfs://hacluster/flink-checkpoint/
- Java
- 将Checkpoint的快照信息保存到本地文件。
- Java
1
bin/flink run --class com.huawei.flink.example.checkpoint.FlinkProcessingTimeAPIChkMain /opt/Flink_test/flink-examples-1.0.jar --chkPath file:///home/zzz/flink-checkpoint/
- Scala
1 2
bin/flink run --class com.huawei.flink.example.checkpoint.FlinkEventTimeAPIChkMain /opt/Flink_test/flink-examples-1.0.jar --chkPath file:///home/zzz/flink-checkpoint/
- Checkpoint源文件路径:flink/checkpoint/checkpoint/fd5f5b3d08628d83038a30302b611/chk-X/4f854bf4-ea54-4595-a9d9-9b9080779ffe
其中,flink/checkpoint/checkpoint表示指定的根目录。
fd5f5b3d08628d83038a30302b611表示以jobID命名的第二次目录。
chk-X中"X"为checkpoint编号,第三层目录。
4f854bf4-ea54-4595-a9d9-9b9080779ffe表示checkpoint源文件。
- Flink在集群模式下checkpoint将文件放到HDFS。
- Java
- 将Checkpoint的快照信息保存到HDFS。
- 运行Stream SQL Join样例程序
- 启动程序向Kafka生产。Kafka配置可参考运行向Kafka生产并消费数据样例程序(Java...
1
bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin /opt/Flink_test/flink-examples-1.0.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:21005
- 在集群内任一节点启动netcat命令,等待应用程序连接。
1
netcat -l -p 9000
- 启动程序接受Socket数据,并执行联合查询。
1
bin/flink run --class com.huawei.flink.example.sqljoin.SqlJoinWithSocket /opt/Flink_test/flink-examples-1.0.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:21005 --hostname xxx.xxx.xxx.xxx --port 9000
- 启动程序向Kafka生产。Kafka配置可参考运行向Kafka生产并消费数据样例程序(Java...
- 运行DataStream样例程序(Scala和Java语言)。