文档首页 > > 用户指南> 使用DIS> 使用DIS Flink Connector上传与下载数据> 自定义Flink Streaming作业

自定义Flink Streaming作业

分享
更新时间: 2019/11/28 GMT+08:00

获取DIS Flink Connector Demo

  1. https://dis-publish.obs-website.cn-north-1.myhwclouds.com/获取“dis-flink-connector-X.X.X.zip”压缩包。解压“dis-flink-connector-X.X.X.zip”压缩包,解压之后获得以下目录:

    • “huaweicloud-dis-flink-connector-demo”目录包含一个Maven工程样例。

Intellij IDEA中导入Demo工程

以IntelliJ IDEA社区版为例,说明如何编写Flink作业。请先确保在IDEA上已经正确配置好。

  • JDK 1.8+
  • Scala-sdk-2.11
  • Maven 3.3.*
  1. 打开IntelliJIDEA,选择File > New > > Project from Existing Sources...。选择解压至本地的huaweicloud-dis-flink-connector-demo目录,单击确认。

  2. 选择导入Maven工程,保持默认配置,一直单击下一步即可。

  3. 单击“New Window”,在新窗口打开此工程。

  4. 在pom.xml上单击右键,选择Maven > Reimport,重新引入maven依赖库。

验证Flink Streaming Source作业

实际场景中,Flink Streaming作业需要提交在Flink集群上运行,但本次验证只介绍在本地IDE上测试,目的是了解sdk基本使用方法。测试完成后用户可自行创建集群(如MRS集群或者CS集群)并提交作业验证。

  1. 使用注册帐户登录DIS控制台
  2. 单击管理控制台左上角的,选择区域和项目。
  3. 参考步骤1:开通DIS通道申请开通DIS通道,并持续上传数据到新创建的DIS通道。本次范例上传的内容为hello world。
  4. 打开pom.xml文件,选择<scope>provided</scope>这一行,并按Ctrl+/注释掉此行并保存。

  5. 右键单击pom.xml,选择Maven > Reimport,重新引入依赖包。

  6. 在DISFlinkStreamingSourceJavaExample文件内任意地方,右键选择“Create 'DISFlinkStreamingSourceJavaExample'”

  7. 在打开的配置页面中,“Program arguments”中输入运行参数,格式为 :

    DIS网关地址 Region名称 AK SK ProjectID 通道名称 起始位置 消费者标识
     
    如在华北-北京1测试,则参数示例为
    https://dis.${region}.myhwclouds.com ${region} YOU_AK YOU_SK YOU_PROJECTID YOU_STREAM_NAME latest GROUP_ID

    参数顺序与含义在示例代码中有,可以参考。

      // DIS终端节点,如 https://dis.cn-north-1.myhuaweicloud.com
            String endpoint;
            // DIS服务所在区域ID,如 cn-north-1
            String region;
            // 用户的AK
            String ak;
            // 用户的SK
            String sk;
            // 用户的项目ID
            String projectId;
            // DIS通道名称
            String streamName;
            // 消费策略,只有当分区没有Checkpoint或者Checkpoint过期时,才会使用此配置的策略;如果存在有效的Checkpoint,则会从此Checkpoint开始继续消费
            // 取值有: LATEST      从最新的数据开始消费,此策略会忽略通道中已有数据
            //         EARLIEST    从最老的数据开始消费,此策略会获取通道中所有的有效数据
            String startingOffsets;
            // 消费组标识,同一个消费组下的不同客户端可以同时消费同一个通道
            String groupId;

    最终IDEA的配置如下图所示,确认无误后单击“OK”关闭此窗口。

  8. 在DISFlinkStreamingSourceJavaExample文件内任意地方,右键选择“Run 'DISFlinkStreamingSourceJavaExample'”,即可启动作业。

  9. 如果没有其他错误,将从DIS读取数据并输出到控制台,示例如下:

    2> hello world
    2> hello world
    2> hello world

  10. 在本地运行作业验证无误之后,请把pom.xml中的<scope>provided</scope>解除注释(防止以后打包会把flink依赖也打进来),然后停止数据上传程序。

验证Flink Streaming Sink作业

实际场景中,Flink作业需要提交在Flink集群上运行,但本次验证只介绍在本地IDE上测试,目的是了解sdk基本使用方法。测试完成后用户可自行创建集群(如MRS集群或者CS集群)并提交作业验证。

  1. 使用注册帐户登录DIS控制台
  2. 单击管理控制台左上角的,选择区域和项目。
  3. 参考步骤1:开通DIS通道申请开通DIS通道。
  4. 打开pom.xml文件,选择<scope>provided</scope>这一行,并按Ctrl+/注释掉此行并保存。

  5. 右键单击pom.xml,选择Maven > Reimport,重新引入依赖包。

  6. 在DISFlinkStreamingSinkJavaExample文件内任意地方,右键选择“Create 'DISFlinkStreamingSinkJavaExample'”

  7. 在打开的配置页面中,“Program arguments”中输入运行参数,格式为 :

    DIS网关地址 Region名称 AK SK ProjectID 通道名称
    https://dis.${region}.myhwclouds.com ${region} YOU_AK YOU_SK YOU_PROJECTID YOU_STREAM_NAME

    参数顺序与含义在示例代码中有,可以参考。

     // DIS终端节点,如 https://dis.cn-north-1.myhuaweicloud.com
            String endpoint;
            // DIS服务所在区域ID,如 cn-north-1
            String region;
            // 用户的AK
            String ak;
            // 用户的SK
            String sk;
            // 用户的项目ID
            String projectId;
            // DIS通道名称
            String streamName;

    最终IDEA的配置如下图所示,确认无误后单击“OK”关闭此窗口。

  8. 在DISFlinkStreamingSinkJavaExample文件内任意地方,右键选择“Run 'DISFlinkStreamingSinkJavaExample'”,即可启动作业。

  9. 如果没有其他错误,可以到DIS控制台通道监控页面查看数据是否上传成功。
  10. 在本地运行作业验证无误之后,请把pom.xml中的<scope>provided</scope>解除注释(防止以后打包会把flink依赖也打进来),然后停止数据上传程序。
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

跳转到云社区