更新时间:2022-09-08 GMT+08:00
分享

场景说明

场景说明

本样例中发布者Job自己每秒钟产生10000条数据,然后经由该job的NettySink算子向下游发送。另外两个Job作为订阅者,分别订阅一份数据。

数据规划

  1. 发布者Job使用自定义算子每秒钟产生10000条数据。
  2. 数据包含两个属性:分别是Int和String类型。
  3. 配置文件
    • nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如:
      nettyconnector.registerserver.topic.storage: /flink/nettyconnector
    • nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如:
      nettyconnector.sinkserver.port.range: 28444-28943
    • nettyconnector.ssl.enabled:设置NettySink与NettySource之间通信是否SSL加密(默认为false),例如:
      nettyconnector.ssl.enabled: true
    • nettyconnector.sinkserver.subnet:设置网络所属域,例如:
      nettyconnector.sinkserver.subnet: 10.162.0.0/16
  4. 安全认证配置:
    • Zookeeper的SASL认证,依赖“flink-conf.yaml”中有关HA的相关配置,具体配置请参见配置管理Flink
    • SSL的keystore、truststore、keystore password、truststore password以及password等也使用“flink-conf.yaml”的相关配置,具体配置请参见加密传输
  5. 接口说明
    • 注册服务器接口

      注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口:

      public interface RegisterServerHandler {
       
      /**
         * 启动注册服务器
          * @param configuration Flink的Configuration类型
         */
      void start(Configuration configuration) throws Exception;
      /**
              *注册服务器上创建Topic节点(目录)
              * @param topic topic节点名称
              */
      void createTopicNode(String topic) throw Exception;
      /**
      *将信息注册到某个topic节点(目录)下
      * @param topic 需要注册到的目录
      * @param registerRecord 需要注册的信息
      */
      void register(String topic, RegisterRecord registerRecord) throws Exception;
      /**
             *删除topic节点
             * @param topic 待删除topic
             */
          void deleteTopicNode(String topic) throws Exception;
      /**
         *注销注册信息
         *@param topic 注册信息所在的topic
         *@param recordId 待注销注册信息ID
         */
      void unregister(String topic, int recordId) throws Exception;
      /**
          * 查寻信息
      * @param 查询信息所在的topic
      *@recordId 查询信息的ID
      */
      RegisterRecord query(String topic, int recordId) throws Exception;
      /**
          * 查询某个Topic是否存在
          * @param topic
          */
      Boolean isExist(String topic) throws Exception;
      /**
          *关闭注册服务器句柄
         */
      void shutdown() throws Exception;

      工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。

    • NettySink算子
      Class NettySink(String name, 
      String topic, 
      RegisterServerHandler registerServerHandler,
      int numberOfSubscribedJobs)
      • name:为本NettySink的名称。
      • topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。
      • registerServerHandler:为注册服务器的句柄。
      • numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。
    • NettySource算子
      Class  NettySource(String name,
      String topic,
      RegisterServerHandler registerServerHandler)
      • name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。
      • topic:订阅的NettySink的topic。
      • registerServerHandler:为注册服务器的句柄。

NettySource的并发度必须与NettySink的并发度相同,否则无法正常创建连接。

开发思路

1. 一个Job作为发布者Job,其余两个作为订阅者Job。

2. 发布者Job自己产生数据将其转化成byte[],分别向订阅者发送。

3. 订阅者收到byte[]之后将其转化成String类型,并抽样打印输出。

分享:

    相关文档

    相关产品