Updated on 2023-11-03 GMT+08:00

Configuring Flink Service Parameters

Description

All Flink parameters are configurable on the client. You are advised to modify the flink-conf.yaml configuration file on the client. If Flink parameters are modified on FusionInsight Manager, you need to download and install the client again after the configuration is complete.

  • Configuration file path: Client installation path/Flink/flink/conf/flink-conf.yaml
  • File configuration format: Key: Value

    Example: taskmanager.heap.size: 1024mb

    A space is required between Key: and Value.
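
The format rule above can be illustrated with a short fragment. This is a minimal sketch that uses parameter names and default values from the tables in this document. It is not a complete configuration file.

```yaml
# flink-conf.yaml: one "Key: Value" pair per line.
# A space must follow the colon that ends the key.
taskmanager.heap.size: 1024mb
jobmanager.heap.size: 1024mb
parallelism.default: 1
```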

Configurations

The following describes the parameters:

  • JobManager & TaskManager:

    JobManager and TaskManager are the main components of Flink. You can configure their parameters for different security and performance scenarios. The configuration items include communication ports, memory management, and connection retry.

  • Blob server:

    The Blob server on the JobManager node is used to receive JAR files uploaded by users on the client, send JAR files to TaskManager, and transfer log files. The configuration items include ports, SSL, retries, and concurrency.

  • Distributed Coordination (via Akka):

    The Akka actor model is the basis of communications between a Flink client and JobManager, between JobManager and TaskManager, and between TaskManagers. Related parameters can be configured based on the network environment or optimization policy. The configuration items include timeout settings for message sending and waiting and the Akka DeathWatch detection mechanism.

  • SSL:

    To configure a secure Flink cluster, you need to configure SSL-related configuration items, including SSL, certificate, password, and encryption algorithm.

  • Network communication (via Netty):

    When Flink runs a job, data transmission and backpressure detection between tasks depend on Netty. In certain environments, Netty parameters should be configured. For advanced optimization, you can adjust some Netty configuration items. The default configuration can meet the requirements of concurrent and high-throughput tasks in a large-scale cluster.

  • JobManager Web Frontend:

    When JobManager is started, the web server is started in the same process. You can access the web server to obtain information about the current Flink cluster, including JobManager, TaskManager, and jobs running in the cluster. Configuration items of the web server include the port, temporary directory, display items, error redirection, and security-related items.

  • File Systems:

    When a task is running, a result file is created. You can configure the file creation behavior, including the file overwriting strategy and directory creation.

  • State Backend:

    Flink supports HA, recovery from job exceptions, and job pause and recovery during version upgrades. Flink depends on the state backend to store job states and on the restart strategy to restart a job. You can configure the state backend and the restart strategy. Configuration items include the state backend type, storage path, and restart strategy.

  • Kerberos-based Security:

    Kerberos-related configuration items must be configured in Flink's security mode. The configuration items include keytab and principal of Kerberos.

  • HA:

    The HA mode of Flink depends on ZooKeeper. Therefore, ZooKeeper configuration items must be configured, including the ZooKeeper address, path, and security authentication.

  • Environment:

    In scenarios with special requirements for JVM configuration, you can use configuration items to pass JVM parameters to the client, JobManager, and TaskManager.

  • Yarn:

    Flink runs on a YARN cluster, and JobManager runs on ApplicationMaster. Some configuration parameters of JobManager depend on YARN. You can configure YARN-related parameters to enable Flink to run better on YARN. The configuration items include the memory, virtual cores, and port of YARN containers.

  • Pipeline:

    The Netty connection is used among multiple jobs to reduce latency. In this case, NettySink is used on the server and NettySource is used on the client for data transmission. Configuration items include the NettySink information storage path, the NettySink listening port range, whether to enable SSL encryption, and the network domain on which NettySink listens.

JobManager & TaskManager

Table 1 JobManager & TaskManager parameters

Parameter

Description

Default Value

Mandatory

Remarks

taskmanager.memory.size

Amount of heap memory of the Java virtual machine (JVM) that TaskManager reserves for sorting, hash tables, and caching of intermediate results. If unspecified, the memory manager will take a fixed ratio with respect to the size of JVM as specified by taskmanager.memory.fraction. The unit is MB.

0

No

Only versions earlier than MRS 3.x

taskmanager.registration.initial-backoff

Initial interval between two consecutive registration attempts. The unit is ms/s/m/h/d.

The time value and unit are separated by a half-width space. ms/s/m/h/d indicates millisecond, second, minute, hour, and day, respectively.

500 ms

No

taskmanager.registration.refused-backoff

Retry interval when a registration connection is rejected by JobManager.

5 min

No

taskmanager.rpc.port

IPC port range of TaskManager

32326-32390

No

Applicable to all versions

taskmanager.memory.segment-size

Size of the memory buffer used by the memory manager and network stack. The unit is bytes.

32768

No

taskmanager.data.port

Data exchange port range of TaskManager

32391-32455

No

taskmanager.data.ssl.enabled

Whether to enable secure sockets layer (SSL) encryption for data transfer between TaskManagers. This parameter is valid only when the global switch security.ssl is enabled.

false

No

taskmanager.numberOfTaskSlots

Number of slots occupied by TaskManager. Generally, the value is configured as the number of cores of the physical machine. In yarn-session mode, the value can be passed only by using the -s parameter. In yarn-cluster mode, the value can be passed only by using the -ys parameter.

1

No

parallelism.default

Default degree of parallelism, which is used for jobs for which the degree of parallelism is not specified

1

No

taskmanager.memory.fraction

Ratio of JVM heap memory that TaskManager reserves for sorting, hash tables, and caching of intermediate results.

0.7

No

taskmanager.memory.off-heap

Whether TaskManager uses off-heap memory for sorting, hash tables, and caching of intermediate results. You are advised to enable this item for large memory needs to improve memory operation efficiency.

false

Yes

taskmanager.memory.preallocate

Whether TaskManager allocates reserved memory upon startup. You are advised to enable this item when off-heap memory is used.

false

No

task.cancellation.interval

Interval between two successive task cancellation attempts. The unit is millisecond.

30000

No

client.rpc.port

Akka system listening port on the Flink client.

32651-32720

No

Only MRS 3.x or later

jobmanager.heap.size

Size of the heap memory of JobManager. In yarn-session mode, the value can be passed only by using the -jm parameter. In yarn-cluster mode, the value can be passed only by using the -yjm parameter. If the value is smaller than yarn.scheduler.minimum-allocation-mb in the YARN configuration file, the YARN configuration value is used. The unit is B/KB/MB/GB/TB.

1024mb

No

taskmanager.heap.size

Size of the heap memory of TaskManager. In yarn-session mode, the value can be passed only by using the -tm parameter. In yarn-cluster mode, the value can be passed only by using the -ytm parameter. If the value is smaller than yarn.scheduler.minimum-allocation-mb in the YARN configuration file, the YARN configuration value is used. The unit is B/KB/MB/GB/TB.

1024mb

No

taskmanager.network.numberOfBuffers

Number of buffers available to the TaskManager network stack. If an error indicates that the system has insufficient buffers, increase the parameter value.

2048

No

taskmanager.debug.memory.startLogThread

Enable this item for debugging Flink memory and garbage collection (GC)-related problems. TaskManager periodically collects memory and GC statistics, including the current utilization of heap and off-heap memory pools and GC time.

false

No

taskmanager.debug.memory.logIntervalMs

Interval at which TaskManager periodically collects memory and GC statistics. The unit is millisecond.

0

No

taskmanager.maxRegistrationDuration

Maximum duration of TaskManager registration on JobManager. If the actual duration exceeds the value, TaskManager is terminated.

5 min

No

taskmanager.initial-registration-pause

Initial interval between two consecutive registration attempts. The value must contain a time unit (ms/s/min/h/d), for example, 5 seconds.

The time value and unit are separated by a half-width space. ms/s/m/h/d indicates millisecond, second, minute, hour, and day, respectively.

500 ms

No

taskmanager.max-registration-pause

Maximum registration retry interval in case of TaskManager registration failures. The unit is ms/s/m/h/d.

30 s

No

taskmanager.refused-registration-pause

Retry interval when a TaskManager registration connection is rejected by JobManager. The unit is ms/s/m/h/d.

10 s

No

classloader.resolve-order

Class resolution policy used when classes are loaded from user code, that is, whether to first check the user code JAR file (child-first) or the application class path (parent-first). The default setting indicates that the class is first loaded from the user code JAR file, which means that the user code JAR file can contain and load dependencies that are different from those used by Flink.

child-first

No

slot.idle.timeout

Timeout for an idle slot in Slot Pool, in milliseconds.

50000

No

slot.request.timeout

Timeout for requesting a slot from Slot Pool, in milliseconds.

300000

No

task.cancellation.timeout

Timeout of task cancellation, in milliseconds. If a task cancellation times out, a fatal TaskManager error may occur. If this parameter is set to 0, no error is reported when a task cancellation times out.

180000

No

taskmanager.network.detailed-metrics

Indicates whether to enable the detailed metrics monitoring of network queue lengths.

false

No

taskmanager.network.memory.buffers-per-channel

Maximum number of network buffers used by each output/input channel (sub-partition/incoming channel). In credit-based flow control mode, this indicates how much credit is in each input channel. It should be configured with at least 2 buffers to deliver good performance. One buffer is used to receive in-flight data in the sub-partition, and the other for parallel serialization.

2

No

taskmanager.network.memory.floating-buffers-per-gate

Number of extra network buffers used by each output gate (result partition) or input gate, indicating the amount of floating credit shared among all input channels in credit-based flow control mode. Floating buffers are distributed based on the backlog feedback (real-time output buffers in sub-partitions) and can help mitigate back pressure caused by unbalanced data distribution among sub-partitions. Increase this value if the round-trip time between nodes is long and/or the number of machines in the cluster is large.

8

No

taskmanager.network.memory.fraction

Ratio of JVM memory used for network buffers, which determines how many streaming data exchange channels a TaskManager can have at the same time and the extent of channel buffering. Increase this value or the values of taskmanager.network.memory.min and taskmanager.network.memory.max if the job is rejected or a warning indicating that the system does not have enough buffers is received. Note that the values of taskmanager.network.memory.min and taskmanager.network.memory.max may overwrite this value.

0.1

No

taskmanager.network.memory.max

Maximum memory size of the network buffer. The value must contain a unit (B/KB/MB/GB/TB).

1 GB

No

taskmanager.network.memory.min

Minimum memory size of the network buffer. The value must contain a unit (B/KB/MB/GB/TB).

64 MB

No

taskmanager.network.request-backoff.initial

Minimum backoff (in milliseconds) for partition requests of input channels.

100

No

taskmanager.network.request-backoff.max

Maximum backoff (in milliseconds) for partition requests of input channels.

10000

No

taskmanager.registration.timeout

Timeout for TaskManager registration. TaskManager will be terminated if it is not successfully registered within the specified time. The value must contain a time unit (ms/s/min/h/d).

5 min

No

resourcemanager.taskmanager-timeout

Timeout interval for releasing an idle TaskManager, in milliseconds.

30000

No
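
As a rough illustration of the network buffer parameters in Table 1, the fragment below lists the default values from the table, followed by a commented-out adjustment for a job that reports insufficient network buffers. The raised figures are assumptions chosen only for illustration and are not recommended values.

```yaml
# Defaults as listed in Table 1
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64 MB
taskmanager.network.memory.max: 1 GB
taskmanager.network.memory.buffers-per-channel: 2
taskmanager.network.memory.floating-buffers-per-gate: 8

# Hypothetical adjustment (illustrative values, not taken from this document):
# raise the ratio and the upper bound together, because the min and max
# settings may otherwise override the ratio.
# taskmanager.network.memory.fraction: 0.2
# taskmanager.network.memory.max: 2 GB
```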

Blob server

Table 2 Blob server parameters

Parameter

Description

Default Value

Mandatory

blob.server.port

Blob server port

32456-32520

No

blob.service.ssl.enabled

Indicates whether to enable the encryption for the blob transmission channel. This parameter is valid only when the global switch security.ssl is enabled.

true

Yes

blob.fetch.retries

Number of times that TaskManager tries to download blob files from JobManager.

50

No

blob.fetch.num-concurrent

Number of concurrent tasks for downloading blob files supported by JobManager.

50

No

blob.fetch.backlog

Maximum number of blob file download requests (for example, for .jar files) that can wait in the JobManager queue.

1000

No

library-cache-manager.cleanup.interval

Interval at which JobManager deletes the JAR files stored in HDFS after the user cancels the Flink job. The unit is second.

This parameter is available only to MRS 3.x or later.

3600

No
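
For reference, the Blob server settings in Table 2 might appear in flink-conf.yaml as follows. This sketch simply restates the default values from the table.

```yaml
# Defaults as listed in Table 2
blob.server.port: 32456-32520
# Takes effect only when the global SSL switch is enabled
blob.service.ssl.enabled: true
blob.fetch.retries: 50
blob.fetch.num-concurrent: 50
blob.fetch.backlog: 1000
```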

Distributed Coordination (via Akka)

Table 3 Distributed Coordination parameters

Parameter

Description

Default Value

Mandatory

Remarks

akka.ask.timeout

Timeout duration of Akka asynchronous and blocking requests. If a Flink timeout failure occurs, this value can be increased. Timeouts occur when the machine processing speed is slow or the network is congested. The unit is ms/s/m/h/d.

10s

No

Applicable to all versions

akka.lookup.timeout

Timeout duration for JobManager actor object searching. The unit is ms/s/m/h/d.

10s

No

akka.framesize

Maximum size of the message transmitted between JobManager and TaskManager. If a Flink error occurs because the message exceeds this limit, the value can be increased. The unit is b/B/KB/MB.

10485760b

No

akka.watch.heartbeat.interval

Heartbeat interval at which the Akka DeathWatch mechanism detects disconnected TaskManager. If TaskManager is frequently and incorrectly marked as disconnected due to heartbeat loss or delay, the value can be increased. The unit is ms/s/m/h/d.

10s

No

akka.watch.heartbeat.pause

Acceptable heartbeat pause for Akka DeathWatch mechanism. A small value indicates that irregular heartbeat is not accepted. The unit is ms/s/m/h/d.

60s

No

akka.watch.threshold

DeathWatch failure detection threshold. A small value may mark normal TaskManager as failed and a large value increases failure detection time.

12

No

akka.tcp.timeout

Timeout duration of Transmission Control Protocol (TCP) connection request. If TaskManager connection timeout occurs frequently due to the network congestion, the value can be increased. The unit is ms/s/m/h/d.

20s

No

akka.throughput

Number of messages processed by Akka in batches. After an operation, the processing thread is returned to the thread pool. A small value indicates the fair scheduling for actor message processing. A large value indicates improved overall performance but lowered scheduling fairness.

15

No

akka.log.lifecycle.events

Switch of Akka remote event logging, which can be enabled for debugging.

false

No

akka.startup-timeout

Timeout after which the startup of a remote component is considered to have failed. The value must contain a time unit (ms/s/min/h/d).

The value is the same as the value of akka.ask.timeout.

No

akka.ssl.enabled

Switch of Akka communication SSL. This parameter is valid only when the global switch security.ssl is enabled.

true

Yes

akka.client-socket-worker-pool.pool-size-factor

Factor that is used to determine the thread pool size. The pool size is calculated based on the following formula: ceil (available processors * factor). The size is bounded by the pool-size-min and pool-size-max values.

1.0

No

Applicable only to MRS 3.x or later

akka.client-socket-worker-pool.pool-size-max

Maximum number of threads calculated based on the factor.

2

No

akka.client-socket-worker-pool.pool-size-min

Minimum number of threads calculated based on the factor.

1

No

akka.client.timeout

Timeout duration of the client. The value must contain a time unit (ms/s/min/h/d).

60s

No

akka.server-socket-worker-pool.pool-size-factor

Factor that is used to determine the thread pool size. The pool size is calculated based on the following formula: ceil (available processors * factor). The size is bounded by the pool-size-min and pool-size-max values.

1.0

No

akka.server-socket-worker-pool.pool-size-max

Maximum number of threads calculated based on the factor.

2

No

akka.server-socket-worker-pool.pool-size-min

Minimum number of threads calculated based on the factor.

1

No
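
The timeout and thread pool parameters in Table 3 can be sketched as below. The values are the defaults from the table. The worked example in the comments assumes a node with 4 available processors, which is a figure chosen only for illustration.

```yaml
# Timeout and DeathWatch defaults from Table 3
akka.ask.timeout: 10s
akka.tcp.timeout: 20s
akka.watch.heartbeat.interval: 10s
akka.watch.heartbeat.pause: 60s

# Server socket worker pool defaults from Table 3.
# Pool size = ceil(available processors * factor), bounded by min and max.
# With 4 processors (assumed): ceil(4 * 1.0) = 4, bounded to [1, 2] -> 2 threads.
akka.server-socket-worker-pool.pool-size-factor: 1.0
akka.server-socket-worker-pool.pool-size-min: 1
akka.server-socket-worker-pool.pool-size-max: 2
```

With the default bounds, raising the factor alone has no effect on a multi-core node, because the result is already capped by pool-size-max.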

SSL

Table 4 SSL parameters

Parameter

Description

Default Value

Mandatory

Remarks

security.ssl.internal.enabled

Whether to enable SSL for internal communication. This parameter is automatically configured based on the security mode of the cluster.

  • Security mode: true
  • Normal mode: false

Yes

Applicable only to versions earlier than MRS 3.x

security.ssl.internal.keystore

Java keystore file.

-

Yes

security.ssl.internal.keystore-password

Password used to decrypt the keystore file.

-

Yes

security.ssl.internal.key-password

Password used to decrypt the server key in the keystore file.

-

Yes

security.ssl.internal.truststore

Truststore file containing the public CA certificates.

-

Yes

security.ssl.internal.truststore-password

Password used to decrypt the truststore file.

-

Yes

security.ssl.rest.enabled

Whether to enable SSL for external communication. This parameter is automatically configured based on the security mode of the cluster.

  • Security mode: true
  • Normal mode: false

Yes

security.ssl.rest.keystore

Java keystore file.

-

Yes

security.ssl.rest.keystore-password

Password used to decrypt the keystore file.

-

Yes

security.ssl.rest.key-password

Password used to decrypt the server key in the keystore file.

-

Yes

security.ssl.rest.truststore

Truststore file containing the public CA certificates.

-

Yes

security.ssl.rest.truststore-password

Password used to decrypt the truststore file.

-

Yes

security.ssl.protocol

SSL transmission protocol version.

TLSv1.2

Yes

Applicable to all versions

security.ssl.algorithms

Supported SSL standard algorithms. For details, visit the Java official website at: http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites.

TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384

Yes

security.ssl.enabled

Whether to enable SSL for internal communication. This parameter is automatically configured based on the cluster installation mode.

  • Security mode: true
  • Normal mode: false

Yes

Applicable only to MRS 3.x or later

security.ssl.keystore

Java keystore file.

-

Yes

security.ssl.keystore-password

Password used to decrypt the keystore file.

-

Yes

security.ssl.key-password

Password used to decrypt the server key in the keystore file.

-

Yes

security.ssl.truststore

Truststore file containing the public CA certificates.

-

Yes

security.ssl.truststore-password

Password used to decrypt the truststore file.

-

Yes
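
A minimal sketch of the SSL settings from Table 4 is shown below. The file paths and passwords are placeholders introduced for illustration and are not values from this document. Replace them with the values used in your cluster.

```yaml
# Normally configured automatically: true in security mode, false in normal mode
security.ssl.enabled: true
security.ssl.protocol: TLSv1.2

# Placeholder paths and passwords (assumptions, not real values)
security.ssl.keystore: /path/to/flink.keystore
security.ssl.keystore-password: <keystore-password>
security.ssl.key-password: <key-password>
security.ssl.truststore: /path/to/flink.truststore
security.ssl.truststore-password: <truststore-password>
```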

Network communication (via Netty)

Table 5 Network communication parameters

Parameter

Description

Default Value

Mandatory

taskmanager.network.netty.num-arenas

Number of Netty memory blocks.

1

No

taskmanager.network.netty.server.numThreads

Number of Netty server threads

1

No

taskmanager.network.netty.client.numThreads

Number of Netty client threads

1

No

taskmanager.network.netty.client.connectTimeoutSec

Netty client connection timeout duration. The unit is second.

120

No

taskmanager.network.netty.sendReceiveBufferSize

Size of Netty sending and receiving buffers. This defaults to the system buffer size (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MB in modern Linux. The unit is bytes.

4096

No

taskmanager.network.netty.transport

Netty transport type, either nio or epoll

nio

No
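
The Netty parameters in Table 5 could be written as follows. The values are the defaults from the table and are shown only to illustrate the key names.

```yaml
# Defaults as listed in Table 5
taskmanager.network.netty.num-arenas: 1
taskmanager.network.netty.server.numThreads: 1
taskmanager.network.netty.client.numThreads: 1
# Connection timeout in seconds
taskmanager.network.netty.client.connectTimeoutSec: 120
# Either nio or epoll
taskmanager.network.netty.transport: nio
```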

JobManager Web Frontend

Table 6 JobManager Web Frontend parameters

Parameter

Description

Default Value

Mandatory

Remarks

jobmanager.web.port

The web port. Value range: 32261–32325.

32261-32325

No

Applicable only to versions earlier than MRS 3.x

jobmanager.web.allow-access-address

Web access whitelist. IP addresses are separated by commas (,). Only whitelisted IP addresses can access the web.

*

Yes

Applicable to all versions

flink.security.enable

When installing a Flink cluster, you are required to select security mode or normal mode.

  • If security mode is selected, this parameter is automatically set to true.
  • If normal mode is selected, this parameter is automatically set to false.

For an installed Flink cluster, you can view the configured value to determine whether the cluster is in security or normal mode.

Automatic configuration

No

Applicable only to MRS 3.x or later

rest.bind-port

The web port. Value range: 32261–32325.

32261-32325

No

jobmanager.web.history

Number of recent jobs to be displayed.

5

No

jobmanager.web.checkpoints.disable

Whether to disable checkpoint statistics.

false

No

jobmanager.web.checkpoints.history

Number of checkpoint statistical records.

10

No

jobmanager.web.backpressure.cleanup-interval

Interval for clearing unaccessed backpressure records. The unit is millisecond.

600000

No

jobmanager.web.backpressure.refresh-interval

Interval for updating backpressure records. The unit is millisecond.

60000

No

jobmanager.web.backpressure.num-samples

Number of stack trace samples used for backpressure calculation.

100

No

jobmanager.web.backpressure.delay-between-samples

Sampling interval for backpressure calculation. The unit is millisecond.

50

No

jobmanager.web.ssl.enabled

Whether SSL encryption is enabled for web transmission. This parameter is valid only when the global switch security.ssl is enabled.

false

Yes

jobmanager.web.accesslog.enable

Switch to enable or disable web operation logs. The log is stored in webaccess.log.

true

Yes

jobmanager.web.x-frame-options

Value of the HTTP security header X-Frame-Options. The value can be SAMEORIGIN, DENY, or ALLOW-FROM uri.

DENY

Yes

jobmanager.web.cache-directive

Whether the web page can be cached.

no-store: No content is saved to the cache.

Yes

jobmanager.web.expires-time

Expiration duration of web page cache. The unit is millisecond.

0

Yes

jobmanager.web.access-control-allow-origin

Web page same-origin policy that prevents cross-domain attacks. An asterisk (*) indicates that any website can access the service port across domains. You can set this parameter to a specified website.

* (non-security cluster)

Yes

jobmanager.web.refresh-interval

Web page refresh interval. The unit is millisecond.

3000

Yes

jobmanager.web.logout-timer

Automatic logout interval when no operation is performed. The unit is millisecond.

600000

Yes

jobmanager.web.403-redirect-url

Redirect URL for web page access error 403. If a 403 error occurs, the page is redirected to the specified page.

Automatic configuration

Yes

jobmanager.web.404-redirect-url

Redirect URL for web page access error 404. If a 404 error occurs, the page is redirected to the specified page.

Automatic configuration

Yes

jobmanager.web.415-redirect-url

Redirect URL for web page access error 415. If a 415 error occurs, the page is redirected to the specified page.

Automatic configuration

Yes

jobmanager.web.500-redirect-url

Redirect URL for web page access error 500. If a 500 error occurs, the page is redirected to the specified page.

Automatic configuration

Yes

rest.await-leader-timeout

Time that the client waits for the leader address. The unit is millisecond.

30000

No

rest.client.max-content-length

Maximum content length that the client handles (unit: bytes).

104857600

No

rest.connection-timeout

Maximum time for the client to establish a TCP connection (unit: ms).

15000

No

rest.idleness-timeout

Maximum time for a connection to stay idle before failing (unit: ms).

300000

No

rest.retry.delay

The time that the client waits between retries (unit: ms).

3000

No

rest.retry.max-attempts

Number of retries that the client attempts if a retryable operation fails.

20

No

rest.server.max-content-length

Maximum content length that the server handles (unit: bytes).

104857600

No

rest.server.numThreads

Maximum number of threads for the asynchronous processing of requests.

4

No

web.timeout

Timeout for web monitor (unit: ms).

10000

No
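
As an illustration of the web frontend parameters in Table 6, the fragment below combines the port setting with several security-related items. The IP addresses in the whitelist are hypothetical. All other values are defaults from the table.

```yaml
# Web port range
rest.bind-port: 32261-32325

# Hypothetical whitelist: only these two addresses may access the web UI
jobmanager.web.allow-access-address: 192.168.0.10,192.168.0.11

# Security-related defaults from Table 6
jobmanager.web.x-frame-options: DENY
jobmanager.web.accesslog.enable: true
# Intervals in milliseconds
jobmanager.web.refresh-interval: 3000
jobmanager.web.logout-timer: 600000
```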

File Systems

Table 7 File Systems parameters

Parameter

Description

Default Value

Mandatory

fs.overwrite-files

Whether to overwrite the existing file by default when the file is written.

false

No

fs.output.always-create-directory

When the degree of parallelism (DOP) of file writing programs is greater than 1, a directory is created under the output file path and different result files (each parallel write program) are stored in the directory.

  • If this parameter is set to true, a directory is created for the writing program whose DOP is 1 and a result file is stored in the directory.
  • If this parameter is set to false, the file of the writing program whose DOP is 1 is created directly in the output path and no directory is created.

false

No
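
The two file system switches in Table 7 can be sketched as follows, using the default values from the table. The comments restate the behavior described above.

```yaml
# false: an existing file is not overwritten by default
fs.overwrite-files: false

# false: with a DOP of 1, the result file is created directly in the output path
# true:  a directory is created even with a DOP of 1, and the file is stored in it
fs.output.always-create-directory: false
```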

State Backend

Table 8 State Backend parameters

Parameter

Description

Default Value

Mandatory

state.backend.fs.checkpointdir

Checkpoint path used when the state backend is set to filesystem. The path must be accessible by JobManager. A local path is supported only in local mode. In cluster mode, use an HDFS path.

hdfs:///flink/checkpoints

No

state.savepoints.dir

Savepoint storage directory used by Flink to restore and update jobs. When a savepoint is triggered, the metadata of the savepoint is saved to this directory.

hdfs:///flink/savepoint

Mandatory in security mode

restart-strategy

Default restart policy, which is used for jobs for which no restart policy is specified.

  • fixed-delay
  • failure-rate
  • none

none

No

restart-strategy.fixed-delay.attempts

Number of retries when the fixed-delay strategy is used.

  • If the checkpoint is enabled, the default value is the value of Integer.MAX_VALUE.
  • If the checkpoint is disabled, the default value is 3.

No

restart-strategy.fixed-delay.delay

Retry interval when the fixed-delay strategy is used. The unit is ms/s/m/h/d.

  • If the checkpoint is enabled, the default value is 10s.
  • If the checkpoint is disabled, the default value is the value of akka.ask.timeout.

No

restart-strategy.failure-rate.max-failures-per-interval

Maximum number of restarts allowed in a specified period before a job fails when the failure-rate strategy is used.

1

No

restart-strategy.failure-rate.failure-rate-interval

Time interval over which the failure rate is measured when the failure-rate strategy is used. The unit is ms/s/m/h/d.

60 s

No

restart-strategy.failure-rate.delay

Retry interval when the failure-rate strategy is used. The unit is ms/s/m/h/d.

The default value is the same as the value of akka.ask.timeout. For details, see Distributed Coordination (via Akka).

No
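
A minimal sketch of a state backend and restart strategy configuration based on Table 8 is shown below. The paths are the defaults from the table. The choice of fixed-delay with 3 attempts and a 10s delay is an illustrative assumption that reuses the default figures quoted above.

```yaml
# Checkpoint and savepoint storage (defaults from Table 8)
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoint

# Restart a failed job up to 3 times, waiting 10 seconds between attempts
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10s
```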

Kerberos-based Security

Table 9 Kerberos-based security parameters

Parameter

Description

Default Value

Mandatory

security.kerberos.login.keytab

Keytab file path. This parameter is a client parameter.

Configure the parameter based on actual service requirements.

Yes

security.kerberos.login.principal

Kerberos principal name. This parameter is a client parameter. If both security.kerberos.login.keytab and security.kerberos.login.principal are set, keytab authentication is used by default.

Configure the parameter based on actual service requirements.

No

security.kerberos.login.contexts

Contexts of the JAAS file generated by Flink. This parameter is a server parameter.

Client, KafkaClient

Yes
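
The Kerberos parameters in Table 9 might be set as follows. The keytab path and principal are placeholders introduced for illustration. Only the contexts value comes from the table.

```yaml
# Placeholder values (assumptions): use your own keytab file and principal
security.kerberos.login.keytab: /path/to/user.keytab
security.kerberos.login.principal: <principal-name>

# Default from Table 9
security.kerberos.login.contexts: Client,KafkaClient
```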

HA

Table 10 HA parameters

Parameter

Description

Default Value

Mandatory

high-availability

Whether HA is enabled. Only the following two modes are supported currently:

  • none: Only a single JobManager is running. The checkpoint is disabled for JobManager.
  • zookeeper:
    • In non-Yarn mode, multiple JobManagers are supported and the leader JobManager is elected.
    • In Yarn mode, only one JobManager exists.

zookeeper

No

high-availability.zookeeper.quorum

ZooKeeper quorum address.

Automatic configuration

No

high-availability.zookeeper.path.root

Root directory that Flink creates on ZooKeeper, storing metadata required in HA mode.

/flink

No

high-availability.storageDir

Directory for storing JobManager metadata of state backend. ZooKeeper stores only pointers to actual data.

hdfs:///flink/recovery

No

high-availability.zookeeper.client.session-timeout

Session timeout duration on the ZooKeeper client. The unit is millisecond.

60000

No

high-availability.zookeeper.client.connection-timeout

Connection timeout duration on the ZooKeeper client. The unit is millisecond.

15000

No

high-availability.zookeeper.client.retry-wait

Retry waiting time on the ZooKeeper client. The unit is millisecond.

5000

No

high-availability.zookeeper.client.max-retry-attempts

Maximum retry times on the ZooKeeper client.

3

No

high-availability.job.delay

Delay of job restart when JobManager recovers.

This parameter is available only to MRS 3.x or later.

The default value is the same as the value of akka.ask.timeout.

No

high-availability.zookeeper.client.acl

ACL (open or creator) of the ZooKeeper node. The ACL is automatically configured based on the security mode of the cluster. For ACL options, see https://zookeeper.apache.org/doc/r3.5.1-alpha/zookeeperProgrammers.html#sc_BuiltinACLSchemes.

  • Security mode: The default value is creator.
  • Non-security mode: The default value is open.

Yes

zookeeper.sasl.disable

Whether to disable SASL authentication. This parameter is automatically configured based on the security mode of the cluster.

  • Security mode: false
  • Non-security mode: true

Yes

zookeeper.sasl.service-name

  • Set this configuration item if the service name configured on the ZooKeeper server is different from zookeeper.
  • If service names on the client and server are inconsistent, authentication fails.

zookeeper

Yes
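
The HA parameters in Table 10 can be sketched as below. The quorum address is configured automatically in a real cluster, so the host and port entries here are placeholders only. The remaining values are defaults from the table.

```yaml
high-availability: zookeeper
# Placeholder address list (assumption); configured automatically in practice
high-availability.zookeeper.quorum: <zk-host-1>:<port>,<zk-host-2>:<port>,<zk-host-3>:<port>
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recovery

# ZooKeeper client timeouts in milliseconds
high-availability.zookeeper.client.session-timeout: 60000
high-availability.zookeeper.client.connection-timeout: 15000
high-availability.zookeeper.client.retry-wait: 5000
high-availability.zookeeper.client.max-retry-attempts: 3
```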

Environment

Table 11 Environment parameters

Parameter

Description

Default Value

Mandatory

env.java.opts

JVM parameters, which are passed to the startup script, JobManager, TaskManager, and YARN client. For example, you can use this item to pass remote debugging parameters.

-Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M -Djdk.tls.ephemeralDHKeySize=2048 -Djava.library.path=${HADOOP_COMMON_HOME}/lib/native -Djava.net.preferIPv4Stack=true -Djava.net.preferIPv6Addresses=false -Dbeetle.application.home.path=/opt/xxx/Bigdata/common/runtime/security/config

No
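
As a hedged illustration of passing remote debugging parameters, the fragment below adds the standard JDWP agent option to two of the default GC flags. The port 5005 is an arbitrary assumption, and the shortened flag list is for readability only. In practice, keep the default options that your cluster requires.

```yaml
# Hypothetical example: two default GC flags plus a JDWP remote debugging agent.
# address=5005 is an assumed port, not a value from this document.
env.java.opts: -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
```

Because the same options are passed to JobManager and TaskManager, a fixed debug port can conflict when several of these processes run on one host.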

Yarn

Table 12 YARN parameters

Parameter

Description

Default Value

Mandatory

yarn.maximum-failed-containers

Maximum number of containers that the system reallocates if TaskManager containers fail. The default value is the number of TaskManagers when the Flink cluster is started.

5

No

yarn.application-attempts

Number of ApplicationMaster restarts. The value is the maximum number of attempts allowed within the validity interval, which is set to the Akka timeout in Flink. After a restart, the IP address and port number of ApplicationMaster change, and you need to reconnect the client manually.

2

No

yarn.heartbeat-delay

Interval between heartbeats from ApplicationMaster to the YARN ResourceManager. The unit is second.

5

No

yarn.containers.vcores

Number of virtual cores of each Yarn container

Number of TaskManager slots

No

yarn.application-master.port

ApplicationMaster port number setting. A port number range is supported.

32586-32650

No
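
The YARN parameters in Table 12 might be written as follows, using the default values from the table. The commented-out vcores line uses an assumed value to show how the default could be overridden.

```yaml
# Defaults as listed in Table 12
yarn.maximum-failed-containers: 5
yarn.application-attempts: 2
# Heartbeat interval in seconds
yarn.heartbeat-delay: 5
yarn.application-master.port: 32586-32650

# Defaults to the number of TaskManager slots when not set.
# Illustrative override (assumed value):
# yarn.containers.vcores: 2
```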

Pipeline

This section applies to MRS 3.x or later.

Table 13 Pipeline parameters

Parameter

Description

Default Value

Mandatory

nettyconnector.registerserver.topic.storage

Path (on a third-party server) for storing the IP address, port number, and concurrency information of NettySink. ZooKeeper is recommended for storage.

/flink/nettyconnector

No. However, this parameter is mandatory if the pipeline feature is enabled.

nettyconnector.sinkserver.port.range

Port range of NettySink.

28444-28843

No. However, this parameter is mandatory if the pipeline feature is enabled.

nettyconnector.ssl.enabled

Whether SSL encryption for the communication between NettySink and NettySource is enabled. For details about the encryption key and protocol, see SSL.

false

No. However, this parameter is mandatory if the pipeline feature is enabled.

nettyconnector.message.delimiter

Delimiter of the messages sent by NettySink to NettySource. The delimiter must be 2 to 4 bytes long and cannot contain \n, #, or spaces.

The default value is $_.

No. However, this parameter is mandatory if the pipeline feature is enabled.
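
When the pipeline feature is used, the parameters in Table 13 could be set together as in the sketch below. All values are the defaults from the table.

```yaml
# ZooKeeper path where NettySink registers its IP address, port, and concurrency
nettyconnector.registerserver.topic.storage: /flink/nettyconnector
nettyconnector.sinkserver.port.range: 28444-28843
nettyconnector.ssl.enabled: false
# 2 to 4 bytes long; must not contain \n, #, or spaces
nettyconnector.message.delimiter: $_
```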