更新时间:2024-08-03 GMT+08:00

快速开发Flink应用

Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。

Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。

Flink整个系统包含三个部分:

  • Client

    Flink Client主要给用户提供向Flink系统提交用户任务(流式作业)的能力。

  • TaskManager

    Flink系统的业务执行节点,执行具体的用户任务。TaskManager可以有多个,各个TaskManager都平等。

  • JobManager

    Flink系统的管理节点,管理所有的TaskManager,并决策用户任务在哪些Taskmanager执行。JobManager在HA模式下可以有多个,但只有一个主JobManager。

MRS对外提供了多个Flink组件的应用开发样例工程,本实践用于指导您创建MRS集群后,获取并导入样例工程并在本地进行编译调测,用于实现Flink DataStream程序处理数据。

创建MRS Flink集群

  1. 购买一个包含有Hive组件的MRS集群,详情请参见购买自定义集群

    本文以购买的MRS 3.2.0-LTS.1版本的集群为例,组件包含Hadoop、Flink组件,集群开启Kerberos认证。

  2. 单击“立即购买”,等待MRS集群创建成功。

准备集群配置文件

  1. 集群创建成功后,登录FusionInsight Manager创建用于提交Flink作业的集群用户。

    选择“系统 > 权限 > 用户 > 添加用户”,在新增用户界面创建一个机机用户,例如flinkuser

    “用户组”需加入“supergroup”用户组,并关联“System_administrator”角色。

  2. 选择系统 > 权限 > 用户,在用户名为“flinkuser”的操作列选择“更多 > 下载认证凭据”下载认证凭据文件,保存后解压得到该用户的“user.keytab”文件与“krb5.conf”文件。
  3. 选择“集群 > 概览 > 更多 > 下载客户端”,“选择客户端类型”设置为“仅配置文件”,单击“确定”,等待客户端文件包生成后根据浏览器提示下载客户端到本地并解压。

    例如,客户端配置文件压缩包为“FusionInsight_Cluster_1_Services_Client.tar”,解压后得到“FusionInsight_Cluster_1_Services_ClientConfig_ConfigFiles.tar”,继续解压该文件。

    进入客户端配置文件解压路径“FusionInsight_Cluster_1_Services_ClientConfig_ConfigFiles\Flink\config”,获取相关配置文件。

