Configuring and Managing Flink

Updated at: Dec 31, 2019 GMT+08:00

Configuring Parameter Paths

All Flink parameters must be set on the client. The configuration file is located at Client installation path/Flink/flink/conf/flink-conf.yaml.

  • You are advised to configure parameters by editing the flink-conf.yaml file on the client. Each entry uses the YAML format key: value.

    Example: taskmanager.heap.mb: 512

    Note that a space is required between the colon after the key and the value.

  • If you modify parameters in Service Configuration on MRS Manager, you must download and install the client again after the modification.
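A minimal sketch of an entry in flink-conf.yaml, using the example above. The commented-out line shows the formatting mistake that the note warns about; it is included for illustration only.

```yaml
# Excerpt of Client installation path/Flink/flink/conf/flink-conf.yaml
# Each entry is "key: value", with a space after the colon.
taskmanager.heap.mb: 512

# Wrong: no space after the colon, so the line is not parsed as a key-value pair.
# taskmanager.heap.mb:512
```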

Configuring the JobManager and TaskManager

The JobManager and TaskManager are the main components of Flink. You can set their parameters on the client to suit your security and performance requirements.

The main configuration items include communication ports, memory management, and connection retries.

Table 1 Parameter description

Parameter

Mandatory

Default Value

Description

taskmanager.rpc.port

No

32326-32390

Akka system listening port on the TaskManager

taskmanager.data.port

No

32391-32455

Netty server listening port used for data exchange between TaskManagers

taskmanager.data.ssl.enabled

No

false

Whether to enable SSL for data communication between TaskManagers. This takes effect only when the global SSL flag security.ssl is enabled.

taskmanager.numberOfTaskSlots

No

3

Number of slots provided by each TaskManager. Generally, set this value to the number of physical CPU cores. In yarn-session mode, the value can be passed only through the -s parameter. In yarn-cluster mode, the value can be passed only through the -ys parameter.

parallelism.default

No

1

Number of concurrent job operators

taskmanager.memory.size

No

0

Amount of memory that the TaskManager reserves on the JVM heap for sorting, hash tables, and caching of intermediate results. If this item is not set (default value 0), the memory manager takes a fixed ratio of the heap memory available to the JVM, as specified by taskmanager.memory.fraction. The unit is MB.

taskmanager.memory.fraction

No

0.7

Ratio of JVM heap memory that the TaskManager reserves for sorting, hash tables, and caching of intermediate results.

taskmanager.memory.off-heap

Yes

false

Whether the TaskManager uses off-heap memory for sorting, hash tables, and caching of intermediate results. Enabling this item is recommended when a large amount of memory is configured, to improve memory operation efficiency.

taskmanager.memory.segment-size

No

32768

Size of the memory segments used by the TaskManager. A memory segment is the basic unit of the reserved memory space and is also the buffer unit used by the network stack. The unit is bytes.

taskmanager.memory.preallocate

No

false

Whether TaskManagers allocate all reserved memory at startup. Enabling this item is recommended when off-heap memory is used.

taskmanager.registration.initial-backoff

No

500 ms

Initial registration backoff between two consecutive registration attempts. The unit is ms/s/m/h/d.

NOTE:

A single space separates the value from the unit, for example, 500 ms. ms, s, m, h, and d indicate millisecond, second, minute, hour, and day, respectively.

taskmanager.registration.refused-backoff

No

5 min

Backoff before the TaskManager retries to connect after its registration has been refused by the JobManager

task.cancellation.interval

No

30000

Interval between two successive task cancellation attempts. The unit is ms.
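The following flink-conf.yaml excerpt combines several items from Table 1. It assumes a hypothetical node with 4 physical CPU cores and a workload that benefits from off-heap memory; the values are examples under those assumptions, not tuning recommendations.

```yaml
# Communication ports: the MRS default ranges from Table 1
taskmanager.rpc.port: 32326-32390
taskmanager.data.port: 32391-32455

# One slot per physical CPU core (4 on the assumed node).
# In yarn-session or yarn-cluster mode, pass this with -s or -ys instead.
taskmanager.numberOfTaskSlots: 4
# Default number of parallel operator instances
parallelism.default: 4

# Managed memory ratio (default 0.7); use off-heap memory and
# allocate all reserved memory when the TaskManager starts
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true

# Registration retries: a space separates the value and the unit
taskmanager.registration.initial-backoff: 500 ms
taskmanager.registration.refused-backoff: 5 min

# Interval between task cancellation attempts, in milliseconds
task.cancellation.interval: 30000
```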

Configuring the BLOB

The BLOB server on the JobManager node receives JAR files uploaded by the client, sends JAR files to TaskManagers, and handles log uploads. Flink provides BLOB server configuration items in the flink-conf.yaml file.

You can configure items such as the port, SSL, number of retries, and concurrency.

Table 2 Parameter description

Parameter

Mandatory

Default Value

Description

blob.server.port

No

32456-32520

BLOB server port

blob.service.ssl.enabled

Yes

true

Whether to enable SSL for BLOB service client/server communication. This takes effect only when the global SSL flag security.ssl is enabled.

blob.fetch.retries

No

50

Number of retries for the TaskManager to download BLOB files from the JobManager

blob.fetch.num-concurrent

No

50

Maximum number of concurrent BLOB fetches that the JobManager serves

blob.fetch.backlog

No

1000

Maximum number of queued BLOB fetches (such as JAR file downloads) that the JobManager allows
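A sketch of BLOB server settings, assuming a hypothetical cluster in which many TaskManagers start at the same time and download the same job JAR from the JobManager. Raising the concurrency and backlog limits above the defaults in Table 2 is one way to reduce queued downloads in that situation; the exact values are illustrative.

```yaml
# BLOB server port range and SSL (defaults from Table 2)
blob.server.port: 32456-32520
blob.service.ssl.enabled: true

# Retries for a TaskManager downloading BLOB files from the JobManager
blob.fetch.retries: 50

# Illustrative values above the defaults (50 and 1000) for many
# simultaneous JAR downloads
blob.fetch.num-concurrent: 100
blob.fetch.backlog: 2000
```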

Configuring the Distributed Coordination (via Akka)

The Akka actor model is the basis of communication between the Flink client and the JobManager, between the JobManager and TaskManagers, and among TaskManagers. Flink lets you configure Akka connection parameters in the flink-conf.yaml file to suit the network environment or optimization policy.

The configuration items include the message frame size, timeouts for sending and waiting, and the settings of Akka's DeathWatch monitoring mechanism.

Table 3 Parameter description

Parameter

Mandatory

Default Value

Description

akka.ask.timeout

No

10s

Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts, try increasing this value. Timeouts can be caused by slow machines or network congestion. The unit is ms/s/m/h/d.

akka.lookup.timeout

No

10s

Timeout used for the lookup of the JobManager actor object. The unit is ms/s/m/h/d.

akka.framesize

No

10485760b

Maximum size of messages sent between the JobManager and TaskManagers. If Flink fails because messages exceed this limit, increase this value. The unit is b/B/KB/MB.

akka.watch.heartbeat.interval

No

10s

Heartbeat interval for Akka's DeathWatch mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, decrease this value. The unit is ms/s/m/h/d.

NOTE:

A thorough description of Akka's DeathWatch can be found at http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector.

akka.watch.heartbeat.pause

No

60s

Acceptable heartbeat pause for Akka's DeathWatch mechanism. A low value does not tolerate irregular heartbeats. The unit is ms/s/m/h/d.

akka.watch.threshold

No

12

Threshold for the DeathWatch failure detector. A low value is prone to false positives whereas a high value increases the time to detect a dead TaskManager.

akka.tcp.timeout

No

20s

Timeout for all outbound TCP connections. If you experience problems connecting to a TaskManager due to a slow network, increase this value. The unit is ms/s/m/h/d.

akka.throughput

No

15

Number of messages processed in a batch before the thread is returned to the pool. Low values give fair scheduling of actor message processing, whereas high values can improve performance at the cost of fairness.

akka.log.lifecycle.events

No

false

Whether to enable Akka's remote logging of lifecycle events. Set this value to true when debugging.

akka.startup-timeout

No

The default value is the same as the value of akka.ask.timeout.

Timeout after which the startup of a remote component is considered to have failed. The unit is ms/s/m/h/d.

akka.ssl.enabled

Yes

true

Whether to enable SSL for Akka's remote communication. This takes effect only when the global SSL flag security.ssl is enabled.
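The excerpt below sketches Akka settings for a hypothetical cluster with a slow or congested network, following the advice in Table 3 to increase timeouts and the frame size when failures are caused by them. The values are illustrative. akka.startup-timeout is left unset so that it follows akka.ask.timeout.

```yaml
# Longer timeouts for slow machines or network congestion
akka.ask.timeout: 60 s
akka.lookup.timeout: 30 s
akka.tcp.timeout: 60 s

# Allow messages up to 20 MB (default 10485760b, that is, 10 MB)
akka.framesize: 20971520b

# DeathWatch: tolerate longer heartbeat pauses before a
# TaskManager is marked dead
akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 120 s
akka.watch.threshold: 12
```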

Configuring Secure Sockets Layer (SSL)

To run a Flink cluster in security mode, configure the following SSL items.

The configuration items include the SSL switches, keystore and truststore files, passwords, protocol version, and encryption algorithms.

Table 4 Parameter description

Parameter

Mandatory

Default Value

Description

security.ssl.internal.enabled

Yes

The value is automatically configured based on the cluster installation mode.

  • Security mode: The default value is true.
  • Normal mode: The default value is false.

Whether to enable SSL for internal network communication

security.ssl.internal.keystore

Yes

-

Java keystore file

security.ssl.internal.keystore-password

Yes

-

Password used to decrypt the keystore file

security.ssl.internal.key-password

Yes

-

Password used to decrypt the server key in the keystore file

security.ssl.internal.truststore

Yes

-

Truststore file containing the public CA certificates

security.ssl.internal.truststore-password

Yes

-

Password used to decrypt the truststore file

security.ssl.protocol

Yes

TLSv1.2

SSL protocol version supported by the SSL transport

security.ssl.algorithms

Yes

The default value is TLS_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_DSS_WITH_AES_128_CBC_SHA256.

Comma-separated list of standard SSL algorithms to be supported. For more information, visit the Java official website at http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites.

security.ssl.rest.enabled

Yes

The value is automatically configured based on the cluster installation mode.

  • Security mode: The default value is true.
  • Normal mode: The default value is false.

Whether to enable SSL for external network communication

security.ssl.rest.keystore

Yes

-

Java keystore file

security.ssl.rest.keystore-password

Yes

-

Password used to decrypt the keystore file

security.ssl.rest.key-password

Yes

-

Password used to decrypt the server key in the keystore file

security.ssl.rest.truststore

Yes

-

Truststore file containing the public CA certificates

security.ssl.rest.truststore-password

Yes

-

Password used to decrypt the truststore file
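A sketch of the SSL items in Table 4 for a cluster in security mode. All file paths and passwords are hypothetical placeholders; on MRS, security.ssl.internal.enabled and security.ssl.rest.enabled are set automatically based on the installation mode. Restricting security.ssl.algorithms to a single suite from the default list is shown only as an example.

```yaml
# Internal communication (paths and passwords are hypothetical placeholders)
security.ssl.internal.enabled: true
security.ssl.internal.keystore: /path/to/internal.keystore
security.ssl.internal.keystore-password: <keystore-password>
security.ssl.internal.key-password: <key-password>
security.ssl.internal.truststore: /path/to/internal.truststore
security.ssl.internal.truststore-password: <truststore-password>

# External (REST) communication (hypothetical placeholders)
security.ssl.rest.enabled: true
security.ssl.rest.keystore: /path/to/rest.keystore
security.ssl.rest.keystore-password: <keystore-password>
security.ssl.rest.key-password: <key-password>
security.ssl.rest.truststore: /path/to/rest.truststore
security.ssl.rest.truststore-password: <truststore-password>

# Protocol and a single cipher suite taken from the default list
security.ssl.protocol: TLSv1.2
security.ssl.algorithms: TLS_DHE_RSA_WITH_AES_128_CBC_SHA256
```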

Configuring Network Communication (via Netty)

When Flink runs a job, data transmission and backpressure detection between tasks depend on Netty. In some environments, the Netty parameters may need to be configured.

For advanced optimization, you can modify the following Netty configuration items. The default configuration meets the requirements of jobs running on large-scale clusters with high concurrent throughput. For details about the parameters, visit the Netty official website at http://netty.io/.

Table 5 Parameter description

Parameter

Mandatory

Default Value

Description

taskmanager.network.netty.num-arenas

No

Same as the value of taskmanager.numberOfTaskSlots

Number of Netty arenas

taskmanager.network.netty.server.numThreads

No

Same as the value of taskmanager.numberOfTaskSlots

Number of Netty server threads

taskmanager.network.netty.client.numThreads

No

Same as the value of taskmanager.numberOfTaskSlots

Number of Netty client threads

taskmanager.network.netty.client.connectTimeoutSec

No

120

Netty client connection timeout. The unit is s.

taskmanager.network.netty.sendReceiveBufferSize

No

4096

Netty send and receive buffer size. This defaults to the system buffer size (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MB in modern Linux. The unit is bytes.

taskmanager.network.netty.transport

No

nio

Netty transport type, either nio or epoll
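The defaults in Table 5 suit most clusters. If you do tune Netty, an excerpt might look like the following. It assumes a hypothetical TaskManager with 4 slots running on a Linux host (the epoll transport is available only on Linux); the values are illustrative.

```yaml
# By default these three follow taskmanager.numberOfTaskSlots (4 assumed here)
taskmanager.network.netty.num-arenas: 4
taskmanager.network.netty.server.numThreads: 4
taskmanager.network.netty.client.numThreads: 4

# Longer client connection timeout for a slow network, in seconds
taskmanager.network.netty.client.connectTimeoutSec: 300

# Native epoll transport instead of the default nio (Linux only)
taskmanager.network.netty.transport: epoll
```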

Configuring the JobManager Web Frontend

When the JobManager starts, a web server is started in the same process.

  • You can access the web server to obtain information about the current Flink cluster, including the JobManager, TaskManagers, and jobs running in the cluster.
  • You can set the parameters of the web server.

The configuration includes ports, temporary directories, display items, error redirection, and security.

Table 6 Parameter description

Parameter

Mandatory

Default Value

Description

jobmanager.web.port

No

32261-32325

Web port. Value range: 32261-32325

jobmanager.web.allow-access-address

Yes

*

Web access whitelist. Separate IP addresses with commas (,). Only IP addresses in the whitelist can access the web UI.
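A sketch of a web frontend configuration that restricts access to two administrator hosts. The addresses are taken from the 192.0.2.0/24 documentation range and are hypothetical; replace them with the real client IP addresses.

```yaml
# Web port range (default from Table 6)
jobmanager.web.port: 32261-32325

# Only these two (hypothetical) hosts can open the web UI
jobmanager.web.allow-access-address: 192.0.2.10,192.0.2.11
```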

Configuring the File Systems

Tasks create result files while they are running. Flink lets you configure how these files are created.

The configuration items include file overwriting policies and directory creation.

Table 7 Parameter description

Parameter

Mandatory

Default Value

Description

fs.overwrite-files

No

false

Whether to overwrite existing files

fs.output.always-create-directory

No

false

File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory.

  • If this option is set to true, writers with a parallelism of 1 will also create a directory and place a single result file into it.
  • If the option is set to false, writers with a parallelism of 1 create the file directly at the output path, without creating a containing directory.
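An example of the file system items in Table 7. With both items set to true, existing result files are overwritten, and a writer with a parallelism of 1 places its single result file in a directory created at the output path instead of writing the file directly at that path. The combination is illustrative, not a recommendation.

```yaml
# Overwrite existing result files instead of failing
fs.overwrite-files: true

# Always create an output directory, even for a parallelism of 1
fs.output.always-create-directory: true
```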

Configuring the State Backend

Flink provides HA and job recovery upon exceptions, as well as job suspension and recovery during version upgrades. Flink relies on the state backend to store job state and on restart policies to restart jobs. You can configure both.

The configuration items include the state backend type, storage path, and restart policy.

Table 8 Parameter description

Parameter

Mandatory

Default Value

Description

state.checkpoints.dir

No

hdfs:///flink/checkpoints

Checkpoint storage path used when the state backend is set to filesystem. The path must be accessible from the JobManager. A local path can be used only in local mode; in cluster mode, use an HDFS path.

state.savepoints.dir

Mandatory in security mode

hdfs:///flink/savepoint

Default path for storing savepoints.

This path is used when no HDFS path is specified for storing a savepoint.

restart-strategy

No

fixed-delay

Restart policy. The options are as follows:

  • fixed-delay
  • failure-rate
  • none

restart-strategy.fixed-delay.attempts

No

  • If the checkpoint is enabled in a job, the default value is Integer.MAX_VALUE.
  • If the checkpoint is not enabled in the job, the default value is 3.

Number of retries of the fixed-delay policy. For details about the policy, see https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/restart_strategies.html.

restart-strategy.fixed-delay.delay

No

  • If the checkpoint is enabled in the job, the default value is 10s.
  • If the checkpoint is not enabled in a job, the default value is the same as the value of akka.ask.timeout.

Interval of fixed-delay policy retries. The unit is ms/s/m/h/d.

restart-strategy.failure-rate.max-failures-per-interval

No

1

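Maximum number of failures allowed within the interval set by restart-strategy.failure-rate.failure-rate-interval before the job is declared failed. For details about the policy, see https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/restart_strategies.html.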

restart-strategy.failure-rate.failure-rate-interval

No

60s

Time interval over which failures are counted by the failure-rate policy. The unit is ms/s/m/h/d.

restart-strategy.failure-rate.delay

No

The default value is the same as the value of akka.ask.timeout. For details, see Configuring the Distributed Coordination (via Akka).

Interval of failure-rate policy retries. The unit is ms/s/m/h/d.
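A sketch of the state backend and restart items in Table 8, using the default HDFS paths. The fixed-delay values are illustrative; the commented-out lines show a hypothetical failure-rate alternative that fails the job if more than 3 failures occur within 300 seconds, waiting 10 seconds before each restart.

```yaml
# Checkpoint and savepoint storage on HDFS (required in cluster mode)
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoint

# Fixed-delay policy: up to 3 restart attempts, 10 seconds apart
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

# Alternative (hypothetical): failure-rate policy
# restart-strategy: failure-rate
# restart-strategy.failure-rate.max-failures-per-interval: 3
# restart-strategy.failure-rate.failure-rate-interval: 300 s
# restart-strategy.failure-rate.delay: 10 s
```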

Configuring the Kerberos-based Security

In security mode, the Flink Kerberos configuration items must be configured.

The configuration items include the keytab and principal of Kerberos.

Table 9 Parameter description

Parameter

Mandatory

Default Value

Description

security.kerberos.login.keytab

Yes

Configure the parameter based on actual service requirements.

Keytab file path. This parameter is a client parameter.

security.kerberos.login.principal

No

Configure the parameter based on actual service requirements.

Kerberos principal. This parameter is a client parameter. If both the keytab and principal are set, keytab authentication is used by default.

security.kerberos.login.contexts

Yes

Client, KafkaClient

Contexts of the JAAS file generated by Flink. This parameter is a server parameter.
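A sketch of the Kerberos items in Table 9. The keytab path and principal are hypothetical; use the keytab file and user name of your own Kerberos user.

```yaml
# Client parameters (hypothetical keytab path and principal).
# Because both are set, keytab authentication is used.
security.kerberos.login.keytab: /opt/client/user.keytab
security.kerberos.login.principal: flinkuser@EXAMPLE.COM

# Server parameter: JAAS contexts generated by Flink (default value)
security.kerberos.login.contexts: Client,KafkaClient
```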

Configuring HA

The HA mode of Flink depends on ZooKeeper. Therefore, ZooKeeper must be configured.

The configuration items include the ZooKeeper address, path, and security authentication.

Table 10 Parameter description

Parameter

Mandatory

Default Value

Description

high-availability

No

zookeeper

Whether to enable HA. Two modes are supported:

  • none: Only a single JobManager runs. Checkpoints are disabled for the JobManager.
  • zookeeper:
    • In non-YARN mode, multiple JobManagers are supported and a leader is elected.
    • In YARN mode, only one JobManager exists.

high-availability.zookeeper.quorum

No

Automatic configuration

ZooKeeper quorum address

high-availability.zookeeper.path.root

No

/flink

Root directory created by Flink on ZooKeeper for storing metadata required by the HA mode

high-availability.storageDir

No

hdfs:///flink/recovery

Directory for storing JobManager metadata in the state backend. ZooKeeper stores only the pointers of the actual data.

high-availability.zookeeper.client.session-timeout

No

60000

ZooKeeper client session timeout. The unit is ms.

high-availability.zookeeper.client.connection-timeout

No

15000

ZooKeeper client connection timeout. The unit is ms.

high-availability.zookeeper.client.retry-wait

No

5000

ZooKeeper client retry wait time. The unit is ms.

high-availability.zookeeper.client.max-retry-attempts

No

3

Maximum number of ZooKeeper client retries

high-availability.zookeeper.client.acl

Yes

The value is automatically configured based on the cluster installation mode.

  • Security mode: creator
  • Normal mode: open

ACL of the ZooKeeper node. The value can be open or creator.

zookeeper.sasl.disable

Yes

The value is automatically configured based on the cluster installation mode.

  • Security mode: false
  • Normal mode: true

Whether to disable SASL authentication for ZooKeeper

zookeeper.sasl.service-name

Yes

zookeeper

  • ZooKeeper service name used for SASL authentication. If the ZooKeeper server uses a service name other than zookeeper, set this parameter to the same name.
  • If the service names on the client and server differ, authentication fails.
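The excerpt below sketches the HA items in Table 10 for a cluster in security mode. The ZooKeeper host names and port are hypothetical; on MRS, the quorum address is configured automatically. The remaining values are the defaults listed in the table.

```yaml
# ZooKeeper-based HA
high-availability: zookeeper
# Hypothetical hosts and port; configured automatically on MRS
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181
high-availability.zookeeper.path.root: /flink
# JobManager metadata; ZooKeeper stores only pointers to it
high-availability.storageDir: hdfs:///flink/recovery

# ZooKeeper client timeouts and retries (ms)
high-availability.zookeeper.client.session-timeout: 60000
high-availability.zookeeper.client.connection-timeout: 15000
high-availability.zookeeper.client.retry-wait: 5000
high-availability.zookeeper.client.max-retry-attempts: 3

# Security mode: creator ACL and SASL authentication enabled
high-availability.zookeeper.client.acl: creator
zookeeper.sasl.disable: false
zookeeper.sasl.service-name: zookeeper
```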

Configuring the Environment

If the JVM configuration has special requirements, JVM parameters can be passed to the client, JobManager, and TaskManager through configuration items.

You can configure the following JVM parameter.

Table 11 Parameter description

Parameter

Mandatory

Default Value

Description

env.java.opts

No

-Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M

JVM options passed to the startup script, JobManager, TaskManager, and YARN client, for example, remote debugging parameters.
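A sketch of env.java.opts that keeps the default GC logging options from Table 11 and appends a JDWP remote debugging agent, the use case the table mentions. Setting env.java.opts replaces its default value, which is why the default options are repeated. The debug port 5005 is a hypothetical choice; because the options are passed to every Flink JVM, a fixed port can clash when several of these JVMs run on the same host, so enable this only for a short debugging session.

```yaml
# Default GC logging options followed by a remote debugging agent
# (hypothetical port 5005; suspend=n starts the JVM without waiting for a debugger)
env.java.opts: -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
```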

Configuring YARN

When Flink runs on a YARN cluster, the JobManager runs in the ApplicationMaster. Some JobManager parameters depend on YARN, so you can configure the YARN-related items to improve Flink performance on YARN.

The configuration items include the memory, virtual cores, and port of the YARN container.

Table 12 Parameter description

Parameter

Mandatory

Default Value

Description

yarn.maximum-failed-containers

No

5

Maximum number of containers that the system reallocates when TaskManager containers fail. The default value is the number of TaskManagers when the Flink cluster is started.

yarn.application-attempts

No

2

Number of ApplicationMaster restarts. This is the maximum number of restarts within the validity interval, which Flink sets to the Akka timeout. After a restart, the IP address and port number of the ApplicationMaster change, and you need to reconnect the client manually.

yarn.heartbeat-delay

No

5

Interval between heartbeats from the ApplicationMaster to the YARN ResourceManager. The unit is s.

yarn.containers.vcores

No

The default value is the number of TaskManager slots.

Number of virtual cores (vcores) per YARN container

yarn.application-master.port

No

32586-32650

ApplicationMaster port. A port range can be specified.
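A sketch of the YARN items in Table 12. It assumes a hypothetical deployment that should tolerate more ApplicationMaster failures than the default; the other values are the defaults from the table. YARN's own limit on ApplicationMaster attempts, configured on the ResourceManager side, may still cap yarn.application-attempts.

```yaml
# Containers reallocated after TaskManager container failures
yarn.maximum-failed-containers: 5

# Hypothetical: allow more ApplicationMaster restarts than the default 2.
# After each restart, the client must be reconnected manually.
yarn.application-attempts: 4

# Heartbeat interval to the YARN ResourceManager, in seconds
yarn.heartbeat-delay: 5

# Vcores per container; matches the default of 3 TaskManager slots
yarn.containers.vcores: 3

# ApplicationMaster port range
yarn.application-master.port: 32586-32650
```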
