更新时间:2023-03-17 GMT+08:00

Job Pipeline

Flink开源增强特性:Job Pipeline

通常情况下,会将与某一方面业务相关的逻辑代码放在一个比较大的Jar包中,这种Jar包称为Fat Jar。Fat Jar具有以下缺点:
  • 随着业务逻辑越来越复杂,Jar包的大小也不断增加。
  • 协调难度增大,所有的业务开发人员都在同一套业务逻辑上开发,虽然可以将整个业务逻辑划分为几个模块,单各模块之间是一种紧耦合的关系,当需求更改时,需要重新规划整个流图。
拆分成多个作业目前还存在问题。
  • 通常情况下,作业之间可以通过Kafka实现数据传输,如作业A可以将数据发送到Kafka的Topic A下,然后作业B和作业C可以从Topic A下读取数据。该方案简单易行,但是延迟很难做到100ms以内。
  • 采用TCP直接相连的方式,算子在分布式环境下,可能会调度到任意节点,上下游之间无法感知其存在。

Job Pipeline流图结构

Pipeline是由Flink的多个Job通过TCP连接起来,上游Job可以直接向下游Job发送数据。这种发送数据的流图称为Job Pipeline,如图1所示。

图1 Job Pipeline流图

Job Pipeline原理介绍

图2 Job Pipeline原理图
  • NettySink和NettySource

    Pipeline中上下游Job是直接通过Netty进行通信,上游Job的Sink算子作为Server,下游Job的Source算子作为Client。上游Job的Sink算子命名为NettySink,下游Job的Source算子命名为NettySource。

  • NettyServer和NettyClient

    NettySink作为Netty的服务器端,内部NettyServer实现服务器功能;NettySource作为Netty的客户端,内部NettyClient实现客户端功能。

  • 发布者

    通过NettySink向下游Job发送数据的Job称为发布者。

  • 订阅者

    通过NettySource接收上游Job发送的数据的Job称为订阅者。

  • 注册服务器

    保存NettyServer的IP、端口以及NettySink的并发度信息的第三方存储器。

  • 总体架构是一个三层结构,由外到里依次是:
    • NettySink->NettyServer->NettyServerHandler
    • NettySource->NettyClient->NettyClientHandler

Job Pipeline功能介绍

  • NettySink

    NettySink由以下几个重要模块组成:

    • RichParallelSinkFunction

      NettySink继承了RichParallelSinkFunction,使其具有Sink算子的属性。主要通过RichParallelSinkFunction的接口来实现以下功能:

      • 启动NettySink算子。
      • 运行NettySink算子,从本job的上游算子接收数据。
      • 取消NettySink算子运行等。

      也可以通过其属性获取以下信息:

      • NettySink算子各个并发度的subtaskIndex信息。
      • NettySink算子的并发度是多少。
    • RegisterServerHandler

      该组件主要是与注册服务器交互的部件,在平台定义了一系列接口,包括以下几种接口:

      • “start();” :启动RegisterServerHandler,与第三方RegisterServer建立联系。
      • “createTopicNode();” :创建Topic节点。
      • “register();”: 将IP、端口及并发度信息注册到Topic节点下。
      • “deleteTopicNode();”: 删除Topic节点。
      • “unregister();”: 删除注册信息。
      • “query(); ”:查询注册信息。
      • “isExist();”: 查找某个信息是否存在。
      • “shutdown(); ”:关闭RegisterServerHandler,与第三方RegisterServer断开连接。
      • RegisterServerHandler接口实现了ZooKeeper作为RegisterServer的Handler,用户可以根据自己的需求,实现自己的Handler,ZooKeeper中信息的保存形式如下图所示:
        Namespace    
        |---Topic-1          
          |---parallel-1          
          |---parallel-2          
          |....          
          |---parallel-n    
        |---Topic-2          
          |---parallel-1          
          |---parallel-2          
          |....          
          |---parallel-m     
        |... 
      • Namespace的信息通过“flink-conf.yaml”的以下配置项获取:
        nettyconnector.registerserver.topic.storage: /flink/nettyconnector
      • ZookeeperRegisterServerHandler与ZooKeeper之间的SASL认证通过Flink的框架实现。
      • 用户必须自己保证每个Job有一个唯一的TOPIC,否则会引起作业间订阅关系的混乱。
      • 在ZookeeperRegisterServerHandler调用shutdown()时,首先删除本并发度的注册信息,然后尝试删除TOPIC节点,如果TOPIC节点为非空,则放弃删除TOPIC节点,说明其他并发度还未退出。
    • NettyServer

      该模块是NettySink算子的核心之一,主要作用是创建一个NettyServer并接收NettyClient的连接申请。将同一Job中上游算子发送过来的数据,经由NettyServerHandler发送出去。 另外,NettyServer的端口及子网需要在“flink-conf.yaml”配置文件中配置:

      • 端口范围
        nettyconnector.sinkserver.port.range: 28444-28943
      • 子网
        nettyconnector.sinkserver.subnet: 10.162.222.123/24 

        nettyconnector.sinkserver.subnet默认配置为Flink客户端所在节点子网,若客户端与TaskManager不在同一个子网则有可能导致错误,需手动配置为TaskManager所在网络子网(业务IP)。

    • NettyServerHandler

      该Handler是NettySink与订阅者交互的通道,当NettySink接收到消息时,该Handler负责将消息发送出去。为保证数据传输的安全性,该通道通过SSL加密。另外设置一个Netty Connector的功能开关,只有当Flink的SSL总开关被打开以及配置“nettyconnector.ssl.enabled”“true”的时候才开启SSL加密,否则不开启。

  • NettySource

    NettySource由以下几个重要模块组成:

    • RichParallelSourceFunction

      NettySource继承了RichParallelSinkFunction,使其具有Source算子的属性,主要通过RichParallelSourceFunction接口来实现以下功能:

      • 启动NettySink算子。
      • 运行NettySink算子,接收来自订阅者的数据并注入到所在Job中。
      • 取消Source算子运行等。

      也可以通过其属性获取以下信息:

      • NettySource算子各个并发度的subtaskIndex信息。
      • NettySource算子的并发度是多少。

      当NettySource算子进入run阶段后,平台内部会不断监控其NettyClient状态是否健康,一旦发现其出现异常,即会重启NettyClient,重新与NettyServer建立连接并接收数据,以防接收的数据混乱。

    • RegisterServerHandler

      该组件与NettySink的RegisterServerHandler功能相同,在NettySource算子中仅获取所订阅Job的各个并发算子的IP、端口及并发算子信息。

    • NettyClient

      NettyClient与NettyServer建立连接,并通过NettyClientHandler接收数据。每个NettySource算子必须具有唯一的name(由用户来保障)。NettyServer通过唯一的name确定每个Client来自不同的NettySource。当NettyClient与NettyServer建立连接时,首先向NettyServer注册NettyClient,将NettyClient的NettySource name传递给NettyServer。

    • NettyClientHandler

      该模块是与发布者交互的通道,也是与Job的其他算子交互的通道。当该通道中接收到消息时,该Handler负责将消息注入到Job内部。另外,为保证数据安全传输,该通道通过SSL加密,与NettySink进行通信。另外设置一个NettyConnector的功能开关,只有当Flink的SSL总开关被打开以及“nettyconnector.ssl.enabled”“true”的时候才开启SSL加密,否则不开启。

Job与Job之间的联系可能是多对多的关系,对于每个NettySink和NettySource算子的并发度而言,是一对多的关系,如图3所示。
图3 关系图