获取样例工程

  1. 通过开源镜像站获取样例工程。

    下载样例工程的Maven工程源码和配置文件,并在本地配置好相关开发工具,可参考通过开源镜像站获取样例工程

    根据集群版本选择对应的分支,下载并获取MRS相关样例工程。

    例如本章节场景对应示例为“FlinkStreamJavaExample”样例,获取地址:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0.1/src/flink-examples/flink-examples-security/FlinkStreamJavaExample

  2. 本地使用IDEA工具导入样例工程,等待Maven工程下载相关依赖包,具体操作可参考配置并导入样例工程

    图1 Flink样例工程示例

    本地配置好Maven及SDK相关参数后,样例工程会自动加载相关依赖包。

  3. 在本示例中,将开发的DataStream程序通过Flink客户端提交运行,因此在代码中不需单独进行安全认证。

    假定用户有某个网站周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Flink的DataStream应用程序实现实时统计总计网购时间超过2个小时的女性网民信息。

    源数据内容如下,日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”
    • log1.txt:周六网民停留日志。
      LiuYang,female,20 
      YuanJing,male,10 
      GuoYijun,male,5 
      CaiXuyu,female,50 
      Liyuan,male,20 
      FangBo,female,50 
      LiuYang,female,20 
      YuanJing,male,10 
      GuoYijun,male,50 
      CaiXuyu,female,50 
      FangBo,female,60

      log2.txt:周日网民停留日志。

      LiuYang,female,20
      YuanJing,male,10
      CaiXuyu,female,50
      FangBo,female,50
      GuoYijun,male,5
      CaiXuyu,female,50
      Liyuan,male,20
      CaiXuyu,female,50
      FangBo,female,50
      LiuYang,female,20
      YuanJing,male,10
      FangBo,female,50
      GuoYijun,male,50
      CaiXuyu,female,50
      FangBo,female,60

    开发思路为:

    1. 读取文本数据,生成相应DataStream,解析数据生成UserRecord信息。
    2. 筛选女性网民上网时间数据信息。
    3. 按照姓名、性别进行keyby操作,并汇总在一个时间窗口内每个女性上网时间。
    4. 筛选连续上网时间超过阈值的用户,并获取结果。
    public class FlinkStreamJavaExample {
        public static void main(String[] args) throws Exception {
            // 打印出执行flink run的参考命令
            System.out.println("use command as: ");
            System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2");
            System.out.println("******************************************************************************************");
            System.out.println("<filePath> is for text file to read data, use comma to separate");
            System.out.println("<windowTime> is the width of the window, time as minutes");
            System.out.println("******************************************************************************************");
    
            // 读取文本路径信息,并使用逗号分隔,如果源文件在HDFS中,可配置为详细HDFS路径,例如“hdfs://hacluster/tmp/log1.txt,hdfs://hacluster/tmp/log2.txt”
            final String[] filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",");
            assert filePaths.length > 0;
    
            // windowTime设置窗口时间大小,默认2分钟一个窗口足够读取文本内的所有数据了
            final int windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2);
    
            // 构造执行环境,使用eventTime处理窗口数据
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
    
            // 读取文本数据流
            DataStream<String> unionStream = env.readTextFile(filePaths[0]);
            if (filePaths.length > 1) {
                for (int i = 1; i < filePaths.length; i++) {
                    unionStream = unionStream.union(env.readTextFile(filePaths[i]));
                }
            }
    
            // 数据转换,构造整个数据处理的逻辑,计算并得出结果打印出来
            unionStream.map(new MapFunction<String, UserRecord>() {
                @Override
                public UserRecord map(String value) throws Exception {
                    return getRecord(value);
                }
            }).assignTimestampsAndWatermarks(
                    new Record2TimestampExtractor()
            ).filter(new FilterFunction<UserRecord>() {
                @Override
                public boolean filter(UserRecord value) throws Exception {
                    return value.sexy.equals("female");
                }
            }).keyBy(
                new UserRecordSelector()
            ).window(
                TumblingEventTimeWindows.of(Time.minutes(windowTime))
            ).reduce(new ReduceFunction<UserRecord>() {
                @Override
                public UserRecord reduce(UserRecord value1, UserRecord value2)
                        throws Exception {
                    value1.shoppingTime += value2.shoppingTime;
                    return value1;
                }
            }).filter(new FilterFunction<UserRecord>() {
                @Override
                public boolean filter(UserRecord value) throws Exception {
                    return value.shoppingTime > 120;
                }
            }).print();
    
            // 调用execute触发执行
            env.execute("FemaleInfoCollectionPrint java");
        }
    
        // 构造keyBy的关键字作为分组依据
        private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> {
            @Override
            public Tuple2<String, String> getKey(UserRecord value) throws Exception {
                return Tuple2.of(value.name, value.sexy);
            }
        }
    
        // 解析文本行数据,构造UserRecord数据结构
        private static UserRecord getRecord(String line) {
            String[] elems = line.split(",");
            assert elems.length == 3;
            return new UserRecord(elems[0], elems[1], Integer.parseInt(elems[2]));
        }
    
        // UserRecord数据结构的定义,并重写了toString打印方法
        public static class UserRecord {
            private String name;
            private String sexy;
            private int shoppingTime;
    
            public UserRecord(String n, String s, int t) {
                name = n;
                sexy = s;
                shoppingTime = t;
            }
    
            public String toString() {
                return "name: " + name + "  sexy: " + sexy + "  shoppingTime: " + shoppingTime;
            }
        }
    
        // 构造继承AssignerWithPunctuatedWatermarks的类,用于设置eventTime以及waterMark
        private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> {
    
            // add tag in the data of datastream elements
            @Override
            public long extractTimestamp(UserRecord element, long previousTimestamp) {
                return System.currentTimeMillis();
            }
    
            // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready
            @Override
            public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) {
                return new Watermark(extractedTimestamp - 1);
            }
        }
    }
    

