Updated on 2022-08-16 GMT+08:00

Scenario

Assume that there are three jobs: one publisher and two subscribers. The publisher generates 10000 pieces of data each second. The NettySink operator sends the data from the publisher job to the downstream subscribers, both of which subscribe to the same data.

Data Planning

  1. The publisher uses customized operators to generate about 10000 pieces of data per second.
  2. The data contains two attributes, which are of integer and string types.
  3. Configuration file
    • nettyconnector.registerserver.topic.storage: (Mandatory) Configures the path (on a third-party registration server) that stores the IP address, port number, and concurrency information of NettySink. For example:
      nettyconnector.registerserver.topic.storage: /flink/nettyconnector
    • nettyconnector.sinkserver.port.range: (Mandatory) Configures the range of port numbers of NettySink. For example:
      nettyconnector.sinkserver.port.range: 28444-28943
    • nettyconnector.ssl.enabled: Configures whether to enable SSL encryption between NettySink and NettySource. The default value is false. For example:
      nettyconnector.ssl.enabled: true
    • nettyconnector.sinkserver.subnet: Configures the network domain (subnet) of NettySink. For example:
      nettyconnector.sinkserver.subnet: 10.162.0.0/16
  4. Security authentication configuration:
    • SASL authentication of ZooKeeper depends on HA-related configurations in flink-conf.yaml. For details, see section Flink Configuration Management.
    • SSL configurations such as keystore, truststore, keystore password, truststore password, and password are inherited from flink-conf.yaml. For details, see Encrypted Transmission.
  5. Description of APIs
    • RegisterServer API

      RegisterServerHandler stores information such as the IP address, port number, and concurrency of NettySink so that NettySource can connect to it. The following APIs are provided:

      public interface RegisterServerHandler {

          /**
           * Starts the RegisterServer.
           * @param configuration Flink configuration object
           */
          void start(Configuration configuration) throws Exception;

          /**
           * Creates a topic node (directory) on the RegisterServer.
           * @param topic name of the topic node
           */
          void createTopicNode(String topic) throws Exception;

          /**
           * Registers information to a topic node (directory).
           * @param topic topic node (directory) to register with
           * @param registerRecord information to be registered
           */
          void register(String topic, RegisterRecord registerRecord) throws Exception;

          /**
           * Deletes a topic node.
           * @param topic topic to be deleted
           */
          void deleteTopicNode(String topic) throws Exception;

          /**
           * Deregisters registration information.
           * @param topic topic where the registration information is located
           * @param recordId ID of the registration information to be deregistered
           */
          void unregister(String topic, int recordId) throws Exception;

          /**
           * Queries registration information.
           * @param topic topic where the information is located
           * @param recordId ID of the information to be queried
           */
          RegisterRecord query(String topic, int recordId) throws Exception;

          /**
           * Checks whether a topic exists.
           * @param topic topic to be checked
           */
          Boolean isExist(String topic) throws Exception;

          /**
           * Shuts down the RegisterServerHandler.
           */
          void shutdown() throws Exception;
      }

      In addition to the preceding APIs, Flink provides ZookeeperRegisterHandler.

    • NettySink operator
      Class NettySink(String name, 
      String topic, 
      RegisterServerHandler registerServerHandler,
      int numberOfSubscribedJobs)
      • name: Name of the current NettySink.
      • topic: Topic under which the current NettySink publishes its data. Different NettySinks must use different topics. Otherwise, subscriptions may become disordered and data transmission may be abnormal.
      • registerServerHandler: Handler of the registration server.
      • numberOfSubscribedJobs: Exact number of jobs that subscribe to the current NettySink. NettySink sends data only after all subscribers are connected to it.
    • NettySource operator
      Class NettySource(String name,
      String topic,
      RegisterServerHandler registerServerHandler)
      • name: Name of the NettySource. The name must be unique (concurrent instances of the same NettySource excluded). Otherwise, connections to NettySink may conflict.
      • topic: Topic of the NettySink to subscribe to.
      • registerServerHandler: Handler of the registration server.

The concurrency of NettySource and the concurrency of NettySink must be the same. Otherwise, the connection cannot be created.
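To make the topic-node lifecycle of the RegisterServer API easier to follow, here is a minimal in-memory sketch. It is not the ZooKeeper-backed handler and does not implement the real RegisterServerHandler interface. The class names InMemoryRegistry and SimpleRecord, the record fields (id, host, port), and the topic name TOPIC-1 are assumptions made for illustration only, because the fields of RegisterRecord are not shown in this section.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory registry that mirrors the topic-node operations described above. */
final class InMemoryRegistry {

    /** Hypothetical stand-in for RegisterRecord; the real class may hold different fields. */
    static final class SimpleRecord {
        final int id;
        final String host;
        final int port;

        SimpleRecord(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }
    }

    // topic name -> (record ID -> record)
    private final Map<String, Map<Integer, SimpleRecord>> topics = new ConcurrentHashMap<>();

    void createTopicNode(String topic) {
        topics.putIfAbsent(topic, new ConcurrentHashMap<>());
    }

    void register(String topic, SimpleRecord record) {
        Map<Integer, SimpleRecord> node = topics.get(topic);
        if (node == null) {
            throw new IllegalStateException("Topic node does not exist: " + topic);
        }
        node.put(record.id, record);
    }

    /** Returns the record, or null if the topic or the record ID is unknown. */
    SimpleRecord query(String topic, int recordId) {
        Map<Integer, SimpleRecord> node = topics.get(topic);
        return node == null ? null : node.get(recordId);
    }

    void unregister(String topic, int recordId) {
        Map<Integer, SimpleRecord> node = topics.get(topic);
        if (node != null) {
            node.remove(recordId);
        }
    }

    boolean isExist(String topic) {
        return topics.containsKey(topic);
    }

    void deleteTopicNode(String topic) {
        topics.remove(topic);
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        registry.createTopicNode("TOPIC-1");
        // A sink instance would publish its address under the topic.
        registry.register("TOPIC-1", new SimpleRecord(0, "10.162.0.1", 28444));
        // A source instance would look the address up by record ID.
        SimpleRecord found = registry.query("TOPIC-1", 0);
        System.out.println(found.host + ":" + found.port);
        registry.unregister("TOPIC-1", 0);
        registry.deleteTopicNode("TOPIC-1");
    }
}
```

In this sketch, the sink side plays the role of the registering party and the source side performs the query, which matches the description of RegisterServerHandler storing NettySink connection information for NettySource.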

Development Approach

1. There are three jobs: one publisher and two subscribers.

2. The publisher converts the generated data into byte[] and sends it to the subscribers.

3. After receiving the byte[], the subscribers convert the data into the string type and print sampled data.
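The byte[] round trip in steps 2 and 3 can be sketched in plain Java, without Flink or the Netty connector. This is an illustration under stated assumptions: the comma-separated text layout, the UTF-8 charset, the class name ByteRoundTripSketch, and the "print one record in every 10000" sampling rule are all hypothetical choices, not taken from the sample project.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of the publisher-side encoding and subscriber-side decoding. */
final class ByteRoundTripSketch {

    /** Publisher side: turn the two attributes (integer and string) into a byte[]. */
    static byte[] encode(int id, String text) {
        // Assumed layout: "<id>,<text>" encoded as UTF-8.
        return (id + "," + text).getBytes(StandardCharsets.UTF_8);
    }

    /** Subscriber side: turn the received byte[] back into a string. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    /** Assumed sampling rule: keep one record out of every 'interval' records. */
    static boolean isSampled(long count, long interval) {
        return count % interval == 0;
    }

    public static void main(String[] args) {
        // Simulate three seconds of traffic at about 10000 records per second.
        for (int i = 0; i < 30000; i++) {
            byte[] payload = encode(i, "record-" + i);
            String received = decode(payload);
            if (isSampled(i, 10000)) {
                System.out.println(received);
            }
        }
    }
}
```

Using an explicit charset on both sides avoids depending on the platform default, which matters when the publisher and subscriber jobs run on different hosts.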