Distributed Coordination (via Akka)
配置场景
Flink客户端与JobManager的通信,JobManager与TaskManager的通信和TaskManager与TaskManager的通信都基于Akka actor模型。Flink提供Akka连接参数的配置项,配置项请在“flink-conf.yaml”配置文件中进行配置,用户可以根据网络环境或调优策略再进行配置。
配置描述
配置项包括消息发送和等待的超时设置,akka监听机制Deathwatch的相关配置等。
针对MRS 3.x之前版本,参数说明见表1。
参数 |
是否必选 |
默认值 |
描述 |
---|---|---|---|
akka.ask.timeout |
否 |
10 s |
akka所有异步请求和阻塞请求的超时时间。如果Flink发生超时失败,可以增大这个值。当机器处理速度慢或者网络阻塞时会发生超时。单位:ms/s/m/h/d。 |
akka.lookup.timeout |
否 |
10 s |
查找JobManager actor对象的超时时间。单位:ms/s/m/h/d。 |
akka.framesize |
否 |
10485760b |
JobManager和TaskManager间最大消息传输大小。当Flink出现消息大小超过限制的错误时,可以增大这个值。单位:b/B/KB/MB。 |
akka.watch.heartbeat.interval |
否 |
10 s |
Akka DeathWatch机制检测失联TaskManager的心跳间隔。如果TaskManager经常发生由于心跳消息丢失或延误而被错误标记为失联的情况,可以增大这个值。单位:ms/s/m/h/d。
说明:
DeathWatch的详细解释可以参考akka官网:http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector。 |
akka.watch.heartbeat.pause |
否 |
60 s |
Akka DeathWatch可接受的心跳暂停时间,较小的数值表示不允许不规律的心跳。单位:ms/s/m/h/d。
说明:
DeathWatch的详细解释可以参考akka官网:http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector。 |
akka.watch.threshold |
否 |
12 |
DeathWatch失败检测阈值,较小的数值容易把正常TaskManager标记为失败,较大的值增加了失败检测的时间。
说明:
DeathWatch的详细解释可以参考akka官网:http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector。 |
akka.tcp.timeout |
否 |
20 s |
发送连接TCP超时时间,如果经常发生满网络环境下连接TaskManager超时,可以增大这个值。单位:ms/s/m/h/d。 |
akka.throughput |
否 |
15 |
Akka批量处理消息的数量,一次操作完后把处理线程归还线程池。较小的数值代表actor消息处理的公平调度,较大的值以牺牲调度公平的代价提高整体性能。 |
akka.log.lifecycle.events |
否 |
false |
Akka远程时间日志开关,当需要调试时可打开此开关。 |
akka.startup-timeout |
否 |
默认与akka.ask.timeout的值一致 |
Akka启动remote组件的超时时间。单位:ms/s/m/h/d。 |
akka.ssl.enabled |
是 |
true |
Akka通信SSL开关,仅在全局开关security.ssl开启时有。 |
针对MRS 3.x及之后版本,参数说明见表2。
参数 |
描述 |
默认值 |
是否必选配置 |
---|---|---|---|
akka.ask.timeout |
akka所有异步请求和阻塞请求的超时时间。如果Flink发生超时失败,可以增大这个值。当机器处理速度慢或者网络阻塞时会发生超时。单位:ms/s/m/h/d。 |
10s |
否 |
akka.lookup.timeout |
查找JobManager actor对象的超时时间。单位:ms/s/m/h/d。 |
10s |
否 |
akka.framesize |
JobManager和TaskManager间最大消息传输大小。当Flink出现消息大小超过限制的错误时,可以增大这个值。单位:b/B/KB/MB。 |
10485760b |
否 |
akka.watch.heartbeat.interval |
Akka DeathWatch机制检测失联TaskManager的心跳间隔。如果TaskManager经常发生由于心跳消息丢失或延误而被错误标记为失联的情况,可以增大这个值。单位:ms/s/m/h/d。
说明:
DeathWatch的详细解释可以参考akka官网:http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector。 |
10s |
否 |
akka.watch.heartbeat.pause |
Akka DeathWatch可接受的心跳暂停时间,较小的数值表示不允许不规律的心跳。单位:ms/s/m/h/d。
说明:
DeathWatch的详细解释可以参考akka官网:http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector。 |
60s |
否 |
akka.watch.threshold |
DeathWath失败检测阈值,较小的数值容易把正常TaskManager标记为失败,较大的值增加了失败检测的时间。
说明:
DeathWatch的详细解释可以参考akka官网:http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector。 |
12 |
否 |
akka.tcp.timeout |
发送连接TCP超时时间,如果经常发生满网络环境下连接TaskManager超时,可以增大这个值。单位:ms/s/m/h/d。 |
20s |
否 |
akka.throughput |
Akka批量处理消息的数量,一次操作完后把处理线程归还线程池。较小的数值代表actor消息处理的公平调度,较大的值以牺牲调度公平的代价提高整体性能。 |
15 |
否 |
akka.log.lifecycle.events |
Akka远程时间日志开关,当需要调试时可打开此开关。 |
false |
否 |
akka.startup-timeout |
远程组件启动失败前的超时时间。该值需带一个时间单位(ms/s/min/h/d) |
默认与akka.ask.timeout的值一致 |
否 |
akka.ssl.enabled |
Akka通信SSL开关,仅在全局开关security.ssl开启时有。 |
true |
是 |
akka.client-socket-worker-pool.pool-size-factor |
计算线程池大小的因子,计算公式:ceil(可用处理器*因子),计算结果限制在pool-size-min和pool-size-max之间。 |
1.0 |
否 |
akka.client-socket-worker-pool.pool-size-max |
基于因子计算的线程数上限。 |
2 |
否 |
akka.client-socket-worker-pool.pool-size-min |
基于因子计算的线程数下限。 |
1 |
否 |
akka.client.timeout |
【说明】客户端超时时间。该值需带一个时间单位(ms/s/min/h/d)。 |
60s |
否 |
akka.server-socket-worker-pool.pool-size-factor |
【说明】计算线程池大小的因子,计算公式:ceil(可用处理器*因子),计算结果限制在pool-size-min和pool-size-max之间。 |
1.0 |
否 |
akka.server-socket-worker-pool.pool-size-max |
基于因子计算的线程数上限。 |
2 |
否 |
akka.server-socket-worker-pool.pool-size-min |
基于因子计算的线程数下限。 |
1 |
否 |