Updated on 2024-09-10 GMT+08:00

Performance Tuning Suggestions

Enable Log Indexes on Hudi MOR Tables for Faster Flink Streaming Reads

To enable log indexes and improve the read and write performance of Hudi MOR tables, add 'hoodie.log.index.enabled'='true' to the WITH options of both the sink and source tables.
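
The following is a minimal SQL sketch showing the option on a MOR sink table; the table definition and Hudi path are placeholders, and the same option is added to the source table definition:

CREATE TABLE hudi_sink (
  `id` VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  `name` VARCHAR(20),
  `ts` TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  -- Placeholder path
  'path' = 'hdfs://hacluster/tmp/hudi_sink',
  'table.type' = 'MERGE_ON_READ',
  -- Enable log indexes
  'hoodie.log.index.enabled' = 'true'
);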

Adjust Operator Parallelism to Improve Performance

  • You can set the parallelism of the read and write operators to improve Hudi read and write performance (see the SQL sketch after this list).

    The read.tasks parameter sets the parallelism of the read operator.

    The write.tasks parameter sets the parallelism of the write operator.

  • When state indexes are used and a job is started fresh rather than restored from a checkpoint, the target table must be read to rebuild the indexes. You can increase the parallelism of this operator to improve performance.

    The write.index_bootstrap.tasks parameter controls the parallelism for loading indexes.

  • When state indexes are used to write data, the bucket assign operator checks the uniqueness of the primary key and assigns each record a specific file to write to. You can increase its parallelism for better performance.

    The write.bucket_assign.tasks parameter controls the task parallelism for bucket assign. The default value is the parallelism of the execution environment.
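
The following is a minimal SQL sketch showing where these options are set; the table definition, path, and parallelism values are placeholders to be tuned per job:

CREATE TABLE hudi_table (
  `id` VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  `name` VARCHAR(20),
  `ts` TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  -- Placeholder path
  'path' = 'hdfs://hacluster/tmp/hudi_table',
  'table.type' = 'MERGE_ON_READ',
  -- Parallelism of the read operator
  'read.tasks' = '8',
  -- Parallelism of the write operator
  'write.tasks' = '8',
  -- Parallelism for rebuilding indexes when the job starts without a checkpoint
  'write.index_bootstrap.tasks' = '8',
  -- Parallelism of the bucket assign operator
  'write.bucket_assign.tasks' = '8'
);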

Optimize Resources to Improve the Performance of Stateless Computing

Flink computing operations are classified into the following types:

  • Stateless computing: These operators (such as filter, union all, and lookup join) do not need to save computing states.
  • Stateful computing: These operators (such as join, union, window, group by, and aggregation operators) compute based on data state changes.

For stateless computing, you can adjust the TaskManager heap size and network memory to optimize performance.

For example, if a job only reads and writes data, the TaskManager does not need extra vCores. The default off-heap and overhead values (1 GB) are sufficient, and memory is mainly allocated to the heap and network.
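
[Example] A minimal flink-conf.yaml sketch for such a read/write-only job; the sizes are assumptions and should be adapted to the actual workload.

# Total TaskManager process memory (assumed size)
taskmanager.memory.process.size: 4g
# Keep managed memory small because no state backend or sort/hash buffers are needed
taskmanager.memory.managed.fraction: 0.1
# Give a larger share to network buffers for reading and writing data
taskmanager.memory.network.fraction: 0.2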

Optimize Resources to Improve the Performance of Stateful Computing

If the SQL logic contains many stateful operations such as joins and aggregations, tune the state backend performance, vCores, and managed memory.

For example, if a job joins more than three tables and has high performance requirements, add six extra vCores to a single TaskManager, increase the off-heap and overhead memory to 5 GB, and set the managed memory used for Flink state management to 9.6 GB.
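
[Example] One possible flink-conf.yaml mapping of the settings above; the off-heap/overhead split and the base vCore count are assumptions.

# Managed memory used for Flink state management (9.6 GB, as in the example)
taskmanager.memory.managed.size: 9600m
# Off-heap and overhead raised to 5 GB in total (assumed 4 GB task off-heap + 1 GB JVM overhead)
taskmanager.memory.task.off-heap.size: 4g
taskmanager.memory.jvm-overhead.max: 1g
# vCores per TaskManager container on YARN (assumed 2 default vCores plus 6 extra)
yarn.containers.vcores: 8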

Optimize State Backends Through Table-Level TTL

This suggestion is available for MRS 3.3.0 or later.

When you join two Flink streams, data in one table may change rapidly (needing only a short TTL) while data in the other changes slowly (needing a long TTL). Flink currently supports only job-level TTL, so to ensure join accuracy you would have to set a long TTL for the whole job. A large amount of expired data then accumulates in the state backend, putting heavy pressure on it. To reduce this pressure, you can use Hints to set different expiration times for the left and right tables. The WHERE clause is not supported.

For example, set the TTL of the left table (state.ttl.left) to 60 seconds and that of the right table (state.ttl.right) to 120 seconds.

  • Use Hints in the following format:
    table_path /*+ OPTIONS(key=val [, key=val]*) */  
    
    key:
         stringLiteral 
    val:
         stringLiteral
  • The following is a configuration example with a SQL statement:
    CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
      'connector' = 'kafka',
      'topic' = 'user_info_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    CREATE TABLE print (
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `score` INT
    ) WITH ('connector' = 'print');
    CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
      'connector' = 'kafka',
      'topic' = 'user_score_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    INSERT INTO
      print
    SELECT
      t.user_id,
      t.user_name,
      d.score
    FROM
      user_info as t
      LEFT JOIN
      -- Set different TTLs for the left and right tables.
      user_score /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */ as d ON t.user_id = d.user_id;

Optimize the State Backend Through Table-Level JTL

This suggestion is available for MRS 3.3.0 or later.

In a Flink dual-stream inner join, if it is acceptable to delete a record from the state backend once it has been joined, you can use this feature.

This feature is available for inner joins of streams only.

You can use hints to set different join-count thresholds (JTL) for the left and right tables.

  • Use Hints in the following format:
    table_path /*+ OPTIONS(key=val [, key=val]*) */  
    
    key:
         stringLiteral 
    val:
         stringLiteral
  • The following is a configuration example with a SQL statement:
    CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
      'connector' = 'kafka',
      'topic' = 'user_info_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    CREATE TABLE print (
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `score` INT
    ) WITH ('connector' = 'print');
    CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
      'connector' = 'kafka',
      'topic' = 'user_score_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    INSERT INTO
      print
    SELECT
      t.user_id,
      t.user_name,
      d.score
    FROM
      user_info as t
      JOIN
      -- Set different JTL thresholds for the left and right tables.
      user_score /*+ OPTIONS('eliminate-state.left.threshold'='1','eliminate-state.right.threshold'='1') */ as d ON t.user_id = d.user_id;

Number of TM Slots Should Be a Multiple of the Number of TM CPUs

In Flink, each task is divided into subtasks. A subtask is an execution thread unit that runs on the TM. If Slot Sharing Group is disabled, a subtask is deployed in a slot. Even if Slot Sharing Group is enabled, the subtasks in a slot are load balanced in most cases. The number of slots on the TM indicates the number of running task threads.

Generally, the number of slots should be a multiple of the number of CPU cores. When hyper-threading is used, each slot can occupy two or more hardware threads.

[Example] Set the number of TM slots to 2 to 4 times the number of CPU cores.

taskmanager.numberOfTaskSlots: 4
taskmanager.cpu.cores: 2

Adjust Network Memory When Shuffle Is Enabled, Data Volume Is Large, and Concurrency Is High

When concurrency is high and the data volume is large, shuffle generates massive network I/O. Increasing the network buffer memory increases the amount of data read at a time, improving I/O speed.

[Example]

# Fraction of the total Flink memory used as network memory
taskmanager.memory.network.fraction: 0.6
# Minimum size of the network memory
taskmanager.memory.network.min: 1g
# Maximum size of the network memory (in MRS 3.3.1 and later versions, you do not need to change this value; the default is Long#MAX_VALUE)
taskmanager.memory.network.max: 20g

Choose Data Types Such as POJO and Avro Based on Serialization Performance

When you write Flink programs with the APIs, consider how your Java objects are serialized. In most cases, Flink handles serialization efficiently. SQL jobs operate on Row data and use Flink's efficient built-in serializers.

Table 1 Serialization

Serializer                      Opts/s
PojoSerializer                  813
Kryo                            294
Avro (Reflect API)              114
Avro (SpecificRecord API)       632
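
The following Java sketch (the class and event type are hypothetical) shows how a DataStream job can be steered toward the faster POJO or Avro serializers instead of silently falling back to Kryo:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializationConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fail fast if any type falls back to the generic (slow) Kryo serializer.
        env.getConfig().disableGenericTypes();
        // Register the user type so that Flink analyzes it as a POJO up front.
        env.getConfig().registerPojoType(UserEvent.class);
        // Alternatively, force the Avro serializer for POJO types:
        // env.getConfig().enableForceAvro();
    }

    // Hypothetical POJO: public no-argument constructor and public fields.
    public static class UserEvent {
        public String userId;
        public long timestamp;
        public UserEvent() {}
    }
}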

Network Communication Optimization

Flink communication relies mainly on the Netty network framework, so Netty settings are especially important for Flink applications: the network determines the data exchange speed and therefore the task execution efficiency.

[Example]

# Number of Netty server threads. The value -1 means the default (the number of slots).
taskmanager.network.netty.server.numThreads: -1
# Number of Netty client threads. The value -1 means the default (the number of slots).
taskmanager.network.netty.client.numThreads: -1
# Netty client connection timeout, in seconds.
taskmanager.network.netty.client.connectTimeoutSec: 120
# Size of the Netty send and receive buffers (0 means the Netty default, 4 MB).
taskmanager.network.netty.sendReceiveBufferSize: 0
# Netty transport mode. The default (auto) selects the mode based on the platform.
taskmanager.network.netty.transport: auto

Overall Memory Optimization

Flink uses both heap memory and off-heap memory. The Java heap size is specified when the JVM process starts, and it is the part of memory on which the JVM automatically triggers GC. Off-heap memory is further divided into memory that the JVM can manage and memory that it cannot: Managed Memory and Direct Memory, which the JVM can manage, are the focus of optimization, while JVM Metaspace and JVM Overhead, which the JVM cannot manage, are native memory.

Figure 1 Memory
Table 2 Related parameters

  • Total Memory
    taskmanager.memory.flink.size: none
    Total memory managed by Flink. There is no default value; metaspace and overhead are not included. Set this parameter in standalone mode.
  • Overall memory
    taskmanager.memory.process.size: none
    Memory used by the entire Flink process. Set this parameter when containers are used.
  • Framework
    taskmanager.memory.framework.heap.size: 128mb
    Heap memory occupied by the Flink runtime. The occupied space is relatively fixed, so you generally do not need to change it.
    taskmanager.memory.framework.off-heap.size: 128mb
    Off-heap memory occupied by the Flink runtime. The occupied space is relatively fixed, so you generally do not need to change it.
  • Task
    taskmanager.memory.task.heap.size: none
    Heap memory used by operator logic in user code (such as UDFs) and its regular Java objects. There is no default value; it is flink.size minus the framework, managed, and network memory.
    taskmanager.memory.task.off-heap.size: 0
    Off-heap memory used by tasks. The default value is 0.
  • Managed Memory
    taskmanager.memory.managed.fraction: 0.4
    Ratio of managed memory to taskmanager.memory.flink.size. The default value is 0.4.
    taskmanager.memory.managed.size: 0
    Size of managed memory. Generally, this parameter is not specified; the default value is 0, and the size is calculated from taskmanager.memory.managed.fraction. If it is specified, it overrides the fraction.
    Managed memory is used for intermediate result caching, sorting, and hashing (batch jobs) and by the RocksDB state backend (streaming jobs). Batch jobs allocate a fixed amount at startup; streaming jobs allocate it on demand.
  • Network
    taskmanager.memory.network.min: 64mb
    Minimum network memory.
    taskmanager.memory.network.max: 1gb
    Maximum network memory. (In MRS 3.3.1 and later versions, you do not need to change this value; the default is Long#MAX_VALUE.)
    taskmanager.memory.network.fraction: 0.1
    Fraction of taskmanager.memory.flink.size used as network memory. The default value is 0.1, bounded by network.min and network.max.
    Network memory is used for shuffle and broadcast between TaskManagers and for network buffers.
  • Others
    taskmanager.memory.jvm-metaspace.size: 256M
    Maximum size of the JVM metaspace. The default value is 256 MB.
    taskmanager.memory.jvm-overhead.min: 192M
    Minimum extra JVM overhead. The default value is 192 MB.
    taskmanager.memory.jvm-overhead.max: 1G
    Maximum extra JVM overhead. The default value is 1 GB.
    taskmanager.memory.jvm-overhead.fraction: 0.1
    Ratio of the extra JVM overhead to taskmanager.memory.process.size. The default value is 0.1; the calculated overhead is bounded by jvm-overhead.min and jvm-overhead.max.
    Metaspace and JVM overhead are native memory that is not managed by Flink.
In MRS 3.3.1 and later versions, you do not need to change the value of taskmanager.memory.network.max.
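
[Example] A flink-conf.yaml sketch for a container deployment that ties the parameters above together; the sizes are illustrative only.

# Total memory of the TaskManager container
taskmanager.memory.process.size: 8g
# 40% of the Flink memory reserved as managed memory (RocksDB, sort, and hash)
taskmanager.memory.managed.fraction: 0.4
# Network memory as a fraction of the Flink memory, bounded below by the minimum
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
# Native memory for the JVM
taskmanager.memory.jvm-metaspace.size: 256m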

Reduce Shuffled Data As Much As Possible If Broadcast Join Cannot Be Used

If broadcast join is not supported, shuffling will occur. You can use various methods, such as predicate pushdown and runtime filter, to reduce the amount of shuffled data.

[Example]

# Configure runtime filter
table.exec.runtime-filter.enabled: true
# Pushdown
table.optimizer.source.predicate-pushdown-enabled: true

Use a Local-Global Optimization Policy When Data Skew Occurs

[Example]

# Enable mini-batch optimization.
table.exec.mini-batch.enabled: true
# Maximum wait time for buffering input records
table.exec.mini-batch.allow-latency: 20ms
# Maximum number of buffered records
table.exec.mini-batch.size: 8000
# Enable two-phase aggregation.
table.optimizer.agg-phase-strategy: TWO_PHASE

Use MiniBatch Aggregation to Increase Throughput

The core idea of MiniBatch aggregation is caching a group of input data in the buffer of the aggregation operator. When the input data is triggered for processing, each key can access states with only one operation, which greatly reduces state overhead and achieves better throughput. However, latency may increase because it buffers some records instead of processing them immediately, which is a trade-off between throughput and latency. This function is disabled by default.

  • Configure with APIs:
    // Instantiate the table environment.
    TableEnvironment tEnv = ...
    // Access the Flink configuration.
    Configuration configuration = tEnv.getConfig().getConfiguration();
    // Set low-level key-value options.
    configuration.setString("table.exec.mini-batch.enabled", "true");      // enable mini-batch optimization
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // buffer input records for up to 5 seconds
    configuration.setString("table.exec.mini-batch.size", "5000");         // maximum number of records buffered by each aggregate operator task
  • Configure in the resource file (flink-conf.yaml):
    table.exec.mini-batch.enabled: true
    table.exec.mini-batch.allow-latency: 5 s
    table.exec.mini-batch.size: 5000

Use Local-Global Two-Phase Aggregation to Reduce Data Skew

Local-global aggregation is proposed to solve the data skew problem. Group aggregation is divided into two phases: local aggregation upstream and global aggregation downstream, similar to Combine + Reduce in MapReduce.

Records in a data stream may skew. Instances of some aggregation operators must process more records than others, which can cause hotspotting. Local aggregation can accumulate a certain amount of input data with the same key to a single accumulator. Global aggregation receives only the reduced accumulator instead of a large amount of original input data, which greatly reduces network shuffle and state access. The amount of input data accumulated in each local aggregation is based on the mini-batch interval, which means that local-global aggregation depends on mini-batch optimization.

  • Configure with APIs:
    // Instantiate the table environment.
    TableEnvironment tEnv = ...
    // Access the Flink configuration.
    Configuration configuration = tEnv.getConfig().getConfiguration();
    // Set low-level key-value options.
    configuration.setString("table.exec.mini-batch.enabled", "true");      // local-global aggregation requires mini-batch to be enabled
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
    configuration.setString("table.exec.mini-batch.size", "5000");
    configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase (local-global) aggregation
  • Configure in the resource file:
    table.exec.mini-batch.enabled: true
    table.exec.mini-batch.allow-latency: 5 s
    table.exec.mini-batch.size: 5000
    table.optimizer.agg-phase-strategy: TWO_PHASE

Use Multiple Disks to Improve I/O Performance When RocksDB Is the State Backend

RocksDB uses memory and disks to store data. When the state is large, disk usage is high, and frequent reads from RocksDB make disk I/O the bottleneck for Flink tasks. For example, when a TaskManager has three slots, all three frequently read and write the disks of a single server and contend for the same disk's I/O, so the throughput of all three slots decreases. You can specify multiple disks to reduce this I/O contention.

[Example] Configure RocksDB local data directories on different disks (flink-conf.yaml).

state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb

Replace ValueState with MapState or ListState When RocksDB Is the State Backend

RocksDB is an embedded KV database that stores data as key-value pairs. For map-structured data, if ValueState is used, the whole map is stored as a single record in RocksDB, so every query or modification serializes and deserializes the entire map, and adding, deleting, or modifying entries causes a large number of serialization operations. If MapState is used, the data is stored as multiple records in RocksDB, and only the small part being queried or modified is serialized. Similarly, for list data, ListState can append elements without serializing the entire list.

In addition, Flink state supports TTL. TTL encapsulates the timestamp together with the user value. The TTL of ValueState applies to the entire key, whereas the TTL of MapState<UK, UV> applies to each UK, which gives a finer granularity and supports more TTL semantics.
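
The following Java sketch (a hypothetical keyed function that counts events per item for each key) contrasts storing a map in ValueState with storing it in MapState; with the RocksDB state backend, the MapState variant serializes only the entry being accessed:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ItemCountFunction extends KeyedProcessFunction<String, String, String> {

    // Anti-pattern with RocksDB: the whole map is serialized and deserialized on every access.
    private transient ValueState<Map<String, Long>> countsAsValueState;

    // Preferred with RocksDB: each map entry is a separate key-value pair,
    // so only the touched entry is serialized.
    private transient MapState<String, Long> countsAsMapState;

    @Override
    public void open(Configuration parameters) {
        countsAsValueState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("countsValue", Types.MAP(Types.STRING, Types.LONG)));
        countsAsMapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("countsMap", Types.STRING, Types.LONG));
    }

    @Override
    public void processElement(String itemId, Context ctx, Collector<String> out) throws Exception {
        // ValueState variant: read, copy, and write back the entire map.
        Map<String, Long> all = countsAsValueState.value();
        if (all == null) {
            all = new HashMap<>();
        }
        all.merge(itemId, 1L, Long::sum);
        countsAsValueState.update(all);

        // MapState variant: only one entry is read and written.
        Long current = countsAsMapState.get(itemId);
        countsAsMapState.put(itemId, current == null ? 1L : current + 1);
    }
}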

Configure Compression to Reduce the Checkpoint Size

In I/O-intensive applications, you can enable checkpoint compression to improve I/O performance at the cost of a small amount of CPU.

[Example] Enable compression in the checkpoint configuration (flink-conf.yaml).

execution.checkpointing.snapshot-compression: true

Recover Large-State Checkpoint from Local States

To speed up recovery, each task writes checkpoint data to the local disk and to the distributed remote storage at the same time, so each piece of state has two replicas. When an application needs to recover, the system first checks whether the local checkpoint data is usable and, if so, restores from it, which avoids fetching the state from remote storage and makes recovery faster.

[Example] Configure checkpoints to be preferentially restored from the local host (flink-conf.yaml):

state.backend.local-recovery: true