Configuring Flink Service Parameters
Description
All Flink parameters can be configured on the client. You are advised to modify the flink-conf.yaml configuration file on the client. If Flink service parameters are modified on FusionInsight Manager, you need to download and install the client again after the configuration is complete.
- Configuration file path: Client installation path/Flink/flink/conf/flink-conf.yaml
- File configuration format: Key: Value
Example: taskmanager.heap.size: 1024mb
A space is required between Key: and Value.
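As a minimal sketch of this format, the fragment below sets two entries in flink-conf.yaml. The parameter names and values are taken from the tables in this section and only illustrate the layout. They are not recommendations.

```yaml
# Each entry is "Key: Value" with a space after the colon.
parallelism.default: 1
task.cancellation.interval: 30000

# Missing the required space after the colon (do not write it this way):
# parallelism.default:1
```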
Configurations
This section describes the following parameters:
- JobManager & TaskManager:
JobManager and TaskManager are main components of Flink. For various security and performance scenarios, configuration items include communication ports, memory management, and connection retry.
- Blob server:
The Blob server on the JobManager node is used to receive JAR packages uploaded by users on the client, send JAR packages to TaskManager, and transfer log files. The configuration items include the port, SSL, retry times, and concurrency.
- Distributed Coordination (via Akka):
The Akka actor model is the basis of communication between the Flink client and JobManager, between JobManager and TaskManager, and between TaskManagers. Related parameters can be configured based on the network environment or optimization policy. The configuration items include the timeout settings for message sending and waiting and the Akka listening mechanism DeathWatch.
- SSL:
To configure a secure Flink cluster, you need to configure SSL-related configuration items, including the SSL switch, certificate, password, and encryption algorithm.
- Network communication (via Netty):
When Flink runs a job, data transmission and backpressure detection between tasks depend on Netty. In certain environments, Netty parameters should be configured. For advanced optimization, you can adjust some Netty configuration items. The default configuration can meet the requirements of concurrent and high-throughput tasks in a large-scale cluster.
- JobManager Web Frontend:
When JobManager is started, the web server is started in the same process. You can access the web server to obtain information about the current Flink cluster, including JobManager, TaskManager, and jobs running in the cluster. Configuration items of the web server include the port, temporary directory, display items, error redirection, and security-related items.
- File Systems:
When a task is running, a result file is created. You can configure the file creation behavior, including the file overwriting policy and directory creation.
- State Backend:
Flink supports HA and recovery from job exceptions, as well as job pause and recovery during version upgrades. Flink depends on the state backend to store job states and on the restart strategy to restart a job. You can configure the state backend and the restart strategy. Configuration items include the state backend type, storage path, and restart strategy.
- Kerberos-based Security:
Kerberos-related configuration items must be configured in Flink security mode. The configuration items include keytab and principal of Kerberos.
- HA:
The HA mode of Flink depends on ZooKeeper. Therefore, ZooKeeper parameters must be configured, including the ZooKeeper address, path, and security authentication.
- Environment:
In scenarios with special requirements on JVM configuration, you can use configuration items to pass JVM parameters to the client, JobManager, and TaskManager.
- Yarn:
Flink runs on a Yarn cluster, and JobManager runs on ApplicationMaster. Some configuration parameters of JobManager depend on Yarn. You can configure Yarn-related parameters to enable Flink to run better on Yarn. The configuration items include the memory, virtual cores, and port of Yarn containers.
- Pipeline:
Netty connections are used between jobs to reduce latency. In this case, NettySink is used on the server and NettySource is used on the client for data transmission. Configuration items include the storage path of NettySink information, the range of NettySink listening ports, whether to enable SSL encryption, and the domain of the network used for NettySink monitoring.
- Enabling the Alarm Function for Job Submission on the Client:
By default, the alarm function is disabled for jobs submitted through the Flink client. To enable it, install two FlinkServer instances on the node where the jobs are submitted and configure related parameters in the flink-conf.yaml file on the client.
If the HA mode of Flink servers is changed on the FusionInsight Manager of an ECS/BMS cluster, you must update the Flink configuration on the default client of the cluster to ensure that the alarm function of Flink jobs can work properly.
Perform the following steps to update Flink configuration:
- FlinkServer HA
You can start the FlinkServer in HA mode. To do so, you need to configure the floating IP address and arbitration IP address of the FlinkServer.
JobManager & TaskManager
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
taskmanager.rpc.port | IPC port range of TaskManager | 32326-32390 | No |
taskmanager.memory.segment-size | Size of the memory buffer used by the memory manager and network stack. The unit is bytes. | 32768 | No |
taskmanager.data.port | Data exchange port range of TaskManager | 32391-32455 | No |
taskmanager.data.ssl.enabled | Whether to enable secure sockets layer (SSL) encryption for data transfer between TaskManagers. This parameter is valid only when the global switch security.ssl is enabled. | false | No |
taskmanager.numberOfTaskSlots | Number of slots occupied by TaskManager. Generally, the value is configured as the number of cores of the physical machine. In yarn-session mode, the value can be passed only through the -s parameter. In yarn-cluster mode, the value can be passed only through the -ys parameter. | 2 | No |
parallelism.default | Default degree of parallelism, which is used for jobs for which the degree of parallelism is not specified | 1 | No |
task.cancellation.interval | Interval between two successive task cancellation attempts. The unit is millisecond. | 30000 | No |
client.rpc.port | Akka system listening port on the Flink client. | 32651-32720 | No |
jobmanager.memory.process.size | Size of the heap memory of JobManager. In yarn-session mode, the value can be passed only through the -jm parameter. In yarn-cluster mode, the value can be passed only through the -yjm parameter. If the value is smaller than yarn.scheduler.minimum-allocation-mb in the Yarn configuration file, the Yarn configuration value is used. The unit is B/KB/MB/GB/TB. | 2GB | No |
taskmanager.memory.process.size | Size of the heap memory of TaskManager. In yarn-session mode, the value can be passed only through the -tm parameter. In yarn-cluster mode, the value can be passed only through the -ytm parameter. If the value is smaller than yarn.scheduler.minimum-allocation-mb in the Yarn configuration file, the Yarn configuration value is used. The unit is B/KB/MB/GB/TB. | 4GB | No |
taskmanager.network.numberOfBuffers | Number of TaskManager network transmission buffer stacks. If an error indicates insufficient system buffer, increase the parameter value. | 2048 | No |
taskmanager.debug.memory.log | Enable this item for debugging Flink memory and garbage collection (GC)-related problems. TaskManager periodically collects memory and GC statistics, including the current utilization of heap and off-heap memory pools and GC time. | false | No |
taskmanager.debug.memory.log-interval | Interval for TaskManager to periodically record memory and GC statistics, in milliseconds | 5000 | No |
taskmanager.maxRegistrationDuration | Maximum duration of TaskManager registration on JobManager. If the actual duration exceeds the value, TaskManager is disabled. | 5 min | No |
taskmanager.initial-registration-pause | Initial interval between two consecutive registration attempts. The value must contain a time unit (ms/s/min/h/d), for example, 5 seconds. The time value and unit are separated by a half-width space. ms/s/m/h/d indicate millisecond, second, minute, hour, and day, respectively. | 500 ms | No |
taskmanager.max-registration-pause | Maximum registration retry interval in case of TaskManager registration failures. The unit is ms/s/m/h/d. | 30s | No |
taskmanager.refused-registration-pause | Retry interval when a TaskManager registration connection is rejected by JobManager. The unit is ms/s/m/h/d. | 10s | No |
classloader.resolve-order | Class resolution policy used when classes are loaded from user code, that is, whether to first check the user code JAR file (child-first) or the application class path (parent-first). The default setting indicates that the class is first loaded from the user code JAR file, which means that the user code JAR file can contain and load dependencies that are different from those used by Flink. | child-first | No |
slot.idle.timeout | Timeout for an idle slot in Slot Pool, in milliseconds. | 50000 | No |
slot.request.timeout | Timeout for requesting a slot from Slot Pool, in milliseconds. | 300000 | No |
task.cancellation.timeout | Timeout of task cancellation, in milliseconds. If a task cancellation times out, a fatal TaskManager error may occur. If this parameter is set to 0, no error is reported when a task cancellation times out. | 180000 | No |
taskmanager.network.detailed-metrics | Indicates whether to enable the detailed metrics monitoring of network queue lengths. | false | No |
taskmanager.network.memory.buffers-per-channel | Maximum number of network buffers used by each outgoing/incoming stream (sub-partition/input streams). In credit-based flow control, this indicates how many credits are in each input stream. It should be configured with at least 2 buffers to deliver good performance. One buffer is used to receive in-flight data in the sub-partition, and the other for parallel serialization. | 2 | No |
taskmanager.network.memory.floating-buffers-per-gate | Number of extra network buffers used by each output gate (result partition) or input gate, indicating the amount of floating credit shared among all input channels in credit-based flow control mode. Floating buffers are distributed based on the backlog feedback (real-time output buffers in sub-partitions) and can help mitigate back pressure caused by unbalanced data distribution among sub-partitions. Increase this value if the round-trip time between nodes is long and/or the number of machines in the cluster is large. | 8 | No |
taskmanager.network.memory.fraction | Ratio of JVM memory used for network buffers, which determines how many streaming data exchange channels a TaskManager can have at the same time and the extent of channel buffering. Increase this value or the values of taskmanager.network.memory.min and taskmanager.network.memory.max if the job is rejected or a warning indicating that the system does not have enough buffers is received. Note that the values of taskmanager.network.memory.min and taskmanager.network.memory.max may overwrite this value. | 0.1 | No |
taskmanager.network.memory.max | Maximum memory size of the network buffer. The value must contain a unit (B/KB/MB/GB/TB). | 5GB | No |
taskmanager.network.memory.min | Minimum memory size of the network buffer. The value must contain a unit (B/KB/MB/GB/TB). | 64MB | No |
taskmanager.network.request-backoff.initial | Minimum backoff for partition requests of input channels. | 100 | No |
taskmanager.network.request-backoff.max | Maximum backoff for partition requests of input channels. | 10000 | No |
taskmanager.registration.timeout | Timeout for TaskManager registration. TaskManager will be terminated if it is not successfully registered within the specified time. The value must contain a time unit (ms/s/min/h/d). | 5 min | No |
resourcemanager.taskmanager-timeout | Timeout interval for releasing an idle TaskManager, in milliseconds. | 30000 | No |
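The following fragment is a sketch of how some of these parameters might appear in flink-conf.yaml. The values are the defaults listed in the table, not tuning recommendations. Parameters that the table says are passed through command-line options on Yarn are left out.

```yaml
# Port ranges are written as start-end
taskmanager.rpc.port: 32326-32390
taskmanager.data.port: 32391-32455

# Network buffer memory: a ratio of JVM memory; min and max may override it.
# Sizes must carry a unit (B/KB/MB/GB/TB).
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64MB
taskmanager.network.memory.max: 5GB

# Durations that require a time unit (ms/s/min/h/d)
taskmanager.registration.timeout: 5 min
taskmanager.initial-registration-pause: 500 ms

# Plain millisecond values carry no unit
slot.request.timeout: 300000
task.cancellation.timeout: 180000
```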
Blob server
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
blob.server.port | Blob server port | 32456-32520 | No |
blob.service.ssl.enabled | Indicates whether to enable the encryption for the blob transmission channel. This parameter is valid only when the global switch security.ssl is enabled. | true | Yes |
blob.fetch.retries | Number of times that TaskManager tries to download blob files from JobManager. | 50 | No |
blob.fetch.num-concurrent | Number of concurrent tasks for downloading blob files supported by JobManager. | 50 | No |
blob.fetch.backlog | Number of blob files, such as .jar files, to be downloaded in the queue supported by JobManager. The unit is count. | 1000 | No |
library-cache-manager.cleanup.interval | Interval at which JobManager deletes the JAR files stored on the HDFS when the user cancels the Flink job. The unit is second. | 3600 | No |
Distributed Coordination (via Akka)
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
akka.ask.timeout | Timeout duration of Akka asynchronous and blocking requests. If a Flink timeout failure occurs, this value can be increased. Timeout occurs when the machine processing speed is slow or the network is blocked. The unit is ms/s/m/h/d. | 300s | No |
akka.lookup.timeout | Timeout duration for JobManager actor object searching. The unit is ms/s/m/h/d. | 10s | No |
akka.framesize | Maximum size of the message transmitted between JobManager and TaskManager. If a Flink error occurs because the message exceeds this limit, the value can be increased. The unit is b/B/KB/MB. | 10485760b | No |
akka.watch.heartbeat.interval | Heartbeat interval at which the Akka DeathWatch mechanism detects disconnected TaskManagers. If a TaskManager is frequently and incorrectly marked as disconnected due to heartbeat loss or delay, the value can be increased. The unit is ms/s/m/h/d. | 10s | No |
akka.watch.heartbeat.pause | Acceptable heartbeat pause for the Akka DeathWatch mechanism. A small value indicates that irregular heartbeat is not accepted. The unit is ms/s/m/h/d. | 60s | No |
akka.watch.threshold | DeathWatch failure detection threshold. A small value may mark a normal TaskManager as failed, and a large value increases failure detection time. | 12 | No |
akka.tcp.timeout | Timeout duration of Transmission Control Protocol (TCP) connection requests. If TaskManager connection timeout occurs frequently due to network congestion, the value can be increased. The unit is ms/s/m/h/d. | 20s | No |
akka.throughput | Number of messages processed by Akka in batches. After an operation, the processing thread is returned to the thread pool. A small value indicates fair scheduling for actor message processing. A large value indicates improved overall performance but lowered scheduling fairness. | 15 | No |
akka.log.lifecycle.events | Switch of Akka remote event logging, which can be enabled for debugging. | false | No |
akka.startup-timeout | Timeout interval before a remote component fails to be started. The value must contain a time unit (ms/s/min/h/d). | The value is the same as the value of akka.ask.timeout. | No |
akka.ssl.enabled | Switch of Akka communication SSL. This parameter is valid only when the global switch security.ssl is enabled. | true | Yes |
akka.client-socket-worker-pool.pool-size-factor | Factor that is used to determine the thread pool size. The pool size is calculated based on the following formula: ceil (available processors * factor). The size is bounded by the pool-size-min and pool-size-max values. | 1.0 | No |
akka.client-socket-worker-pool.pool-size-max | Maximum number of threads calculated based on the factor. | 2 | No |
akka.client-socket-worker-pool.pool-size-min | Minimum number of threads calculated based on the factor. | 1 | No |
akka.client.timeout | Timeout duration of the client. The value must contain a time unit (ms/s/min/h/d). | 60s | No |
akka.server-socket-worker-pool.pool-size-factor | Factor that is used to determine the thread pool size. The pool size is calculated based on the following formula: ceil (available processors * factor). The size is bounded by the pool-size-min and pool-size-max values. | 1.0 | No |
akka.server-socket-worker-pool.pool-size-max | Maximum number of threads calculated based on the factor. | 2 | No |
akka.server-socket-worker-pool.pool-size-min | Minimum number of threads calculated based on the factor. | 1 | No |
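A sketch of the Akka timeout, DeathWatch, and worker pool settings in flink-conf.yaml, using the default values from the table. The worked pool-size calculation in the comments assumes a hypothetical machine with 8 available processors.

```yaml
# Request timeout; can be increased if timeout failures occur
akka.ask.timeout: 300s

# DeathWatch heartbeat settings
akka.watch.heartbeat.interval: 10s
akka.watch.heartbeat.pause: 60s
akka.watch.threshold: 12

# Server socket worker pool:
#   size = ceil(available processors * factor), bounded by min and max.
# Assuming 8 available processors: ceil(8 * 1.0) = 8,
# which is then capped at pool-size-max = 2.
akka.server-socket-worker-pool.pool-size-factor: 1.0
akka.server-socket-worker-pool.pool-size-min: 1
akka.server-socket-worker-pool.pool-size-max: 2
```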
SSL
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
security.ssl.protocol | SSL transmission protocol version. | TLSv1.2 | Yes |
security.ssl.algorithms | Supported SSL standard algorithms. | TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 | Yes |
security.ssl.enabled | Specifies whether to enable SSL for internal communication. This parameter is automatically configured based on the cluster installation mode. | | Yes |
security.ssl.keystore | Java keystore file. | - | Yes |
security.ssl.keystore-password | Password used to decrypt the keystore file. | - | Yes |
security.ssl.key-password | Password used to decrypt the server key in the keystore file. | - | Yes |
security.ssl.truststore | Truststore file containing the public CA certificates. | - | Yes |
security.ssl.truststore-password | Password used to decrypt the truststore file. | - | Yes |
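The relationship between the global SSL switch and the per-channel switches described in this document can be sketched as follows. This assumes that the "global switch security.ssl" mentioned in other tables refers to security.ssl.enabled. The keystore and truststore paths are hypothetical placeholders, and the password entries are intentionally omitted.

```yaml
# Global switch (normally configured automatically based on the installation mode)
security.ssl.enabled: true
security.ssl.protocol: TLSv1.2

# Per-channel switches; each is valid only when the global switch is enabled
akka.ssl.enabled: true
blob.service.ssl.enabled: true
taskmanager.data.ssl.enabled: false
jobmanager.web.ssl.enabled: false

# Hypothetical placeholder paths; replace with your own files
security.ssl.keystore: /path/to/flink.keystore
security.ssl.truststore: /path/to/flink.truststore
```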
Network communication (via Netty)
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
taskmanager.network.netty.num-arenas | Number of Netty memory blocks. | 1 | No |
taskmanager.network.netty.server.numThreads | Number of Netty server threads. The value -1 indicates that the number of threads is equal to the number of TM slots. | -1 | No |
taskmanager.network.netty.client.numThreads | Number of Netty client threads. The value -1 indicates that the number of threads is equal to the number of TM slots. | -1 | No |
taskmanager.network.netty.client.connectTimeoutSec | Netty client connection timeout duration. The unit is second. | 120 | No |
taskmanager.network.netty.sendReceiveBufferSize | Size of Netty sending and receiving buffers. This defaults to the system buffer size (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MB in modern Linux. The unit is bytes. | 4096 | No |
taskmanager.network.netty.transport | Netty transport type, either nio or epoll | nio | No |
JobManager Web Frontend
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
jobmanager.web.allow-access-address | Web access whitelist. IP addresses are separated by commas (,). Only whitelisted IP addresses can access the web. | * | Yes |
flink.security.enable | When installing a Flink cluster, you are required to select security mode or normal mode. For an installed Flink cluster, you can view the configured value to determine whether the cluster is in security or normal mode. | Automatic configuration | No |
rest.bind-port | The web port. Value range: 32261–32325. | 32261-32325 | No |
jobmanager.web.history | Number of recent jobs to be displayed. | 5 | No |
jobmanager.web.checkpoints.disable | Whether to disable checkpoint statistics. | false | No |
jobmanager.web.checkpoints.history | Number of checkpoint statistical records. | 10 | No |
jobmanager.web.backpressure.cleanup-interval | Interval for clearing unaccessed backpressure records. The unit is millisecond. | 600000 | No |
jobmanager.web.backpressure.refresh-interval | Interval for updating backpressure records. The unit is millisecond. | 60000 | No |
jobmanager.web.backpressure.num-samples | Number of stack tracing records for backpressure calculation. | 100 | No |
jobmanager.web.backpressure.delay-between-samples | Sampling interval for backpressure calculation. The unit is millisecond. | 50 | No |
jobmanager.web.ssl.enabled | Whether SSL encryption is enabled for web transmission. This parameter is valid only when the global switch security.ssl is enabled. | false | Yes |
jobmanager.web.accesslog.enable | Switch to enable or disable web operation logs. The log is stored in webaccess.log. | true | Yes |
jobmanager.web.x-frame-options | Value of the HTTP security header X-Frame-Options. The value can be SAMEORIGIN, DENY, or ALLOW-FROM uri. | DENY | Yes |
jobmanager.web.cache-directive | Whether the web page can be cached. | no-store | Yes |
jobmanager.web.expires-time | Expiration duration of web page cache. The unit is millisecond. | 0 | Yes |
jobmanager.web.access-control-allow-origin | Web page same-origin policy that prevents cross-domain attacks. | * | Yes |
jobmanager.web.refresh-interval | Web page refresh interval. The unit is millisecond. | 3000 | Yes |
jobmanager.web.logout-timer | Automatic logout interval when no operation is performed. The unit is millisecond. | 600000 | Yes |
jobmanager.web.403-redirect-url | Web page access error 403. If a 403 error occurs, the page is redirected to the specified page. | Automatic configuration | Yes |
jobmanager.web.404-redirect-url | Web page access error 404. If a 404 error occurs, the page is redirected to the specified page. | Automatic configuration | Yes |
jobmanager.web.415-redirect-url | Web page access error 415. If a 415 error occurs, the page is redirected to the specified page. | Automatic configuration | Yes |
jobmanager.web.500-redirect-url | Web page access error 500. If a 500 error occurs, the page is redirected to the specified page. | Automatic configuration | Yes |
rest.await-leader-timeout | Time that the client waits for the leader address. The unit is millisecond. | 30000 | No |
rest.client.max-content-length | Maximum content length that the client handles (unit: bytes). | 104857600 | No |
rest.connection-timeout | Maximum time for the client to establish a TCP connection (unit: ms). | 15000 | No |
rest.idleness-timeout | Maximum time for a connection to stay idle before failing (unit: ms). | 300000 | No |
rest.retry.delay | The time that the client waits between retries (unit: ms). | 3000 | No |
rest.retry.max-attempts | The number of retries if a retriable operation fails. | 20 | No |
rest.server.max-content-length | Maximum content length that the server handles (unit: bytes). | 104857600 | No |
rest.server.numThreads | Maximum number of threads for the asynchronous processing of requests. | 4 | No |
web.timeout | Timeout for web monitor (unit: ms). | 10000 | No |
File Systems
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
fs.overwrite-files | Whether to overwrite the existing file by default when the file is written. | false | No |
fs.output.always-create-directory | When the degree of parallelism (DOP) of file writing programs is greater than 1, a directory is created under the output file path and the result files (one for each parallel write program) are stored in the directory. | false | No |
State Backend
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
state.backend.fs.checkpointdir | Path when the backend is set to filesystem. The path must be accessible by JobManager. A local path is supported only in local mode. In cluster mode, use an HDFS path. | hdfs:///flink/checkpoints | No |
state.savepoints.dir | Savepoint storage directory used by Flink to restore and update jobs. When a savepoint is triggered, the metadata of the savepoint is saved to this directory. | hdfs:///flink/savepoint | Mandatory in security mode |
restart-strategy | Default restart strategy, which is used for jobs for which no restart strategy is specified. | fixed-delay | No |
restart-strategy.fixed-delay.attempts | Number of retries when the fixed-delay strategy is used. | | No |
restart-strategy.fixed-delay.delay | Retry interval when the fixed-delay strategy is used. The unit is ms/s/m/h/d. | | No |
restart-strategy.failure-rate.max-failures-per-interval | Maximum number of restarts in a specified period before a job fails when the failure-rate strategy is used. | 1 | No |
restart-strategy.failure-rate.failure-rate-interval | Period over which the failure rate is measured when the failure-rate strategy is used. The unit is ms/s/m/h/d. | 60s | No |
restart-strategy.failure-rate.delay | Retry interval when the failure-rate strategy is used. The unit is ms/s/m/h/d. | The default value is the same as the value of akka.ask.timeout. | No |
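A sketch of a state backend and restart strategy configuration in flink-conf.yaml. The paths and failure-rate limits are the defaults from the table. The strategy name failure-rate is inferred from the parameter names, and the delay value is only illustrative.

```yaml
# Checkpoint and savepoint locations (HDFS paths for cluster mode)
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoint

# Failure-rate strategy: at most 1 restart per 60s period before the job fails
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 1
restart-strategy.failure-rate.failure-rate-interval: 60s
# Illustrative value; the documented default follows akka.ask.timeout
restart-strategy.failure-rate.delay: 10s
```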
Kerberos-based Security
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
security.kerberos.login.keytab | Keytab file path. This parameter is a client parameter. | Configure the parameter based on actual service requirements. | Yes |
security.kerberos.login.principal | A parameter on the client. If security.kerberos.login.keytab and security.kerberos.login.principal are both set, the keytab certificate is used by default. | Configure the parameter based on actual service requirements. | No |
security.kerberos.login.contexts | Contexts of the JAAS file generated by Flink. This parameter is a server parameter. | Client, KafkaClient | Yes |
HA
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
high-availability | Whether HA is enabled. Only the following two modes are supported currently: | zookeeper | No |
high-availability.zookeeper.quorum | ZooKeeper quorum address. | Automatic configuration | No |
high-availability.zookeeper.path.root | Root directory that Flink creates on ZooKeeper, storing metadata required in HA mode. | /flink | No |
high-availability.storageDir | Directory for storing JobManager metadata of state backend. ZooKeeper stores only pointers to actual data. | hdfs:///flink/recovery | No |
high-availability.zookeeper.client.session-timeout | Session timeout duration on the ZooKeeper client. The unit is millisecond. | 90000 | No |
high-availability.zookeeper.client.connection-timeout | Connection timeout duration on the ZooKeeper client. The unit is millisecond. | 15000 | No |
high-availability.zookeeper.client.retry-wait | Retry waiting time on the ZooKeeper client. The unit is millisecond. | 5000 | No |
high-availability.zookeeper.client.max-retry-attempts | Maximum retry times on the ZooKeeper client. | 3 | No |
high-availability.job.delay | Delay of job restart when JobManager recovers. | The default value is the same as the value of akka.ask.timeout. | No |
high-availability.zookeeper.client.acl | ACL (open or creator) of the ZooKeeper node. The ACL is automatically configured based on the security mode of the cluster. | | Yes |
zookeeper.sasl.disable | Switch that enables or disables SASL authentication. This parameter is automatically configured based on the security mode of the cluster. | | Yes |
zookeeper.sasl.service-name | | zookeeper | Yes |
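A sketch of ZooKeeper-based HA settings in flink-conf.yaml, using the defaults from the table. The quorum address is normally configured automatically. The host names and port shown here are hypothetical placeholders.

```yaml
high-availability: zookeeper
# Hypothetical addresses; replace with the ZooKeeper quorum of your cluster
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181
# ZooKeeper stores only pointers; the metadata itself is kept in storageDir
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recovery

# Client timeouts and retries, in milliseconds
high-availability.zookeeper.client.session-timeout: 90000
high-availability.zookeeper.client.connection-timeout: 15000
high-availability.zookeeper.client.retry-wait: 5000
high-availability.zookeeper.client.max-retry-attempts: 3
```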
Environment
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
env.java.opts | JVM parameter, which is transferred to the startup script, JobManager, TaskManager, and Yarn client. For example, transfer remote debugging parameters. | -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M -Djdk.tls.ephemeralDHKeySize=2048 -Djava.library.path=${HADOOP_COMMON_HOME}/lib/native -Djava.net.preferIPv4Stack=true -Djava.net.preferIPv6Addresses=false -Dbeetle.application.home.path=/opt/xxx/Bigdata/common/runtime/security/config | No |
Yarn
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
yarn.maximum-failed-containers | Maximum number of containers the system is going to reallocate in case of a container failure of TaskManager. The default value is the number of TaskManagers when the Flink cluster is started. | 5 | No |
yarn.application-attempts | Number of ApplicationMaster restarts. The value is the maximum number of attempts within the validity interval, which is set to the Akka timeout in Flink. After the restart, the IP address and port number of ApplicationMaster will change and you will need to connect to the client manually. | 2 | No |
yarn.heartbeat-delay | Interval between heartbeats exchanged by ApplicationMaster and Yarn ResourceManager. The unit is second. | 5 | No |
yarn.containers.vcores | Number of virtual cores of each Yarn container | Number of TaskManager slots | No |
yarn.application-master.port | ApplicationMaster port number setting. A port number range is supported. | 32586-32650 | No |
Pipeline
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
nettyconnector.registerserver.topic.storage | Path (on a third-party server) to information about the IP address, port numbers, and concurrency of NettySink. ZooKeeper is recommended for storage. | /flink/nettyconnector | No. However, if pipeline is enabled, the parameter is mandatory. |
nettyconnector.sinkserver.port.range | Port range of NettySink. | 28444-28843 | No. However, if pipeline is enabled, the parameter is mandatory. |
nettyconnector.ssl.enabled | Whether SSL encryption for the communication between NettySink and NettySource is enabled. For details about the encryption key and protocol, see SSL. | false | No. However, if pipeline is enabled, the parameter is mandatory. |
nettyconnector.message.delimiter | Delimiter of the messages sent by NettySink to NettySource. It is 2 to 4 bytes long and cannot contain \n, #, or spaces. | $_ | No. However, if pipeline is enabled, the parameter is mandatory. |
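When the pipeline feature is enabled, all four entries are required. A sketch using the default values from the table:

```yaml
# Where NettySink registers its IP address, ports, and concurrency
nettyconnector.registerserver.topic.storage: /flink/nettyconnector
# Listening port range of NettySink
nettyconnector.sinkserver.port.range: 28444-28843
# SSL between NettySink and NettySource
nettyconnector.ssl.enabled: false
# 2 to 4 bytes; must not contain \n, #, or spaces
nettyconnector.message.delimiter: $_
```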
Enabling the Alarm Function for Job Submission on the Client
Parameter | Description | Value | Mandatory |
---|---|---|---|
job.alarm.enable | Whether to enable the alarm function. | true | Yes |
flinkserver.host.ip | Service IP addresses of the two FlinkServer instances | x.x.x.x,x.x.x.x | Yes |
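In the client flink-conf.yaml, these two entries might look like the sketch below. The x.x.x.x values are the placeholders used in the table and must be replaced with the actual service IP addresses.

```yaml
job.alarm.enable: true
# Service IP addresses of the two FlinkServer instances, separated by a comma
flinkserver.host.ip: x.x.x.x,x.x.x.x
```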
FlinkServer HA
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
flink_ha_enabled | Whether to start FlinkServer in HA mode. | true | Yes |
flink.ha.floatip | Floating IP address used by the FlinkServer, which is configured on the service plane. The IP address must be unique and not in use. | x.x.x.x | Yes |
flink.ha.mediator.ip | Arbitration IP address of the HA server. The value is the gateway IP address of the service plane. | x.x.x.x | Yes |
flink.ha.net.timeout | Timeout for FlinkServer network connection. The unit is second. | 20 | No |
flink.ha.heartbeat.port | Port for HA heartbeat link | 28944 | No |
flink.ha.rpc.port | Port for HA RPC communication | 28946 | No |
flink.ha.sync.port | Port used to synchronize HA files | 28945 | No |