Updated on 2022-08-12 GMT+08:00

JobManager & TaskManager

Scenarios

JobManager and TaskManager are main components of Flink. You can configure the parameters for different security and performance scenarios on the client.

Configuration Description

Main configuration items include communication port, memory management, connection retry, and so on.

For versions earlier than MRS 3.x, see Table 1.

Table 1 Parameters

Parameter

Mandatory

Default Value

Description

taskmanager.rpc.port

No

32326-32390

IPC port range of TaskManager

taskmanager.data.port

No

32391-32455

Data exchange port range of TaskManager

taskmanager.data.ssl.enabled

No

false

Whether to enable secure sockets layer (SSL) encryption for data transfer between TaskManagers. This parameter is valid only when the global switch security.ssl is enabled.

taskmanager.numberOfTaskSlots

No

3

Number of slots occupied by TaskManager. Generally, the value is configured as the number of cores of the physical machine. In yarn-session mode, the value can be transmitted by only the -s parameter. In yarn-cluster mode, the value can be transmitted by only the -ys parameter.

parallelism.default

No

1

Number of concurrent job operators.

taskmanager.memory.size

No

0

Amount of heap memory of the Java virtual machine (JVM) that TaskManager reserves for sorting, hash tables, and caching of intermediate results. If unspecified, the memory manager will take a fixed ratio with respect to the size of JVM as specified by taskmanager.memory.fraction. The unit is MB.

taskmanager.memory.fraction

No

0.7

Ratio of JVM heap memory that TaskManager reserves for sorting, hash tables, and caching of intermediate results.

taskmanager.memory.off-heap

Yes

false

Whether TaskManager uses off-heap memory for sorting, hash tables and intermediate status. You are advised to enable this item for large memory needs to improve memory operation efficiency.

taskmanager.memory.segment-size

No

32768

Size of memory segment on TaskManager. Memory segment is the basic unit of the reserved memory space and is used to configure network buffer stacks. The unit is bytes.

taskmanager.memory.preallocate

No

false

Whether TaskManager allocates reserved memory space upon startup. You are advised to enable this item when off-heap memory is used.

taskmanager.registration.initial-backoff

No

500 ms

Initial interval between two consecutive registration attempts. The unit is ms/s/m/h/d.

NOTE:

The time value and unit are separated by half-width spaces. ms/s/m/h/d indicates millisecond, second, minute, hour, and day, respectively.

taskmanager.registration.refused-backoff

No

5 min

Retry interval when a registration connection is rejected by JobManager.

task.cancellation.interval

No

30000

Interval between two successive task cancellation attempts.

For configuration items for MRS 3.x or later, see Table 2.

Table 2 Parameters

Parameter

Description

Default Value

Mandatory

taskmanager.rpc.port

IPC port range of TaskManager

32326-32390

No

client.rpc.port

Akka system listening port on the Flink client.

32651-32720

No

taskmanager.data.port

Data exchange port range of TaskManager

32391-32455

No

taskmanager.data.ssl.enabled

Whether to enable secure sockets layer (SSL) encryption for data transfer between TaskManagers. This parameter is valid only when the global switch security.ssl is enabled.

false

No

jobmanager.heap.size

Size of the heap memory of JobManager. In yarn-session mode, the value can be transmitted by only the -jm parameter. In yarn-cluster mode, the value can be transmitted by only the -yjm parameter. If the value is smaller than yarn.scheduler.minimum-allocation-mb in the Yarn configuration file, the Yarn configuration value is used. Unit: B/KB/MB/GB/TB.

1024mb

No

taskmanager.heap.size

Size of the heap memory of TaskManager. In yarn-session mode, the value can be transmitted by only the -tm parameter. In yarn-cluster mode, the value can be transmitted by only the -ytm parameter. If the value is smaller than yarn.scheduler.minimum-allocation-mb in the Yarn configuration file, the Yarn configuration value is used. The unit is B/KB/MB/GB/TB.

1024mb

No

taskmanager.numberOfTaskSlots

Number of slots occupied by TaskManager. Generally, the value is configured as the number of cores of the physical machine. In yarn-session mode, the value can be transmitted by only the -s parameter. In yarn-cluster mode, the value can be transmitted by only the -ys parameter.

1

No

parallelism.default

Default degree of parallelism, which is used for jobs for which the degree of parallelism is not specified

1

No

taskmanager.network.numberOfBuffers

Number of TaskManager network transmission buffer stacks. If an error indicates insufficient system buffer, increase the parameter value.

2048

No

taskmanager.memory.fraction

Ratio of JVM heap memory that TaskManager reserves for sorting, hash tables, and caching of intermediate results.

