文档首页/
MapReduce服务 MRS/
开发指南(LTS版)/
Flink开发指南(安全模式)/
开发Flink应用/
Flink Job Pipeline样例程序/
Flink Job Pipeline样例程序开发思路
更新时间:2024-08-03 GMT+08:00
Flink Job Pipeline样例程序开发思路
场景说明
本样例中发布者Job自己每秒钟产生10000条数据,然后经由该job的NettySink算子向下游发送。另外两个Job作为订阅者,分别订阅一份数据。
数据规划
- 发布者Job使用自定义算子每秒钟产生10000条数据。
- 数据包含两个属性:分别是Int和String类型。
- 配置文件
- nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如:
nettyconnector.registerserver.topic.storage: /flink/nettyconnector
- nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如:
nettyconnector.sinkserver.port.range: 28444-28943
- nettyconnector.ssl.enabled:设置NettySink与NettySource之间通信是否SSL加密(默认为false),例如:
nettyconnector.ssl.enabled: true
- nettyconnector.sinkserver.subnet:设置网络所属域,例如:
nettyconnector.sinkserver.subnet: 10.162.0.0/16
- nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如:
- 安全认证配置:
- Zookeeper的SASL认证,依赖“flink-conf.yaml”中有关HA的相关配置,具体配置请参见配置管理Flink。
- SSL的keystore、truststore、keystore password、truststore password以及password等也使用“flink-conf.yaml”的相关配置,具体配置请参见配置Flink应用安全认证。
- 接口说明
- 注册服务器接口
注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口:
public interface RegisterServerHandler { /** * 启动注册服务器 * @param configuration Flink的Configuration类型 */ void start(Configuration configuration) throws Exception; /** *注册服务器上创建Topic节点(目录) * @param topic topic节点名称 */ void createTopicNode(String topic) throw Exception; /** *将信息注册到某个topic节点(目录)下 * @param topic 需要注册到的目录 * @param registerRecord 需要注册的信息 */ void register(String topic, RegisterRecord registerRecord) throws Exception; /** *删除topic节点 * @param topic 待删除topic */ void deleteTopicNode(String topic) throws Exception; /** *注销注册信息 *@param topic 注册信息所在的topic *@param recordId 待注销注册信息ID */ void unregister(String topic, int recordId) throws Exception; /** * 查寻信息 * @param 查询信息所在的topic *@recordId 查询信息的ID */ RegisterRecord query(String topic, int recordId) throws Exception; /** * 查询某个Topic是否存在 * @param topic */ Boolean isExist(String topic) throws Exception; /** *关闭注册服务器句柄 */ void shutdown() throws Exception;
工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。
- NettySink算子
Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler, int numberOfSubscribedJobs)
- name:为本NettySink的名称。
- topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。
- registerServerHandler:为注册服务器的句柄。
- numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。
- NettySource算子
Class NettySource(String name, String topic, RegisterServerHandler registerServerHandler)
- name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。
- topic:订阅的NettySink的topic。
- registerServerHandler:为注册服务器的句柄。
- 注册服务器接口
NettySource的并发度必须与NettySink的并发度相同,否则无法正常创建连接。
开发思路
1. 一个Job作为发布者Job,其余两个作为订阅者Job。
2. 发布者Job自己产生数据将其转化成byte[],分别向订阅者发送。
3. 订阅者收到byte[]之后将其转化成String类型,并抽样打印输出。