更新时间:2024-10-28 GMT+08:00
分享

自定义Flink Streaming作业

获取DIS Flink Connector Demo

  1. 这里获取“dis-flink-connector-X.X.X.zip”压缩包。解压“dis-flink-connector-X.X.X.zip”压缩包,解压之后获得以下目录:

    • “huaweicloud-dis-flink-connector-demo”目录包含一个Maven工程样例。

Intellij IDEA中导入Demo工程

以IntelliJ IDEA社区版为例,说明如何编写Flink作业。请先确保在IDEA上已经正确配置好。

  • JDK 1.8+
  • Scala-sdk-2.11
  • Maven 3.3.*
  1. 打开IntelliJIDEA,选择File > New > > Project from Existing Sources...。选择解压至本地的huaweicloud-dis-flink-connector-demo目录,单击确认。

  2. 选择导入Maven工程,保持默认配置,一直单击下一步即可。

  3. 单击“New Window”,在新窗口打开此工程。

  4. 在pom.xml上单击右键,选择Maven > Reimport,重新引入maven依赖库。

验证Flink Streaming Source作业

实际场景中,Flink Streaming作业需要提交在Flink集群上运行,但本次验证只介绍在本地IDE上测试,目的是了解sdk基本使用方法。测试完成后用户可自行创建集(如MRS集群)并提交作业验证。

  1. 使用注册账户登录DIS控制台
  2. 单击管理控制台左上角的,选择区域和项目。
  3. 参考步骤1:开通DIS通道申请开通DIS通道,并持续上传数据到新创建的DIS通道。本次范例上传的内容为hello world。
  4. 打开pom.xml文件,选择<scope>provided</scope>这一行,并按Ctrl+/注释掉此行并保存。

  5. 右键单击pom.xml,选择Maven > Reimport,重新引入依赖包。

  6. 在DISFlinkStreamingSourceJavaExample文件内任意地方,右键选择“Create 'DISFlinkStreamingSourceJavaExample'”

  7. 在打开的配置页面中,“Program arguments”中输入运行参数,格式为 :

    DIS网关地址 Region名称 AK SK ProjectID 通道名称 起始位置 消费者标识
     
    如在华北-北京1测试,则参数示例为
    https://dis.${region}.myhuaweicloud.com ${region} YOU_AK YOU_SK YOU_PROJECTID YOU_STREAM_NAME latest GROUP_ID

    参数顺序与含义在示例代码中有,可以参考。

      // DIS终端节点,如 https://dis.cn-north-1.myhuaweicloud.com
            String endpoint;
            // DIS服务所在区域ID,如 cn-north-1
            String region;
            // 
            // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,或者使用交互方式式传参,确保安全;  
            // 本示例以交互式方式。
            System.out.print("Enter your Access Key: ");       
            String ak = scanner.nextLine();
            System.out.print("Enter your Secret Key: ");      
            String sk = scanner.nextLine(); 
            // 用户的项目ID
            String projectId;
            // DIS通道名称
            String streamName;
            // 消费策略,只有当分区没有Checkpoint或者Checkpoint过期时,才会使用此配置的策略;如果存在有效的Checkpoint,则会从此Checkpoint开始继续消费
            // 取值有: LATEST      从最新的数据开始消费,此策略会忽略通道中已有数据
            //         EARLIEST    从最老的数据开始消费,此策略会获取通道中所有的有效数据
            String startingOffsets;
            // 消费组标识,同一个消费组下的不同客户端可以同时消费同一个通道
            String groupId;
    • 断点消费必须指定checkpoint或者按照如下设置自动打上消费点 。

    disConfig.put(DisConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

    disConfig.put(DisConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

    disConfig.put(DisConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "LATEST"); //LATEST 表示从最新的数据开始消费 。

    • 如果都不设置 默认的是从lastest开始消费。

    最终IDEA的配置如下图所示,确认无误后单击“OK”关闭此窗口。

  8. 在DISFlinkStreamingSourceJavaExample文件内任意地方,右键选择“Run 'DISFlinkStreamingSourceJavaExample'”,即可启动作业。

  9. 如果没有其他错误,将从DIS读取数据并输出到控制台,示例如下:

    2> hello world
    2> hello world
    2> hello world

  10. 在本地运行作业验证无误之后,请把pom.xml中的<scope>provided</scope>解除注释(防止以后打包会把flink依赖也打进来),然后停止数据上传程序。

验证Flink Streaming Sink作业

实际场景中,Flink作业需要提交在Flink集群上运行,但本次验证只介绍在本地IDE上测试,目的是了解sdk基本使用方法。测试完成后用户可自行创建集群(如MRS集群)并提交作业验证。

  1. 使用注册账户登录DIS控制台
  2. 单击管理控制台左上角的,选择区域和项目。
  3. 参考步骤1:开通DIS通道申请开通DIS通道。
  4. 打开pom.xml文件,选择<scope>provided</scope>这一行,并按Ctrl+/注释掉此行并保存。

  5. 右键单击pom.xml,选择Maven > Reimport,重新引入依赖包。

  6. 在DISFlinkStreamingSinkJavaExample文件内任意地方,右键选择“Create 'DISFlinkStreamingSinkJavaExample'”

  7. 在打开的配置页面中,“Program arguments”中输入运行参数,格式为 :

    DIS网关地址 Region名称 AK SK ProjectID 通道名称
    https://dis.${region}.mycloud.com ${region} YOU_AK YOU_SK YOU_PROJECTID YOU_STREAM_NAME

    参数顺序与含义在示例代码中有,可以参考。

     // DIS终端节点,如 https://dis.cn-north-1.myhuaweicloud.com
            String endpoint;
            // DIS服务所在区域ID,如 cn-north-1
            String region;
            
            // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,或者使用交互方式式传参,确保安全;  
            // 本示例以交互式方式。
            System.out.print("Enter your Access Key: ");       
            String ak = scanner.nextLine();
    
            System.out.print("Enter your Secret Key: ");              
            String sk = scanner.nextLine();
            // 用户的项目ID
            String projectId;
            // DIS通道名称
            String streamName;

    最终IDEA的配置如下图所示,确认无误后单击“OK”关闭此窗口。

  8. 在DISFlinkStreamingSinkJavaExample文件内任意地方,右键选择“Run 'DISFlinkStreamingSinkJavaExample'”,即可启动作业。

  9. 如果没有其他错误,可以到DIS控制台通道监控页面查看数据是否上传成功。
  10. 在本地运行作业验证无误之后,请把pom.xml中的<scope>provided</scope>解除注释(防止以后打包会把flink依赖也打进来),然后停止数据上传程序。

相关文档