自定义Flink Streaming作业
获取DIS Flink Connector Demo
- 这里获取“dis-flink-connector-X.X.X.zip”压缩包。解压“dis-flink-connector-X.X.X.zip”压缩包,解压之后获得以下目录:
- “huaweicloud-dis-flink-connector-demo”目录包含一个Maven工程样例。
Intellij IDEA中导入Demo工程
以IntelliJ IDEA社区版为例,说明如何编写Flink作业。请先确保在IDEA上已经正确配置好。
- JDK 1.8+
- Scala-sdk-2.11
- Maven 3.3.*
- 打开IntelliJIDEA,选择
。选择解压至本地的huaweicloud-dis-flink-connector-demo目录,单击确认。
- 选择导入Maven工程,保持默认配置,一直单击下一步即可。
- 单击“New Window”,在新窗口打开此工程。
- 在pom.xml上单击右键,选择
,重新引入maven依赖库。
验证Flink Streaming Source作业
实际场景中,Flink Streaming作业需要提交在Flink集群上运行,但本次验证只介绍在本地IDE上测试,目的是了解sdk基本使用方法。测试完成后用户可自行创建集(如MRS集群)并提交作业验证。
- 使用注册账户登录DIS控制台。
- 单击管理控制台左上角的,选择区域和项目。
- 参考步骤1:开通DIS通道申请开通DIS通道,并持续上传数据到新创建的DIS通道。本次范例上传的内容为hello world。
- 打开pom.xml文件,选择<scope>provided</scope>这一行,并按Ctrl+/注释掉此行并保存。
- 右键单击pom.xml,选择
,重新引入依赖包。
- 在DISFlinkStreamingSourceJavaExample文件内任意地方,右键选择“Create 'DISFlinkStreamingSourceJavaExample'”。
- 在打开的配置页面中,“Program arguments”中输入运行参数,格式为 :
DIS网关地址 Region名称 AK SK ProjectID 通道名称 起始位置 消费者标识 如在华北-北京1测试,则参数示例为 https://dis.${region}.myhuaweicloud.com ${region} YOU_AK YOU_SK YOU_PROJECTID YOU_STREAM_NAME latest GROUP_ID
参数顺序与含义在示例代码中有,可以参考。
// DIS终端节点,如 https://dis.cn-north-1.myhuaweicloud.com String endpoint; // DIS服务所在区域ID,如 cn-north-1 String region; // // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,或者使用交互方式式传参,确保安全; // 本示例以交互式方式。 System.out.print("Enter your Access Key: "); String ak = scanner.nextLine(); System.out.print("Enter your Secret Key: "); String sk = scanner.nextLine(); // 用户的项目ID String projectId; // DIS通道名称 String streamName; // 消费策略,只有当分区没有Checkpoint或者Checkpoint过期时,才会使用此配置的策略;如果存在有效的Checkpoint,则会从此Checkpoint开始继续消费 // 取值有: LATEST 从最新的数据开始消费,此策略会忽略通道中已有数据 // EARLIEST 从最老的数据开始消费,此策略会获取通道中所有的有效数据 String startingOffsets; // 消费组标识,同一个消费组下的不同客户端可以同时消费同一个通道 String groupId;
- 断点消费必须指定checkpoint或者按照如下设置自动打上消费点 。
disConfig.put(DisConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
disConfig.put(DisConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
disConfig.put(DisConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "LATEST"); //LATEST 表示从最新的数据开始消费 。
- 如果都不设置 默认的是从lastest开始消费。
最终IDEA的配置如下图所示,确认无误后单击“OK”关闭此窗口。
- 在DISFlinkStreamingSourceJavaExample文件内任意地方,右键选择“Run 'DISFlinkStreamingSourceJavaExample'”,即可启动作业。
- 如果没有其他错误,将从DIS读取数据并输出到控制台,示例如下:
2> hello world 2> hello world 2> hello world
- 在本地运行作业验证无误之后,请把pom.xml中的<scope>provided</scope>解除注释(防止以后打包会把flink依赖也打进来),然后停止数据上传程序。
验证Flink Streaming Sink作业
实际场景中,Flink作业需要提交在Flink集群上运行,但本次验证只介绍在本地IDE上测试,目的是了解sdk基本使用方法。测试完成后用户可自行创建集群(如MRS集群)并提交作业验证。
- 使用注册账户登录DIS控制台。
- 单击管理控制台左上角的,选择区域和项目。
- 参考步骤1:开通DIS通道申请开通DIS通道。
- 打开pom.xml文件,选择<scope>provided</scope>这一行,并按Ctrl+/注释掉此行并保存。
- 右键单击pom.xml,选择
,重新引入依赖包。
- 在DISFlinkStreamingSinkJavaExample文件内任意地方,右键选择“Create 'DISFlinkStreamingSinkJavaExample'”。
- 在打开的配置页面中,“Program arguments”中输入运行参数,格式为 :
DIS网关地址 Region名称 AK SK ProjectID 通道名称 https://dis.${region}.mycloud.com ${region} YOU_AK YOU_SK YOU_PROJECTID YOU_STREAM_NAME
参数顺序与含义在示例代码中有,可以参考。
// DIS终端节点,如 https://dis.cn-north-1.myhuaweicloud.com String endpoint; // DIS服务所在区域ID,如 cn-north-1 String region; // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,或者使用交互方式式传参,确保安全; // 本示例以交互式方式。 System.out.print("Enter your Access Key: "); String ak = scanner.nextLine(); System.out.print("Enter your Secret Key: "); String sk = scanner.nextLine(); // 用户的项目ID String projectId; // DIS通道名称 String streamName;
最终IDEA的配置如下图所示,确认无误后单击“OK”关闭此窗口。
- 在DISFlinkStreamingSinkJavaExample文件内任意地方,右键选择“Run 'DISFlinkStreamingSinkJavaExample'”,即可启动作业。
- 如果没有其他错误,可以到DIS控制台通道监控页面查看数据是否上传成功。
- 在本地运行作业验证无误之后,请把pom.xml中的<scope>provided</scope>解除注释(防止以后打包会把flink依赖也打进来),然后停止数据上传程序。