编译并运行程序

  1. 在IntelliJ IDEA中,配置工程的Artifacts信息。

    1. 在IDEA主页面,选择File > Project Structures...进入“Project Structure”页面。
    2. “Project Structure”页面,选择“Artifacts”,单击“+”并选择JAR > Empty
      图2 添加Artifacts
    3. 根据实际情况设置Jar包的名称、类型以及输出路径,例如配置为“flink-demo”。
      图3 设置基本信息
    4. 选中'FlinkStreamJavaExample' compile output,右键选择“Put into Output Root”。然后单击“Apply”
      图4 Put into Output Root
    5. 最后单击“OK”完成配置。

  2. 生成Jar包。

    1. 在IDEA主页面,选择Build > Build Artifacts...
    2. 在弹出的菜单中,选择FlinkStreamJavaExample > Build开始生成Jar包。
      图5 Build
    3. Jar包编译成功后,可以从1.c中配置的路径下获取到“flink-demo.jar”文件。

  3. 安装并配置Flink客户端。

    1. 安装MRS集群客户端,例如客户端安装目录为“/opt/hadoopclient”。
    2. 准备集群配置文件中下载的认证凭据压缩包解压缩,并将得到的文件复制到客户端节点中,例如客户端节点的“/opt/hadoopclient/Flink/flink/conf”目录下。
    3. 执行以下命令编辑Flink客户端配置参数并保存。

      vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml

      将客户端节点的业务IP地址以及Manager的浮动IP地址追加到“jobmanager.web.allow-access-address”配置项中,同时在对应配置添加keytab路径以及用户名。

      ...
      jobmanager.web.allow-access-address: 192.168.64.122,192.168.64.216,192.168.64.234
      ...
      security.kerberos.login.keytab: /opt/hadoopclient/Flink/flink/conf/user.keytab
      security.kerberos.login.principal: flinkuser
      ...
    4. 配置安全认证。
      1. 执行以下命令,生成Flink客户端安全认证文件。

        cd /opt/hadoopclient/Flink/flink/bin

        sh generate_keystore.sh

        执行脚本后,需输入一个用于认证的自定义密码。

      2. 配置客户端访问“flink.keystore”和“flink.truststore”文件的路径配置。

        cd /opt/hadoopclient/Flink/flink/conf/

        mkdir ssl

        mv flink.keystore ssl/

        mv flink.truststore ssl/

        vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml

        修改如下两个参数的路径详细为相对地址路径。

        security.ssl.keystore: ssl/flink.keystore
        security.ssl.truststore: ssl/flink.truststore

  4. 2中生成的Jar包上传到Flink客户端节点相关目录下,例如上传至“/opt/hadoopclient”。

    在jar包所在目录下创建“conf”目录,将准备集群配置文件中获取的集群客户端配置文件软件包内“Flink/config”内的配置文件上传至“conf”目录。

  5. 将应用程序待处理的源数据文件上传至NodeManager实例所在节点。

    在本示例中,源数据文件“log1.txt”、“log2.txt”放置在本地,因此需提前上传至所有Yarn NodeManager实例的节点上的“/opt”目录下,且文件权限配置为755。

  6. 在Flink客户端下通过yarn session命令启动Flink集群。

    cd /opt/hadoopclient/Flink/flink

    bin/yarn-session.sh -jm 1024 -tm 1024 -t conf/ssl/

    ...
    Cluster started: Yarn cluster with application id application_1683438782910_0009
    JobManager Web Interface: http://192.168.64.10:32261

  7. Flink集群启动成功后,重新打开一个客户端连接窗口,进入Flink客户端目录运行程序。

    source /opt/hadoopclient/bigdata_env

    cd /opt/hadoopclient/Flink/flink

    bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/hadoopclient/flink-demo.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2

    ...
    2023-05-26 19:56:52,068 | INFO  | [main] | Found Web Interface host-192-168-64-10:32261 of application 'application_1683438782910_0009'. | org.apache.flink.yarn.YarnClusterDescriptor.setClusterEntrypointInfoToConfig(YarnClusterDescriptor.java:1854)
    Job has been submitted with JobID 7647255752b09456d5a580e33a8529f5
    Program execution finished
    Job with JobID 7647255752b09456d5a580e33a8529f5 has finished.
    Job Runtime: 36652 ms

  8. 查看运行结果。

    使用flinkuser用户登录FusionInsight Manager,选择“集群 > 服务 > Yarn”,进入Yarn ResourceManager WebUI页面,在“Applications”页面单击作业名称,进入到作业详情页面。

    图6 查看Yarn作业详情

    对于在session中提交的作业,可以单击“Tracking URL”链接登录Flink原生页面查看作业信息。

    图7 查看Flink作业详情

    在本示例程序中,单击“Task Managers”,在作业的“Stdout”页签中可查看程序运行结果。

    图8 查看程序运行结果