0.7

No

taskmanager.memory.off-heap

Whether TaskManager uses off-heap memory for sorting, hash tables and intermediate status. You are advised to enable this item for large memory needs to improve memory operation efficiency.

false

Yes

taskmanager.memory.segment-size

Size of the memory buffer used by the memory manager and network stack The unit is bytes.

32768

No

taskmanager.memory.preallocate

Whether TaskManager allocates reserved memory space upon startup. You are advised to enable this item when off-heap memory is used.

false

No

taskmanager.debug.memory.startLogThread

Enable this item for debugging Flink memory and garbage collection (GC)-related problems. TaskManager periodically collects memory and GC statistics, including the current utilization of heap and off-heap memory pools and GC time.

false

No

taskmanager.debug.memory.logIntervalMs

Interval at which TaskManager periodically collects memory and GC statistics.

0

No

taskmanager.maxRegistrationDuration

Maximum duration of TaskManager registration on JobManager. If the actual duration exceeds the value, TaskManager is disabled.

5 min

No

taskmanager.initial-registration-pause

Initial interval between two consecutive registration attempts. The value must contain a time unit (ms/s/min/h/d), for example, 5 seconds.

500ms

NOTE:

The time value and unit are separated by half-width spaces. ms/s/m/h/d indicates millisecond, second, minute, hour, and day, respectively.

No

taskmanager.max-registration-pause

Maximum registration retry interval in case of TaskManager registration failures. The unit is ms/s/m/h/d.

30s

No

taskmanager.refused-registration-pause

Retry interval when a TaskManager registration connection is rejected by JobManager. The unit is ms/s/m/h/d.

10s

No

task.cancellation.interval

Interval between two successive task cancellation attempts. The unit is millisecond.

30000

No

classloader.resolve-order

Class resolution policies defined when classes are loaded from user codes, which means whether to first check the user code JAR file (child-first) or the application class path (parent-first). The default setting indicates that the class is first loaded from the user code JAR file, which means that the user code JAR file can contain and load dependencies that are different from those used by Flink.

child-first

No

slot.idle.timeout

Timeout for an idle slot in Slot Pool, in milliseconds.

50000

No

slot.request.timeout

Timeout for requesting a slot from Slot Pool, in milliseconds.

300000

No

task.cancellation.timeout

Timeout of task cancellation, in milliseconds. If a task cancellation times out, a fatal TaskManager error may occur. If this parameter is set to 0, no error is reported when a task cancellation times out.

180000

No

taskmanager.network.detailed-metrics

Indicates whether to enable the detailed metrics monitoring of network queue lengths.

false

No

taskmanager.network.memory.buffers-per-channel

Maximum number of network buffers used by each output/input channel (sub-partition/incoming channel). In credit-based flow control mode, this indicates how much credit is in each input channel. It should be configured with at least 2 buffers to deliver good performance. One buffer is used to receive in-flight data in the sub-partition, and the other for parallel serialization.

2

No

taskmanager.network.memory.floating-buffers-per-gate

Number of extra network buffers used by each output gate (result partition) or input gate, indicating the amount of floating credit shared among all input channels in credit-based flow control mode. Floating buffers are distributed based on the backlog feedback (real-time output buffers in sub-partitions) and can help mitigate back pressure caused by unbalanced data distribution among sub-partitions. Increase this value if the round-trip time between nodes is long and/or the number of machines in the cluster is large.

8

No

taskmanager.network.memory.fraction

Ratio of JVM memory used for network buffers, which determines how many streaming data exchange channels a TaskManager can have at the same time and the extent of channel buffering. Increase this value or the values of taskmanager.network.memory.min and taskmanager.network.memory.max if the job is rejected or a warning indicating that the system does not have enough buffers is received. Note that the values of taskmanager.network.memory.min and taskmanager.network.memory.max may overwrite this value.

0.1

No

taskmanager.network.memory.max

Maximum memory size of the network buffer. The value must contain a unit (B/KB/MB/GB/TB).

1 GB

No

taskmanager.network.memory.min

Minimum memory size of the network buffer. The value must contain a unit (B/KB/MB/GB/TB).

64 MB

No

taskmanager.network.request-backoff.initial

Minimum backoff for partition requests of input channels.

100

No

taskmanager.network.request-backoff.max

Maximum backoff for partition requests of input channels.

10000

No

taskmanager.registration.timeout

Timeout for TaskManager registration. TaskManager will be terminated if it is not successfully registered within the specified time. The value must contain a time unit (ms/s/min/h/d).

5 min

No

resourcemanager.taskmanager-timeout

Timeout interval for releasing an idle TaskManager, in milliseconds.

30000

No