更新时间:2023-03-17 GMT+08:00

Distributed Coordination (via Akka)

配置场景

Flink客户端与JobManager的通信,JobManager与TaskManager的通信和TaskManager与TaskManager的通信都基于Akka actor模型。Flink提供Akka连接参数的配置项,配置项请在“flink-conf.yaml”配置文件中进行配置,用户可以根据网络环境或调优策略再进行配置。

配置描述

配置项包括消息发送和等待的超时设置,akka监听机制Deathwatch的相关配置等。

针对MRS 3.x之前版本,参数说明见表1

表1 参数说明

参数

是否必选

默认值

描述

akka.ask.timeout

10 s

akka所有异步请求和阻塞请求的超时时间。如果Flink发生超时失败,可以增大这个值。当机器处理速度慢或者网络阻塞时会发生超时。单位:ms/s/m/h/d。

akka.lookup.timeout

10 s

查找JobManager actor对象的超时时间。单位:ms/s/m/h/d。

akka.framesize

10485760b

JobManager和TaskManager间最大消息传输大小。当Flink出现消息大小超过限制的错误时,可以增大这个值。单位:b/B/KB/MB。

akka.watch.heartbeat.interval

10 s

Akka DeathWatch机制检测失联TaskManager的心跳间隔。如果TaskManager经常发生由于心跳消息丢失或延误而被错误标记为失联的情况,可以增大这个值。单位:ms/s/m/h/d。

说明:

DeathWatch的详细解释可以参考akka官网:http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector

akka.watch.heartbeat.pause

60 s

Akka DeathWatch可接受的心跳暂停时间,较小的数值表示不允许不规律的心跳。单位:ms/s/m/h/d。

说明:

DeathWatch的详细解释可以参考akka官网:http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector

akka.watch.threshold

12

DeathWatch失败检测阈值,较小的数值容易把正常TaskManager标记为失败,较大的值增加了失败检测的时间。

说明:

DeathWatch的详细解释可以参考akka官网:http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector

akka.tcp.timeout

20 s

发送连接TCP超时时间,如果经常发生满网络环境下连接TaskManager超时,可以增大这个值。单位:ms/s/m/h/d。

akka.throughput

15

Akka批量处理消息的数量,一次操作完后把处理线程归还线程池。较小的数值代表actor消息处理的公平调度,较大的值以牺牲调度公平的代价提高整体性能。

akka.log.lifecycle.events

false

Akka远程时间日志开关,当需要调试时可打开此开关。

akka.startup-timeout

默认与akka.ask.timeout的值一致

Akka启动remote组件的超时时间。单位:ms/s/m/h/d。

akka.ssl.enabled

true

Akka通信SSL开关,仅在全局开关security.ssl开启时有。

针对MRS 3.x及之后版本,参数说明见表2

表2 参数说明

参数

描述

默认值

是否必选配置

akka.ask.timeout

akka所有异步请求和阻塞请求的超时时间。如果Flink发生超时失败,可以增大这个值。当机器处理速度慢或者网络阻塞时会发生超时。单位:ms/s/m/h/d。

10s

akka.lookup.timeout

查找JobManager actor对象的超时时间。单位:ms/s/m/h/d。

10s

akka.framesize

JobManager和TaskManager间最大消息传输大小。当Flink出现消息大小超过限制的错误时,可以增大这个值。单位:b/B/KB/MB。

10485760b

akka.watch.heartbeat.interval

Akka DeathWatch机制检测失联TaskManager的心跳间隔。如果TaskManager经常发生由于心跳消息丢失或延误而被错误标记为失联的情况,可以增大这个值。单位:ms/s/m/h/d。

说明:

DeathWatch的详细解释可以参考akka官网:http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector

10s

akka.watch.heartbeat.pause

Akka DeathWatch可接受的心跳暂停时间,较小的数值表示不允许不规律的心跳。单位:ms/s/m/h/d。

说明:

DeathWatch的详细解释可以参考akka官网:http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector

60s

akka.watch.threshold

DeathWath失败检测阈值,较小的数值容易把正常TaskManager标记为失败,较大的值增加了失败检测的时间。

说明:

DeathWatch的详细解释可以参考akka官网:http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector

12

akka.tcp.timeout

发送连接TCP超时时间,如果经常发生满网络环境下连接TaskManager超时,可以增大这个值。单位:ms/s/m/h/d。

20s

akka.throughput

Akka批量处理消息的数量,一次操作完后把处理线程归还线程池。较小的数值代表actor消息处理的公平调度,较大的值以牺牲调度公平的代价提高整体性能。

15

akka.log.lifecycle.events

Akka远程时间日志开关,当需要调试时可打开此开关。

false

akka.startup-timeout

远程组件启动失败前的超时时间。该值需带一个时间单位(ms/s/min/h/d)

默认与akka.ask.timeout的值一致

akka.ssl.enabled

Akka通信SSL开关,仅在全局开关security.ssl开启时有。

true

akka.client-socket-worker-pool.pool-size-factor

计算线程池大小的因子,计算公式:ceil(可用处理器*因子),计算结果限制在pool-size-min和pool-size-max之间。

1.0

akka.client-socket-worker-pool.pool-size-max

基于因子计算的线程数上限。

2

akka.client-socket-worker-pool.pool-size-min

基于因子计算的线程数下限。

1

akka.client.timeout

【说明】客户端超时时间。该值需带一个时间单位(ms/s/min/h/d)。

60s

akka.server-socket-worker-pool.pool-size-factor

【说明】计算线程池大小的因子,计算公式:ceil(可用处理器*因子),计算结果限制在pool-size-min和pool-size-max之间。

1.0

akka.server-socket-worker-pool.pool-size-max

基于因子计算的线程数上限。

2

akka.server-socket-worker-pool.pool-size-min

基于因子计算的线程数下限。

1