Flink Job Pipeline增强
通常情况下,开发者会将与某一方面业务相关的逻辑代码放在一个比较大的Jar包中,这种Jar包称为Fat Jar。
- 随着业务逻辑越来越复杂,Jar包的大小也不断增加。
- 协调难度增大,所有的业务开发人员都在同一套业务逻辑上开发,虽然可以将整个业务逻辑划分为几个模块,但各模块之间是一种紧耦合的关系,当需求更改时,需要重新规划整个流图。
- 通常情况下,作业之间可以通过Kafka实现数据传输,如作业A可以将数据发送到Kafka的Topic A下,然后作业B和作业C可以从Topic A下读取数据。该方案简单易行,但是延迟一般大于100ms。
- 采用TCP直接相连的方式,算子在分布式环境下,可能会调度到任意节点,上下游之间无法感知其存在。
Job Pipeline流图结构
Pipeline是由Flink的多个Job通过TCP连接起来,上游Job可以直接向下游Job发送数据。这种发送数据的流图称为Job Pipeline,如图1所示。
Job Pipeline原理介绍
- NettySink和NettySource
Pipeline中上下游Job是直接通过Netty进行通信,上游Job的Sink算子作为Server,下游Job的Source算子作为Client。上游Job的Sink算子命名为NettySink,下游Job的Source算子命名为NettySource。
- NettyServer和NettyClient
NettySink作为Netty的服务器端,内部NettyServer实现服务器功能;NettySource作为Netty的客户端,内部NettyClient实现客户端功能。
- 发布者
- 订阅者
- 注册服务器
- 总体架构是一个三层结构,由外到里依次是:
- NettySink->NettyServer->NettyServerHandler
- NettySource->NettyClient->NettyClientHandler
Job Pipeline功能介绍
- NettySink
NettySink由以下几个重要模块组成:
- RichParallelSinkFunction
NettySink继承了RichParallelSinkFunction,使其具有Sink算子的属性。主要通过RichParallelSinkFunction的接口来实现以下功能:
- 启动NettySink算子。
- 运行NettySink算子,从本Job的上游算子接收数据。
- 取消NettySink算子运行等。
也可以通过其属性获取以下信息:
- NettySink算子各个并发度的subtaskIndex信息。
- NettySink算子的并发度。
- RegisterServerHandler
该组件主要是与注册服务器交互的部件,在平台上定义了一系列接口,包括以下几种接口:
- “start();” :启动RegisterServerHandler,与第三方RegisterServer建立联系。
- “createTopicNode();” :创建Topic节点。
- “register();”: 将IP、端口及并发度信息注册到Topic节点下。
- “deleteTopicNode();”: 删除Topic节点。
- “unregister();”: 删除注册信息。
- “query(); ”:查询注册信息。
- “isExist();”: 查找某个信息是否存在。
- “shutdown(); ”:关闭RegisterServerHandler,与第三方RegisterServer断开连接。
- RegisterServerHandler接口实现了ZooKeeper作为RegisterServer的Handler,用户可以根据自己的需求,实现自己的Handler,ZooKeeper中信息的保存形式如下图所示:
Namespace |---Topic-1 |---parallel-1 |---parallel-2 |.... |---parallel-n |---Topic-2 |---parallel-1 |---parallel-2 |.... |---parallel-m |...
- Namespace的信息通过“flink-conf.yaml”的以下配置项获取:
nettyconnector.registerserver.topic.storage: /flink/nettyconnector
- ZookeeperRegisterServerHandler与ZooKeeper之间的SASL认证通过Flink的框架实现。
- 用户必须自己保证每个Job有一个唯一的TOPIC,否则会引起作业间订阅关系的混乱。
- 在ZookeeperRegisterServerHandler调用shutdown()时,首先删除本并发度的注册信息,然后尝试删除TOPIC节点,如果TOPIC节点为非空,则放弃删除TOPIC节点,说明其他并发度还未退出。
- NettyServer
该模块是NettySink算子的核心之一,主要作用是创建一个NettyServer并接收NettyClient的连接申请。将同一Job中上游算子发送过来的数据,经由NettyServerHandler发送出去。 另外,NettyServer的端口及子网需要在“flink-conf.yaml”配置文件中配置:
- 端口范围
nettyconnector.sinkserver.port.range: 28444-28943
- 子网
nettyconnector.sinkserver.subnet: 10.162.222.123/24
nettyconnector.sinkserver.subnet默认配置为Flink客户端所在节点子网,若客户端与TaskManager不在同一个子网则有可能导致错误,需手动配置为TaskManager所在网络子网(业务IP)。
- 端口范围
- NettyServerHandler
该Handler是NettySink与订阅者交互的通道,当NettySink接收到消息时,该Handler负责将消息发送出去。为保证数据传输的安全性,该通道通过SSL加密。另外设置一个Netty Connector的功能开关,只有当Flink的SSL总开关被打开以及配置“nettyconnector.ssl.enabled”为“true”的时候才开启SSL加密,否则不开启。
- RichParallelSinkFunction
- NettySource
NettySource由以下几个重要模块组成:
- RichParallelSourceFunction
NettySource继承了RichParallelSinkFunction,使其具有Source算子的属性,主要通过RichParallelSourceFunction接口来实现以下功能:
- 启动NettySink算子。
- 运行NettySink算子,接收来自订阅者的数据并注入到所在Job中。
- 取消Source算子运行等。
也可以通过其属性获取以下信息:
- NettySource算子各个并发度的subtaskIndex信息。
- NettySource算子的并发度。
当NettySource算子进入run阶段后,平台内部会不断监控其NettyClient状态是否健康,一旦发现其出现异常,即会重启NettyClient,重新与NettyServer建立连接并接收数据,以防接收的数据混乱。
- RegisterServerHandler
该组件与NettySink的RegisterServerHandler功能相同,在NettySource算子中仅获取所订阅Job的各个并发算子的IP、端口及并发算子信息。
- NettyClient
NettyClient与NettyServer建立连接,并通过NettyClientHandler接收数据。每个NettySource算子必须具有唯一的name(由用户来保障)。NettyServer通过唯一的name确定每个Client来自不同的NettySource。当NettyClient与NettyServer建立连接时,首先向NettyServer注册NettyClient,将NettyClient的NettySource name传递给NettyServer。
- NettyClientHandler
该模块是与发布者交互的通道,也是与Job的其他算子交互的通道。当该通道中接收到消息时,该Handler负责将消息注入到Job内部。另外,为保证数据安全传输,该通道通过SSL加密,与NettySink进行通信。另外设置一个NettyConnector的功能开关,只有当Flink的SSL总开关被打开以及“nettyconnector.ssl.enabled”为“true”的时候才开启SSL加密,否则不开启。
- RichParallelSourceFunction