Configuring Flink Service Parameters
Description
All Flink parameters are configurable on the client. You are advised to modify the flink-conf.yaml configuration file on the client. If Flink parameters are modified on FusionInsight Manager, you need to download and install the client again after the configuration is complete.
- Configuration file path: Client installation path/Flink/flink/conf/flink-conf.yaml
- File configuration format: Key: Value
Example: taskmanager.heap.size: 1024mb
A space is required between Key: and Value.
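The format rule can be illustrated with a minimal fragment. The values below are the defaults listed later in this document and are shown only to demonstrate the syntax.

```yaml
# Each entry is written as "Key: Value", with one space after the colon.
taskmanager.heap.size: 1024mb
parallelism.default: 1

# Not a valid entry (no space after the colon):
# taskmanager.heap.size:1024mb
```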
Configurations
The following describes the parameters:
- JobManager & TaskManager:
JobManager and TaskManager are the main components of Flink. You can configure them for different security and performance scenarios. The configuration items include communication ports, memory management, and connection retry.
- Blob server:
The Blob server on the JobManager node is used to receive JAR files uploaded by users on the client, send JAR files to TaskManager, and transfer log files. The configuration items include ports, SSL, retries, and concurrency.
- Distributed Coordination (via Akka):
The Akka actor model is the basis of communication between the Flink client and JobManager, between JobManager and TaskManager, and between TaskManagers. Related parameters can be configured based on the network environment or optimization policy. The configuration items include the timeout settings for message sending and waiting and the Akka listening mechanism DeathWatch.
- SSL:
To configure a secure Flink cluster, you need to configure SSL-related configuration items, including SSL, certificate, password, and encryption algorithm.
- Network communication (via Netty):
When Flink runs a job, data transmission and backpressure detection between tasks depend on Netty. The default configuration meets the requirements of concurrent, high-throughput tasks in a large-scale cluster. In certain environments, or for advanced optimization, you can adjust the Netty configuration items.
- JobManager Web Frontend:
When JobManager is started, the web server is started in the same process. You can access the web server to obtain information about the current Flink cluster, including JobManager, TaskManager, and jobs running in the cluster. Configuration items of the web server include the port, temporary directory, display items, error redirection, and security-related items.
- File Systems:
When a task is running, a result file is created. You can configure the file creation behavior, including the file overwriting strategy and directory creation.
- State Backend:
Flink supports HA, recovery from job exceptions, and job pause and recovery during version upgrades. To do so, Flink depends on the state backend to store job states and on the restart strategy to restart jobs. Configuration items include the state backend type, storage path, and restart strategy.
- Kerberos-based Security:
Kerberos-related configuration items must be configured in Flink's security mode. The configuration items include keytab and principal of Kerberos.
- HA:
The HA mode of Flink depends on ZooKeeper, so ZooKeeper configuration items must be set, including the ZooKeeper address, path, and security authentication.
- Environment:
If a scenario has special requirements on JVM configuration, you can use configuration items to pass JVM parameters to the client, JobManager, and TaskManager.
- Yarn:
Flink runs on a Yarn cluster, and JobManager runs in ApplicationMaster. Some JobManager configuration parameters depend on Yarn. You can configure Yarn-related items so that Flink runs better on Yarn. The configuration items include the memory, virtual cores, and ports of Yarn containers.
- Pipeline:
Netty connections are used between jobs to reduce latency. In this case, NettySink is used on the server and NettySource is used on the client for data transmission. Configuration items include the storage path of NettySink information, the range of NettySink listening ports, whether to enable SSL encryption, and the network domain used for NettySink monitoring.
JobManager & TaskManager
Parameter |
Description |
Default Value |
Mandatory |
Remarks |
---|---|---|---|---|
taskmanager.memory.size |
Amount of heap memory of the Java virtual machine (JVM) that TaskManager reserves for sorting, hash tables, and caching of intermediate results. If unspecified, the memory manager will take a fixed ratio with respect to the size of JVM as specified by taskmanager.memory.fraction. The unit is MB. |
0 |
No |
Only versions earlier than MRS 3.x |
taskmanager.registration.initial-backoff |
Initial interval between two consecutive registration attempts. The unit is ms/s/m/h/d. The time value and unit are separated by half-width spaces. ms/s/m/h/d indicates millisecond, second, minute, hour, and day, respectively. |
500 ms |
No |
|
taskmanager.registration.refused-backoff |
Retry interval when a registration connection is rejected by JobManager. |
5 min |
No |
|
taskmanager.rpc.port |
IPC port range of TaskManager |
32326-32390 |
No |
Applicable to all versions |
taskmanager.memory.segment-size |
Size of the memory buffer used by the memory manager and network stack. The unit is bytes. |
32768 |
No |
|
taskmanager.data.port |
Data exchange port range of TaskManager |
32391-32455 |
No |
|
taskmanager.data.ssl.enabled |
Whether to enable secure sockets layer (SSL) encryption for data transfer between TaskManagers. This parameter is valid only when the global switch security.ssl is enabled. |
false |
No |
|
taskmanager.memory.off-heap |
Whether TaskManager uses off-heap memory for sorting, hash tables, and caching of intermediate results. You are advised to enable this item for large memory needs to improve memory operation efficiency. |
false |
Yes |
|
taskmanager.memory.preallocate |
Whether TaskManager allocates reserved memory space upon startup. You are advised to enable this item when off-heap memory is used. |
false |
No |
|
taskmanager.numberOfTaskSlots |
Number of slots occupied by TaskManager. Generally, the value is configured as the number of cores of the physical machine. In yarn-session mode, the value can be transmitted by only the -s parameter. In yarn-cluster mode, the value can be transmitted by only the -ys parameter. |
1 |
No |
|
parallelism.default |
Default degree of parallelism, which is used for jobs for which the degree of parallelism is not specified |
1 |
No |
|
task.cancellation.interval |
Interval between two successive task cancellation attempts. The unit is millisecond. |
30000 |
No |
|
taskmanager.memory.fraction |
Ratio of JVM heap memory that TaskManager reserves for sorting, hash tables, and caching of intermediate results. |
0.7 |
No |
|
client.rpc.port |
Akka system listening port on the Flink client. |
32651-32720 |
No |
Only MRS 3.x or later |
jobmanager.heap.size |
Size of the heap memory of JobManager. In yarn-session mode, the value can be transmitted by only the -jm parameter. In yarn-cluster mode, the value can be transmitted by only the -yjm parameter. If the value is smaller than yarn.scheduler.minimum-allocation-mb in the Yarn configuration file, the Yarn configuration value is used. The unit is B/KB/MB/GB/TB. |
1024mb |
No |
|
taskmanager.heap.size |
Size of the heap memory of TaskManager. In yarn-session mode, the value can be transmitted by only the -tm parameter. In yarn-cluster mode, the value can be transmitted by only the -ytm parameter. If the value is smaller than yarn.scheduler.minimum-allocation-mb in the Yarn configuration file, the Yarn configuration value is used. The unit is B/KB/MB/GB/TB. |
1024mb |
No |
|
taskmanager.network.numberOfBuffers |
Number of TaskManager network transmission buffer stacks. If an error indicates insufficient system buffer, increase the parameter value. |
2048 |
No |
|
taskmanager.debug.memory.startLogThread |
Enable this item for debugging Flink memory and garbage collection (GC)-related problems. TaskManager periodically collects memory and GC statistics, including the current utilization of heap and off-heap memory pools and GC time. |
false |
No |
|
taskmanager.debug.memory.logIntervalMs |
Interval at which TaskManager periodically collects memory and GC statistics. |
0 |
No |
|
taskmanager.maxRegistrationDuration |
Maximum duration of TaskManager registration on JobManager. If the actual duration exceeds the value, TaskManager is disabled. |
5 min |
No |
|
taskmanager.initial-registration-pause |
Initial interval between two consecutive registration attempts. The value must contain a time unit (ms/s/min/h/d), for example, 5 seconds. The time value and unit are separated by half-width spaces. ms/s/m/h/d indicates millisecond, second, minute, hour, and day, respectively. |
500 ms |
No |
|
taskmanager.max-registration-pause |
Maximum registration retry interval in case of TaskManager registration failures. The unit is ms/s/m/h/d. |
30 s |
No |
|
taskmanager.refused-registration-pause |
Retry interval when a TaskManager registration connection is rejected by JobManager. The unit is ms/s/m/h/d. |
10 s |
No |
|
classloader.resolve-order |
Class resolution policies defined when classes are loaded from user codes, which means whether to first check the user code JAR file (child-first) or the application class path (parent-first). The default setting indicates that the class is first loaded from the user code JAR file, which means that the user code JAR file can contain and load dependencies that are different from those used by Flink. |
child-first |
No |
|
slot.idle.timeout |
Timeout for an idle slot in Slot Pool, in milliseconds. |
50000 |
No |
|
slot.request.timeout |
Timeout for requesting a slot from Slot Pool, in milliseconds. |
300000 |
No |
|
task.cancellation.timeout |
Timeout of task cancellation, in milliseconds. If a task cancellation times out, a fatal TaskManager error may occur. If this parameter is set to 0, no error is reported when a task cancellation times out. |
180000 |
No |
|
taskmanager.network.detailed-metrics |
Indicates whether to enable the detailed metrics monitoring of network queue lengths. |
false |
No |
|
taskmanager.network.memory.buffers-per-channel |
Maximum number of network buffers used by each output/input channel (sub-partition/incoming channel). In credit-based flow control mode, this indicates how much credit is in each input channel. It should be configured with at least 2 buffers to deliver good performance. One buffer is used to receive in-flight data in the sub-partition, and the other for parallel serialization. |
2 |
No |
|
taskmanager.network.memory.floating-buffers-per-gate |
Number of extra network buffers used by each output gate (result partition) or input gate, indicating the amount of floating credit shared among all input channels in credit-based flow control mode. Floating buffers are distributed based on the backlog feedback (real-time output buffers in sub-partitions) and can help mitigate back pressure caused by unbalanced data distribution among sub-partitions. Increase this value if the round-trip time between nodes is long and/or the number of machines in the cluster is large. |
8 |
No |
|
taskmanager.network.memory.fraction |
Ratio of JVM memory used for network buffers, which determines how many streaming data exchange channels a TaskManager can have at the same time and the extent of channel buffering. Increase this value or the values of taskmanager.network.memory.min and taskmanager.network.memory.max if the job is rejected or a warning indicating that the system does not have enough buffers is received. Note that the values of taskmanager.network.memory.min and taskmanager.network.memory.max may overwrite this value. |
0.1 |
No |
|
taskmanager.network.memory.max |
Maximum memory size of the network buffer. The value must contain a unit (B/KB/MB/GB/TB). |
1 GB |
No |
|
taskmanager.network.memory.min |
Minimum memory size of the network buffer. The value must contain a unit (B/KB/MB/GB/TB). |
64 MB |
No |
|
taskmanager.network.request-backoff.initial |
Minimum backoff for partition requests of input channels. |
100 |
No |
|
taskmanager.network.request-backoff.max |
Maximum backoff for partition requests of input channels. |
10000 |
No |
|
taskmanager.registration.timeout |
Timeout for TaskManager registration. TaskManager will be terminated if it is not successfully registered within the specified time. The value must contain a time unit (ms/s/min/h/d). |
5 min |
No |
|
resourcemanager.taskmanager-timeout |
Timeout interval for releasing an idle TaskManager, in milliseconds. |
30000 |
No |
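As an illustration of how the parameters in this table fit together, the following fragment is a sketch of a TaskManager configuration. The slot count and parallelism are assumed values for an example 4-core node; the remaining values are the defaults listed above. Which parameters apply depends on the cluster version (see the Remarks column).

```yaml
# Assumed example: one TaskManager on a 4-core node.
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4

# Heap sizes include a unit (B/KB/MB/GB/TB).
jobmanager.heap.size: 1024mb
taskmanager.heap.size: 1024mb

# Time values are written as a number, a half-width space, and a unit.
taskmanager.registration.initial-backoff: 500 ms

# Network buffer memory: a ratio of JVM memory, together with its
# minimum and maximum sizes, which may override the ratio.
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64 MB
taskmanager.network.memory.max: 1 GB
```

In yarn-session and yarn-cluster modes, the slot count and heap sizes are passed through the command-line parameters named in the table rather than through this file.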
Blob server
Parameter |
Description |
Default Value |
Mandatory |
---|---|---|---|
blob.server.port |
Blob server port |
32456-32520 |
No |
blob.service.ssl.enabled |
Indicates whether to enable the encryption for the blob transmission channel. This parameter is valid only when the global switch security.ssl is enabled. |
true |
Yes |
blob.fetch.retries |
Number of times that TaskManager tries to download blob files from JobManager. |
50 |
No |
blob.fetch.num-concurrent |
Number of concurrent tasks for downloading blob files supported by JobManager. |
50 |
No |
blob.fetch.backlog |
Number of blob files, such as .jar files, to be downloaded in the queue supported by JobManager. The unit is count. |
1000 |
No |
library-cache-manager.cleanup.interval |
Interval at which JobManager deletes the JAR files stored on the HDFS when the user cancels the Flink job. The unit is second. This parameter is available only to MRS 3.x or later. |
3600 |
No |
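A Blob server configuration might look like the following sketch. All values are the defaults from the table except blob.fetch.retries, which is raised here as an assumed adjustment for an unreliable network.

```yaml
blob.server.port: 32456-32520
# Takes effect only when the global SSL switch is enabled.
blob.service.ssl.enabled: true
# Assumed tuning: raised from the default of 50.
blob.fetch.retries: 100
blob.fetch.num-concurrent: 50
blob.fetch.backlog: 1000
```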
Distributed Coordination (via Akka)
Parameter |
Description |
Default Value |
Mandatory |
Remarks |
---|---|---|---|---|
akka.ask.timeout |
Timeout duration of Akka asynchronous and blocking requests. If Flink fails because of a timeout, increase this value. Timeouts occur when the machine processing speed is slow or the network is congested. The unit is ms/s/m/h/d. |
10s |
No |
Applicable to all versions |
akka.lookup.timeout |
Timeout duration for JobManager actor object searching. The unit is ms/s/m/h/d. |
10s |
No |
|
akka.framesize |
Maximum size of the message transmitted between JobManager and TaskManager. If a Flink error occurs because the message exceeds this limit, the value can be increased. The unit is b/B/KB/MB. |
10485760b |
No |
|
akka.watch.heartbeat.interval |
Heartbeat interval at which the Akka DeathWatch mechanism detects disconnected TaskManager. If TaskManager is frequently and incorrectly marked as disconnected due to heartbeat loss or delay, the value can be increased. The unit is ms/s/m/h/d. |
10s |
No |
|
akka.watch.heartbeat.pause |
Acceptable heartbeat pause for Akka DeathWatch mechanism. A small value indicates that irregular heartbeat is not accepted. The unit is ms/s/m/h/d. |
60s |
No |
|
akka.watch.threshold |
DeathWatch failure detection threshold. A small value may mark normal TaskManager as failed and a large value increases failure detection time. |
12 |
No |
|
akka.tcp.timeout |
Timeout duration of Transmission Control Protocol (TCP) connection request. If TaskManager connection timeout occurs frequently due to the network congestion, the value can be increased. The unit is ms/s/m/h/d. |
20s |
No |
|
akka.throughput |
Number of messages processed by Akka in batches. After an operation, the processing thread is returned to the thread pool. A small value indicates the fair scheduling for actor message processing. A large value indicates improved overall performance but lowered scheduling fairness. |
15 |
No |
|
akka.log.lifecycle.events |
Switch of Akka remote event logging, which can be enabled for debugging. |
false |
No |
|
akka.startup-timeout |
Timeout after which a remote component is considered to have failed to start. The value must contain a time unit (ms/s/min/h/d). |
The value is the same as the value of akka.ask.timeout. |
No |
|
akka.ssl.enabled |
Switch of Akka communication SSL. This parameter is valid only when the global switch security.ssl is enabled. |
true |
Yes |
|
akka.client-socket-worker-pool.pool-size-factor |
Factor that is used to determine the thread pool size. The pool size is calculated based on the following formula: ceil (available processors * factor). The size is bounded by the pool-size-min and pool-size-max values. |
1.0 |
No |
Applicable only to MRS 3.x or later |
akka.client-socket-worker-pool.pool-size-max |
Maximum number of threads calculated based on the factor. |
2 |
No |
|
akka.client-socket-worker-pool.pool-size-min |
Minimum number of threads calculated based on the factor. |
1 |
No |
|
akka.client.timeout |
Timeout duration of the client. The value must contain a time unit (ms/s/min/h/d). |
60s |
No |
|
akka.server-socket-worker-pool.pool-size-factor |
Factor that is used to determine the thread pool size. The pool size is calculated based on the following formula: ceil (available processors * factor). The size is bounded by the pool-size-min and pool-size-max values. |
1.0 |
No |
|
akka.server-socket-worker-pool.pool-size-max |
Maximum number of threads calculated based on the factor. |
2 |
No |
|
akka.server-socket-worker-pool.pool-size-min |
Minimum number of threads calculated based on the factor. |
1 |
No |
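The timeout and thread pool parameters above can be sketched as follows. The raised timeouts and the processor count of 8 are assumptions made for illustration; the pool parameters are the defaults from the table and, according to the Remarks column, apply only to MRS 3.x or later.

```yaml
# Assumed tuning: raised from the default of 10s for a slow network.
akka.ask.timeout: 30s
akka.lookup.timeout: 30s

# Default frame size: 10485760 bytes = 10 * 1024 * 1024 bytes.
akka.framesize: 10485760b

# Pool size = ceil(available processors * factor), bounded by min and max.
# With an assumed 8 available processors: ceil(8 * 1.0) = 8,
# which is limited to pool-size-max, so the pool has 2 threads.
akka.server-socket-worker-pool.pool-size-factor: 1.0
akka.server-socket-worker-pool.pool-size-min: 1
akka.server-socket-worker-pool.pool-size-max: 2
```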
SSL
Parameter |
Description |
Default Value |
Mandatory |
Remarks |
---|---|---|---|---|
security.ssl.internal.enabled |
Whether to enable SSL for internal communication. This parameter is automatically configured based on the security mode of the cluster. |
|
Yes |
Applicable only to versions earlier than MRS 3.x |
security.ssl.internal.keystore |
Java keystore file. |
- |
Yes |
|
security.ssl.internal.keystore-password |
Password used to decrypt the keystore file. |
- |
Yes |
|
security.ssl.internal.key-password |
Password used to decrypt the server key in the keystore file. |
- |
Yes |
|
security.ssl.internal.truststore |
truststore file containing the public CA certificates. |
- |
Yes |
|
security.ssl.internal.truststore-password |
Password used to decrypt the truststore file. |
- |
Yes |
|
security.ssl.rest.enabled |
Whether to enable SSL for external communication. This parameter is automatically configured based on the security mode of the cluster. |
|
Yes |
|
security.ssl.rest.keystore |
Java keystore file. |
- |
Yes |
|
security.ssl.rest.keystore-password |
Password used to decrypt the keystore file. |
- |
Yes |
|
security.ssl.rest.key-password |
Password used to decrypt the server key in the keystore file. |
- |
Yes |
|
security.ssl.rest.truststore |
truststore file containing the public CA certificates. |
- |
Yes |
|
security.ssl.rest.truststore-password |
Password used to decrypt the truststore file. |
- |
Yes |
|
security.ssl.protocol |
SSL transmission protocol version. |
TLSv1.2 |
Yes |
Applicable to all versions |
security.ssl.algorithms |
Supported SSL standard algorithm. For details, visit the Java official website at: http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites. |
TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 |
Yes |
|
security.ssl.enabled |
Whether to enable SSL for internal communication. This parameter is automatically configured based on the cluster installation mode. |
|
Yes |
Applicable only to MRS 3.x or later |
security.ssl.keystore |
Java keystore file. |
- |
Yes |
|
security.ssl.keystore-password |
Password used to decrypt the keystore file. |
- |
Yes |
|
security.ssl.key-password |
Password used to decrypt the server key in the keystore file. |
- |
Yes |
|
security.ssl.truststore |
truststore file containing the public CA certificates. |
- |
Yes |
|
security.ssl.truststore-password |
Password used to decrypt the truststore file. |
- |
Yes |
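The following fragment is a sketch of the SSL items that the table marks as applicable to MRS 3.x or later. The keystore and truststore file names and all password values are placeholders invented for this example; the protocol and algorithm values are the defaults listed above.

```yaml
security.ssl.enabled: true
# Placeholder file names and passwords; replace with the actual values.
security.ssl.keystore: ssl/flink.keystore
security.ssl.keystore-password: <keystore-password>
security.ssl.key-password: <key-password>
security.ssl.truststore: ssl/flink.truststore
security.ssl.truststore-password: <truststore-password>
# Defaults from the table.
security.ssl.protocol: TLSv1.2
security.ssl.algorithms: TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
```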
Network communication (via Netty)
Parameter |
Description |
Default Value |
Mandatory |
---|---|---|---|
taskmanager.network.netty.num-arenas |
Number of Netty memory blocks. |
1 |
No |
taskmanager.network.netty.server.numThreads |
Number of Netty server threads |
1 |
No |
taskmanager.network.netty.client.numThreads |
Number of Netty client threads |
1 |
No |
taskmanager.network.netty.client.connectTimeoutSec |
Netty client connection timeout duration. The unit is second. |
120 |
No |
taskmanager.network.netty.sendReceiveBufferSize |
Size of Netty sending and receiving buffers. This defaults to the system buffer size (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MB in modern Linux. The unit is bytes. |
4096 |
No |
taskmanager.network.netty.transport |
Netty transport type, either nio or epoll |
nio |
No |
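For reference, the Netty defaults from the table correspond to the following fragment. It is a sketch that restates the defaults, not a tuning recommendation.

```yaml
# Transport type: nio or epoll.
taskmanager.network.netty.transport: nio
taskmanager.network.netty.num-arenas: 1
taskmanager.network.netty.server.numThreads: 1
taskmanager.network.netty.client.numThreads: 1
# Client connection timeout, in seconds.
taskmanager.network.netty.client.connectTimeoutSec: 120
```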
JobManager Web Frontend
Parameter |
Description |
Default Value |
Mandatory |
Remarks |
---|---|---|---|---|
jobmanager.web.port |
The web port. Value range: 32261–32325. |
32261-32325 |
No |
Applicable only to versions earlier than MRS 3.x |
jobmanager.web.allow-access-address |
Web access whitelist. IP addresses are separated by commas (,). Only whitelisted IP addresses can access the web. |
* |
Yes |
Applicable to all versions |
flink.security.enable |
When installing a Flink cluster, you are required to select security mode or normal mode. For an installed Flink cluster, you can view the configured value to determine whether the cluster is in security or normal mode. |
Automatic configuration |
No |
Applicable only to MRS 3.x or later |
rest.bind-port |
The web port. Value range: 32261–32325. |
32261-32325 |
No |
|
jobmanager.web.history |
Number of recent jobs to be displayed. |
5 |
No |
|
jobmanager.web.checkpoints.disable |
Whether to disable checkpoint statistics. |
false |
No |
|
jobmanager.web.checkpoints.history |
Number of checkpoint statistical records. |
10 |
No |
|
jobmanager.web.backpressure.cleanup-interval |
Interval for clearing unaccessed backpressure records. The unit is millisecond. |
600000 |
No |
|
jobmanager.web.backpressure.refresh-interval |
Interval for updating backpressure records. The unit is millisecond. |
60000 |
No |
|
jobmanager.web.backpressure.num-samples |
Number of stack trace samples used for backpressure calculation. |
100 |
No |
|
jobmanager.web.backpressure.delay-between-samples |
Sampling interval for backpressure calculation. The unit is millisecond. |
50 |
No |
|
jobmanager.web.ssl.enabled |
Whether SSL encryption is enabled for web transmission. This parameter is valid only when the global switch security.ssl is enabled. |
false |
Yes |
|
jobmanager.web.accesslog.enable |
Switch to enable or disable web operation logs. The log is stored in webaccess.log. |
true |
Yes |
|
jobmanager.web.x-frame-options |
Value of the HTTP security header X-Frame-Options. The value can be SAMEORIGIN, DENY, or ALLOW-FROM uri. |
DENY |
Yes |
|
jobmanager.web.cache-directive |
Whether the web page can be cached. |
no-store |
Yes |
|
jobmanager.web.expires-time |
Expiration duration of web page cache. The unit is millisecond. |
0 |
Yes |
|
jobmanager.web.access-control-allow-origin |
Web page same-origin policy that prevents cross-domain attacks. |
* |
Yes |
|
jobmanager.web.refresh-interval |
Web page refresh interval. The unit is millisecond. |
3000 |
Yes |
|
jobmanager.web.logout-timer |
Automatic logout interval when no operation is performed. The unit is millisecond. |
600000 |
Yes |
|
jobmanager.web.403-redirect-url |
Redirection for web page access error 403. If a 403 error occurs, the page is redirected to a specified page. |
Automatic configuration |
Yes |
|
jobmanager.web.404-redirect-url |
Redirection for web page access error 404. If a 404 error occurs, the page is redirected to a specified page. |
Automatic configuration |
Yes |
|
jobmanager.web.415-redirect-url |
Redirection for web page access error 415. If a 415 error occurs, the page is redirected to a specified page. |
Automatic configuration |
Yes |
|
jobmanager.web.500-redirect-url |
Redirection for web page access error 500. If a 500 error occurs, the page is redirected to a specified page. |
Automatic configuration |
Yes |
|
rest.await-leader-timeout |
Time of the client waiting for the leader address. The unit is millisecond. |
30000 |
No |
|
rest.client.max-content-length |
Maximum content length that the client handles (unit: bytes). |
104857600 |
No |
|
rest.connection-timeout |
Maximum time for the client to establish a TCP connection (unit: ms). |
15000 |
No |
|
rest.idleness-timeout |
Maximum time for a connection to stay idle before failing (unit: ms). |
300000 |
No |
|
rest.retry.delay |
The time that the client waits between retries (unit: ms). |
3000 |
No |
|
rest.retry.max-attempts |
Number of retries the client attempts if a retryable operation fails. |
20 |
No |
|
rest.server.max-content-length |
Maximum content length that the server handles (unit: bytes). |
104857600 |
No |
|
rest.server.numThreads |
Maximum number of threads for the asynchronous processing of requests. |
4 |
No |
|
web.timeout |
Timeout for web monitor (unit: ms). |
10000 |
No |
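A web frontend configuration that restricts access to specific addresses might look like the following sketch. The two IP addresses are assumed examples taken from the reserved documentation range 192.0.2.0/24; the other values are the defaults from the table.

```yaml
# Assumed example addresses, separated by commas.
jobmanager.web.allow-access-address: 192.0.2.10,192.0.2.11
jobmanager.web.x-frame-options: DENY
jobmanager.web.cache-directive: no-store
# Automatic logout after 600000 ms (10 minutes) without operations.
jobmanager.web.logout-timer: 600000

# REST client retries: up to 20 attempts with 3000 ms between attempts.
rest.retry.max-attempts: 20
rest.retry.delay: 3000
```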
File Systems
Parameter |
Description |
Default Value |
Mandatory |
---|---|---|---|
fs.overwrite-files |
Whether to overwrite the existing file by default when the file is written. |
false |
No |
fs.output.always-create-directory |
When the degree of parallelism (DOP) of a file writing program is greater than 1, a directory is created under the output file path and the result files (one for each parallel writer) are stored in the directory. |
false |
No |
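Both file system items default to false. The fragment below is a sketch that enables them, an assumed choice for a job that is rerun against the same output path.

```yaml
# Assumed settings: overwrite existing result files and always
# write results into a directory under the output path.
fs.overwrite-files: true
fs.output.always-create-directory: true
```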
State Backend
Parameter |
Description |
Default Value |
Mandatory |
---|---|---|---|
state.backend.fs.checkpointdir |
Checkpoint path when the state backend is set to filesystem. The path must be accessible by JobManager. A local path is supported only in local mode. In cluster mode, use an HDFS path. |
hdfs:///flink/checkpoints |
No |
state.savepoints.dir |
Savepoint storage directory used by Flink to restore and update jobs. When a savepoint is triggered, the metadata of the savepoint is saved to this directory. |
hdfs:///flink/savepoint |
Mandatory in security mode |
restart-strategy |
Default restart policy, which is used for jobs for which no restart policy is specified. |
none |
No |
restart-strategy.fixed-delay.attempts |
Number of retries when the fixed-delay strategy is used. |
|
No |
restart-strategy.fixed-delay.delay |
Retry interval when the fixed-delay strategy is used. The unit is ms/s/m/h/d. |
|
No |
restart-strategy.failure-rate.max-failures-per-interval |
Maximum number of restarts in a specified period before a job fails when the failure-rate strategy is used. |
1 |
No |
restart-strategy.failure-rate.failure-rate-interval |
Time interval for measuring the failure rate when the failure-rate strategy is used. The unit is ms/s/m/h/d. |
60 s |
No |
restart-strategy.failure-rate.delay |
Retry interval when the failure-rate strategy is used. The unit is ms/s/m/h/d. |
The default value is the same as the value of akka.ask.timeout. For details, see Distributed Coordination (via Akka). |
No |
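The storage paths and the fixed-delay strategy described above can be combined as in the following sketch. The paths are the defaults from the table; the strategy choice, attempt count, and delay are assumed values for illustration.

```yaml
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoint

# Assumed example: restart a failed job up to 3 times, 10 seconds apart.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```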
Kerberos-based Security
Parameter |
Description |
Default Value |
Mandatory |
---|---|---|---|
security.kerberos.login.keytab |
Keytab file path. This parameter is a client parameter. |
Configure the parameter based on actual service requirements. |
Yes |
security.kerberos.login.principal |
A parameter on the client. If security.kerberos.login.keytab and security.kerberos.login.principal are both set, keytab certificate is used by default. |
Configure the parameter based on actual service requirements. |
No |
security.kerberos.login.contexts |
Contexts of the JAAS file generated by Flink. This parameter is a server parameter. |
Client, KafkaClient |
Yes |
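In security mode, the Kerberos items might be set as in the following sketch. The keytab path and principal name are hypothetical and must be replaced with the values of the actual service user.

```yaml
# Hypothetical keytab path and principal.
security.kerberos.login.keytab: /opt/client/user.keytab
security.kerberos.login.principal: flinkuser
# Default contexts from the table.
security.kerberos.login.contexts: Client,KafkaClient
```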
HA
Parameter |
Description |
Default Value |
Mandatory |
---|---|---|---|
high-availability |
Whether HA is enabled. Only the following two modes are supported currently:
|
zookeeper |
No |
high-availability.zookeeper.quorum |
ZooKeeper quorum address. |
Automatic configuration |
No |
high-availability.zookeeper.path.root |
Root directory that Flink creates on ZooKeeper, storing metadata required in HA mode. |
/flink |
No |
high-availability.storageDir |
Directory for storing JobManager metadata of state backend. ZooKeeper stores only pointers to actual data. |
hdfs:///flink/recovery |
No |
high-availability.zookeeper.client.session-timeout |
Session timeout duration on the ZooKeeper client. The unit is millisecond. |
60000 |
No |
high-availability.zookeeper.client.connection-timeout |
Connection timeout duration on the ZooKeeper client. The unit is millisecond. |
15000 |
No |
high-availability.zookeeper.client.retry-wait |
Retry waiting time on the ZooKeeper client. The unit is millisecond. |
5000 |
No |
high-availability.zookeeper.client.max-retry-attempts |
Maximum retry times on the ZooKeeper client. |
3 |
No |
high-availability.job.delay |
Delay of job restart when JobManager recovers. This parameter is available only to MRS 3.x or later. |
The default value is the same as the value of akka.ask.timeout. |
No |
high-availability.zookeeper.client.acl |
Configure the ACL (open creator) of the ZooKeeper node. The ACL is automatically configured based on the security mode of the cluster. For ACL options, see https://zookeeper.apache.org/doc/r3.5.1-alpha/zookeeperProgrammers.html#sc_BuiltinACLSchemes. |
|
Yes |
zookeeper.sasl.disable |
Whether to disable SASL authentication. This parameter is automatically configured based on the security mode of the cluster. |
|
Yes |
zookeeper.sasl.service-name |
|
zookeeper |
Yes |
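The ZooKeeper-based HA items can be sketched as follows. The quorum host names and port are hypothetical, because this value is normally configured automatically; the other values are the defaults from the table.

```yaml
high-availability: zookeeper
# Hypothetical ZooKeeper hosts and port.
high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recovery

# Client timeouts and retry wait, in milliseconds.
high-availability.zookeeper.client.session-timeout: 60000
high-availability.zookeeper.client.connection-timeout: 15000
high-availability.zookeeper.client.retry-wait: 5000
high-availability.zookeeper.client.max-retry-attempts: 3
```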
Environment
Parameter |
Description |
Default Value |
Mandatory |
---|---|---|---|
env.java.opts |
JVM parameter, which is transferred to the startup script, JobManager, TaskManager, and Yarn client. For example, transfer remote debugging parameters. |
-Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M -Djdk.tls.ephemeralDHKeySize=2048 -Djava.library.path=${HADOOP_COMMON_HOME}/lib/native -Djava.net.preferIPv4Stack=true -Djava.net.preferIPv6Addresses=false -Dbeetle.application.home.path=/opt/xxx/Bigdata/common/runtime/security/config |
No |
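As a sketch of passing an extra JVM parameter, the fragment below appends a heap dump flag to a shortened subset of the default options. The added flag is an assumed example; in a real file it would be appended to the complete default value shown in the table.

```yaml
# Shortened subset of the defaults, followed by one assumed extra flag.
env.java.opts: -XX:-OmitStackTraceInFastThrow -Djava.net.preferIPv4Stack=true -XX:+HeapDumpOnOutOfMemoryError
```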
Yarn
Parameter |
Description |
Default Value |
Mandatory |
---|---|---|---|
yarn.maximum-failed-containers |
Maximum number of containers the system reallocates if TaskManager containers fail. The default value is the number of TaskManagers when the Flink cluster is started. |
5 |
No |
yarn.application-attempts |
Number of ApplicationMaster restarts. The value is the maximum number of restarts within the validity interval, which is set to Akka's timeout in Flink. After a restart, the IP address and port number of ApplicationMaster change, and you need to reconnect the client manually. |
2 |
No |
yarn.heartbeat-delay |
Interval between heartbeats from ApplicationMaster to Yarn ResourceManager. The unit is second. |
5 |
No |
yarn.containers.vcores |
Number of virtual cores of each Yarn container |
Number of TaskManager slots |
No |
yarn.application-master.port |
ApplicationMaster port number setting. A port number range is supported. |
32586-32650 |
No |
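The Yarn items might be set as in the following sketch. The vcores value is an assumption that matches an example TaskManager with 4 slots, since the default equals the number of TaskManager slots; the other values are the defaults from the table.

```yaml
# Assumed example: 4 virtual cores for a TaskManager with 4 slots.
yarn.containers.vcores: 4
yarn.application-attempts: 2
# Heartbeat interval, in seconds.
yarn.heartbeat-delay: 5
yarn.maximum-failed-containers: 5
yarn.application-master.port: 32586-32650
```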
Pipeline
This section applies only to MRS 3.x or later.
Parameter |
Description |
Default Value |
Mandatory |
---|---|---|---|
nettyconnector.registerserver.topic.storage |
Path (on a third-party server) for storing the IP address, port numbers, and concurrency information of NettySink. ZooKeeper is recommended for storage. |
/flink/nettyconnector |
No. However, if pipeline is enabled, this parameter is mandatory. |
nettyconnector.sinkserver.port.range |
Port range of NettySink. |
28444-28843 |
No. However, if pipeline is enabled, this parameter is mandatory. |
nettyconnector.ssl.enabled |
Whether SSL encryption for the communication between NettySink and NettySource is enabled. For details about the encryption key and protocol, see SSL. |
false |
No. However, if pipeline is enabled, this parameter is mandatory. |
nettyconnector.message.delimiter |
Delimiter of the messages sent by NettySink to NettySource. The delimiter is 2 to 4 bytes long and cannot contain \n, #, or spaces. |
The default value is $_. |
No. However, if pipeline is enabled, this parameter is mandatory. |
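When the pipeline feature is enabled, the four items above must all be present. The following sketch restates the defaults from the table.

```yaml
nettyconnector.registerserver.topic.storage: /flink/nettyconnector
nettyconnector.sinkserver.port.range: 28444-28843
nettyconnector.ssl.enabled: false
# 2 to 4 bytes long; cannot contain \n, #, or spaces.
nettyconnector.message.delimiter: $_
```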