快速开发Flink应用
Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。
Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。
Flink整个系统包含三个部分:
- Client
- TaskManager
- JobManager
Flink系统的管理节点,管理所有的TaskManager,并决策用户任务在哪些Taskmanager执行。JobManager在HA模式下可以有多个,但只有一个主JobManager。
MRS对外提供了多个Flink组件的应用开发样例工程,本实践用于指导您创建MRS集群后,获取并导入样例工程并在本地进行编译调测,用于实现Flink DataStream程序处理数据。
创建MRS Flink集群
- 购买一个包含有Hive组件的MRS集群,详情请参见购买自定义集群。
本文以购买的MRS 3.2.0-LTS.1版本的集群为例,组件包含Hadoop、Flink组件,集群开启Kerberos认证。
- 单击“立即购买”,等待MRS集群创建成功。
准备集群配置文件
- 集群创建成功后,登录FusionInsight Manager创建用于提交Flink作业的集群用户。
选择“系统 > 权限 > 用户 > 添加用户”,在新增用户界面创建一个机机用户,例如flinkuser。
“用户组”需加入“supergroup”用户组,并关联“System_administrator”角色。
- 选择“user.keytab”文件与“krb5.conf”文件。 ,在用户名为“flinkuser”的操作列选择“更多 > 下载认证凭据”下载认证凭据文件,保存后解压得到该用户的
- 选择“集群 > 概览 > 更多 > 下载客户端”,“选择客户端类型”设置为“仅配置文件”,单击“确定”,等待客户端文件包生成后根据浏览器提示下载客户端到本地并解压。
例如,客户端配置文件压缩包为“FusionInsight_Cluster_1_Services_Client.tar”,解压后得到“FusionInsight_Cluster_1_Services_ClientConfig_ConfigFiles.tar”,继续解压该文件。
进入客户端配置文件解压路径“FusionInsight_Cluster_1_Services_ClientConfig_ConfigFiles\Flink\config”,获取相关配置文件。
获取样例工程
- 通过开源镜像站获取样例工程。
下载样例工程的Maven工程源码和配置文件,并在本地配置好相关开发工具,可参考通过开源镜像站获取样例工程。
根据集群版本选择对应的分支,下载并获取MRS相关样例工程。
例如本章节场景对应示例为“FlinkStreamJavaExample”样例,获取地址:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0.1/src/flink-examples/flink-examples-security/FlinkStreamJavaExample。
- 本地使用IDEA工具导入样例工程,等待Maven工程下载相关依赖包,具体操作可参考配置并导入样例工程。
图1 Flink样例工程示例
本地配置好Maven及SDK相关参数后,样例工程会自动加载相关依赖包。
- 在本示例中,将开发的DataStream程序通过Flink客户端提交运行,因此在代码中不需单独进行安全认证。
假定用户有某个网站周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Flink的DataStream应用程序实现实时统计总计网购时间超过2个小时的女性网民信息。
源数据内容如下,日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。- log1.txt:周六网民停留日志。
LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
log2.txt:周日网民停留日志。
LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
开发思路为:
- 读取文本数据,生成相应DataStream,解析数据生成UserRecord信息。
- 筛选女性网民上网时间数据信息。
- 按照姓名、性别进行keyby操作,并汇总在一个时间窗口内每个女性上网时间。
- 筛选连续上网时间超过阈值的用户,并获取结果。
public class FlinkStreamJavaExample { public static void main(String[] args) throws Exception { // 打印出执行flink run的参考命令 System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2"); System.out.println("******************************************************************************************"); System.out.println("<filePath> is for text file to read data, use comma to separate"); System.out.println("<windowTime> is the width of the window, time as minutes"); System.out.println("******************************************************************************************"); // 读取文本路径信息,并使用逗号分隔,如果源文件在HDFS中,可配置为详细HDFS路径,例如“hdfs://hacluster/tmp/log1.txt,hdfs://hacluster/tmp/log2.txt” final String[] filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(","); assert filePaths.length > 0; // windowTime设置窗口时间大小,默认2分钟一个窗口足够读取文本内的所有数据了 final int windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2); // 构造执行环境,使用eventTime处理窗口数据 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); // 读取文本数据流 DataStream<String> unionStream = env.readTextFile(filePaths[0]); if (filePaths.length > 1) { for (int i = 1; i < filePaths.length; i++) { unionStream = unionStream.union(env.readTextFile(filePaths[i])); } } // 数据转换,构造整个数据处理的逻辑,计算并得出结果打印出来 unionStream.map(new MapFunction<String, UserRecord>() { @Override public UserRecord map(String value) throws Exception { return getRecord(value); } }).assignTimestampsAndWatermarks( new Record2TimestampExtractor() ).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.sexy.equals("female"); } }).keyBy( new UserRecordSelector() ).window( TumblingEventTimeWindows.of(Time.minutes(windowTime)) ).reduce(new ReduceFunction<UserRecord>() { @Override public UserRecord reduce(UserRecord value1, UserRecord value2) throws Exception { value1.shoppingTime += value2.shoppingTime; return value1; } }).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.shoppingTime > 120; } }).print(); // 调用execute触发执行 env.execute("FemaleInfoCollectionPrint java"); } // 构造keyBy的关键字作为分组依据 private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> { @Override public Tuple2<String, String> getKey(UserRecord value) throws Exception { return Tuple2.of(value.name, value.sexy); } } // 解析文本行数据,构造UserRecord数据结构 private static UserRecord getRecord(String line) { String[] elems = line.split(","); assert elems.length == 3; return new UserRecord(elems[0], elems[1], Integer.parseInt(elems[2])); } // UserRecord数据结构的定义,并重写了toString打印方法 public static class UserRecord { private String name; private String sexy; private int shoppingTime; public UserRecord(String n, String s, int t) { name = n; sexy = s; shoppingTime = t; } public String toString() { return "name: " + name + " sexy: " + sexy + " shoppingTime: " + shoppingTime; } } // 构造继承AssignerWithPunctuatedWatermarks的类,用于设置eventTime以及waterMark private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> { // add tag in the data of datastream elements @Override public long extractTimestamp(UserRecord element, long previousTimestamp) { return System.currentTimeMillis(); } // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready @Override public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) { return new Watermark(extractedTimestamp - 1); } } }
- log1.txt:周六网民停留日志。
编译并运行程序
- 在IntelliJ IDEA中,配置工程的Artifacts信息。
- 生成Jar包。
- 在IDEA主页面,选择 。
- 在弹出的菜单中,选择图5 Build
开始生成Jar包。
- Jar包编译成功后,可以从1.c中配置的路径下获取到“flink-demo.jar”文件。
- 安装并配置Flink客户端。
- 安装MRS集群客户端,例如客户端安装目录为“/opt/hadoopclient”。
- 将准备集群配置文件中下载的认证凭据压缩包解压缩,并将得到的文件复制到客户端节点中,例如客户端节点的“/opt/hadoopclient/Flink/flink/conf”目录下。
- 执行以下命令编辑Flink客户端配置参数并保存。
vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml
将客户端节点的业务IP地址以及Manager的浮动IP地址追加到“jobmanager.web.allow-access-address”配置项中,同时在对应配置添加keytab路径以及用户名。
... jobmanager.web.allow-access-address: 192.168.64.122,192.168.64.216,192.168.64.234 ... security.kerberos.login.keytab: /opt/hadoopclient/Flink/flink/conf/user.keytab security.kerberos.login.principal: flinkuser ...
- 配置安全认证。
- 执行以下命令,生成Flink客户端安全认证文件。
cd /opt/hadoopclient/Flink/flink/bin
sh generate_keystore.sh
执行脚本后,需输入一个用于认证的自定义密码。
- 配置客户端访问“flink.keystore”和“flink.truststore”文件的路径配置。
cd /opt/hadoopclient/Flink/flink/conf/
mkdir ssl
mv flink.keystore ssl/
mv flink.truststore ssl/
vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml
修改如下两个参数的路径详细为相对地址路径。
security.ssl.keystore: ssl/flink.keystore security.ssl.truststore: ssl/flink.truststore
- 执行以下命令,生成Flink客户端安全认证文件。
- 将2中生成的Jar包上传到Flink客户端节点相关目录下,例如上传至“/opt/hadoopclient”。
在jar包所在目录下创建“conf”目录,将准备集群配置文件中获取的集群客户端配置文件软件包内“Flink/config”内的配置文件上传至“conf”目录。
- 将应用程序待处理的源数据文件上传至NodeManager实例所在节点。
在本示例中,源数据文件“log1.txt”、“log2.txt”放置在本地,因此需提前上传至所有Yarn NodeManager实例的节点上的“/opt”目录下,且文件权限配置为755。
- 在Flink客户端下通过yarn session命令启动Flink集群。
cd /opt/hadoopclient/Flink/flink
bin/yarn-session.sh -jm 1024 -tm 1024 -t conf/ssl/
... Cluster started: Yarn cluster with application id application_1683438782910_0009 JobManager Web Interface: http://192.168.64.10:32261
- Flink集群启动成功后,重新打开一个客户端连接窗口,进入Flink客户端目录运行程序。
source /opt/hadoopclient/bigdata_env
cd /opt/hadoopclient/Flink/flink
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/hadoopclient/flink-demo.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
... 2023-05-26 19:56:52,068 | INFO | [main] | Found Web Interface host-192-168-64-10:32261 of application 'application_1683438782910_0009'. | org.apache.flink.yarn.YarnClusterDescriptor.setClusterEntrypointInfoToConfig(YarnClusterDescriptor.java:1854) Job has been submitted with JobID 7647255752b09456d5a580e33a8529f5 Program execution finished Job with JobID 7647255752b09456d5a580e33a8529f5 has finished. Job Runtime: 36652 ms
- 查看运行结果。
使用flinkuser用户登录FusionInsight Manager,选择“集群 > 服务 > Yarn”,进入Yarn ResourceManager WebUI页面,在“Applications”页面单击作业名称,进入到作业详情页面。
图6 查看Yarn作业详情
对于在session中提交的作业,可以单击“Tracking URL”链接登录Flink原生页面查看作业信息。
图7 查看Flink作业详情
在本示例程序中,单击“Task Managers”,在作业的“Stdout”页签中可查看程序运行结果。
图8 查看程序运行结果