Updated on 2022-08-12 GMT+08:00

Distributed Coordination (via Akka)

Scenarios

Flink uses the Akka actor model for communication between the Flink client and JobManager, between JobManager and TaskManager, and between TaskManagers. You can tune the Akka connection parameters in the flink-conf.yaml file to suit your network environment or optimization policy.
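As a minimal sketch of what such settings look like, the following flink-conf.yaml fragment sets three of the Akka parameters to their documented default values. The choice of parameters is only illustrative, and the values should be adapted to your own environment.

```yaml
# Illustrative fragment of flink-conf.yaml: one "key: value" pair per line.
# The values shown are the documented defaults.
akka.ask.timeout: 10 s
akka.lookup.timeout: 10 s
akka.framesize: 10485760b
```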

Configuration Description

You can configure the timeouts for sending and waiting for messages, as well as the Akka DeathWatch monitoring mechanism.

For versions earlier than MRS 3.x, see Table 1.

Table 1 Parameters

| Parameter | Mandatory | Default Value | Description |
| --- | --- | --- | --- |
| akka.ask.timeout | No | 10 s | Timeout for Akka asynchronous and blocking requests. Timeouts can occur when the machine is slow or the network is congested. If Flink fails with a timeout, increase this value. The unit is ms/s/m/h/d. |
| akka.lookup.timeout | No | 10 s | Timeout for looking up the JobManager actor object. The unit is ms/s/m/h/d. |
| akka.framesize | No | 10485760b | Maximum size of a message transmitted between JobManager and TaskManager. If Flink reports an error because a message exceeds this limit, increase the value. The unit is b/B/KB/MB. |
| akka.watch.heartbeat.interval | No | 10 s | Heartbeat interval that the Akka DeathWatch mechanism uses to detect disconnected TaskManagers. If TaskManagers are frequently and incorrectly marked as disconnected because of lost or delayed heartbeats, increase the value. The unit is ms/s/m/h/d. NOTE: For a detailed description of Akka DeathWatch, see the Akka official website: http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector. |
| akka.watch.heartbeat.pause | No | 60 s | Acceptable heartbeat pause for the Akka DeathWatch mechanism. The smaller the value, the less tolerant the mechanism is of irregular heartbeats. The unit is ms/s/m/h/d. NOTE: For a detailed description of Akka DeathWatch, see the Akka official website: http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector. |
| akka.watch.threshold | No | 12 | DeathWatch failure detection threshold. A small value is more likely to mark a healthy TaskManager as failed. A large value increases the time needed to detect a failure. NOTE: For a detailed description of Akka DeathWatch, see the Akka official website: http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector. |
| akka.tcp.timeout | No | 20 s | Timeout for Transmission Control Protocol (TCP) connection requests. If connections to TaskManagers frequently time out because of network congestion, increase the value. The unit is ms/s/m/h/d. |
| akka.throughput | No | 15 | Number of messages that Akka processes in a batch before the processing thread is returned to the thread pool. A small value gives fairer scheduling of actor message processing. A large value improves overall performance at the cost of scheduling fairness. |
| akka.log.lifecycle.events | No | false | Whether to enable Akka remote event logging. It can be enabled for debugging. |
| akka.startup-timeout | No | Same as the value of akka.ask.timeout | Time after which the startup of a remote component is considered to have failed. The unit is ms/s/m/h/d. |
| akka.ssl.enabled | Yes | true | Whether to enable SSL for Akka communication. This parameter takes effect only when the global switch security.ssl is enabled. |
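If requests time out or TaskManagers are wrongly marked as disconnected on a slow or congested network, the timeout and DeathWatch parameters in Table 1 might be relaxed along the following lines. This is only a sketch: the values are assumptions chosen for illustration, not recommendations.

```yaml
# Hypothetical tuning for a slow or congested network. Example values only.
# Default is 10 s.
akka.ask.timeout: 30 s
# Default is 20 s.
akka.tcp.timeout: 40 s
# Default is 10 s.
akka.watch.heartbeat.interval: 20 s
# Default is 60 s. The pause is kept larger than the interval, as in the defaults.
akka.watch.heartbeat.pause: 120 s
```

Longer heartbeat intervals and pauses reduce false disconnection reports, but they are also likely to delay the detection of a TaskManager that has really failed, so increase them gradually.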

For MRS 3.x or later, see Table 2.

Table 2 Parameters

| Parameter | Description | Default Value | Mandatory |
| --- | --- | --- | --- |
| akka.ask.timeout | Timeout for Akka asynchronous and blocking requests. Timeouts can occur when the machine is slow or the network is congested. If Flink fails with a timeout, increase this value. The unit is ms/s/m/h/d. | 10s | No |
| akka.lookup.timeout | Timeout for looking up the JobManager actor object. The unit is ms/s/m/h/d. | 10s | No |
| akka.framesize | Maximum size of a message transmitted between JobManager and TaskManager. If Flink reports an error because a message exceeds this limit, increase the value. The unit is b/B/KB/MB. | 10485760b | No |
| akka.watch.heartbeat.interval | Heartbeat interval that the Akka DeathWatch mechanism uses to detect disconnected TaskManagers. If TaskManagers are frequently and incorrectly marked as disconnected because of lost or delayed heartbeats, increase the value. The unit is ms/s/m/h/d. NOTE: For a detailed explanation of DeathWatch, see the Akka official website: http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector. | 10s | No |
| akka.watch.heartbeat.pause | Acceptable heartbeat pause for the Akka DeathWatch mechanism. The smaller the value, the less tolerant the mechanism is of irregular heartbeats. The unit is ms/s/m/h/d. NOTE: For a detailed explanation of DeathWatch, see the Akka official website: http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector. | 60s | No |
| akka.watch.threshold | DeathWatch failure detection threshold. A small value is more likely to mark a healthy TaskManager as failed. A large value increases the time needed to detect a failure. NOTE: For a detailed explanation of DeathWatch, see the Akka official website: http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector. | 12 | No |
| akka.tcp.timeout | Timeout for Transmission Control Protocol (TCP) connection requests. If connections to TaskManagers frequently time out because of network congestion, increase the value. The unit is ms/s/m/h/d. | 20s | No |
| akka.throughput | Number of messages that Akka processes in a batch before the processing thread is returned to the thread pool. A small value gives fairer scheduling of actor message processing. A large value improves overall performance at the cost of scheduling fairness. | 15 | No |
| akka.log.lifecycle.events | Whether to enable Akka remote event logging. It can be enabled for debugging. | false | No |
| akka.startup-timeout | Time after which the startup of a remote component is considered to have failed. The value must contain a time unit (ms/s/min/h/d). | Same as the value of akka.ask.timeout | No |
| akka.ssl.enabled | Whether to enable SSL for Akka communication. This parameter takes effect only when the global switch security.ssl is enabled. | true | Yes |
| akka.client-socket-worker-pool.pool-size-factor | Factor used to determine the size of the client socket worker thread pool. The pool size is calculated as ceil(available processors * factor) and is then bounded by the pool-size-min and pool-size-max values. | 1.0 | No |
| akka.client-socket-worker-pool.pool-size-max | Upper bound on the number of threads calculated from the factor. | 2 | No |
| akka.client-socket-worker-pool.pool-size-min | Lower bound on the number of threads calculated from the factor. | 1 | No |
| akka.client.timeout | Timeout of the client. The value must contain a time unit (ms/s/min/h/d). | 60s | No |
| akka.server-socket-worker-pool.pool-size-factor | Factor used to determine the size of the server socket worker thread pool. The pool size is calculated as ceil(available processors * factor) and is then bounded by the pool-size-min and pool-size-max values. | 1.0 | No |
| akka.server-socket-worker-pool.pool-size-max | Upper bound on the number of threads calculated from the factor. | 2 | No |
| akka.server-socket-worker-pool.pool-size-min | Lower bound on the number of threads calculated from the factor. | 1 | No |
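The pool size formula can be worked through with a small example. The fragment below is a sketch that assumes a host with 8 available processors. Both the processor count and the configured values are hypothetical and serve only to show how the factor and the bounds interact.

```yaml
# Assumption: the host reports 8 available processors.
# With the defaults (factor 1.0, min 1, max 2):
#   ceil(8 * 1.0) = 8, which exceeds pool-size-max, so the pool has 2 threads.
# With the illustrative values below:
#   ceil(8 * 0.5) = 4, which lies within [2, 4], so the pool has 4 threads.
akka.server-socket-worker-pool.pool-size-factor: 0.5
akka.server-socket-worker-pool.pool-size-min: 2
akka.server-socket-worker-pool.pool-size-max: 4
```

The same calculation applies to the akka.client-socket-worker-pool parameters. Because the result is always bounded by the minimum and maximum, changing only the factor has no effect once the calculated size falls outside those bounds.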