Performance Tuning Suggestions

Updated on 2024-09-10 GMT+08:00

Enable Log Indexes on Hudi MOR Tables for Faster Flink Streaming Reads

To enable log indexes for better read and write performance on Hudi MOR tables, add 'hoodie.log.index.enabled'='true' to the WITH options of both the sink and the source tables.
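
For example, a minimal Flink SQL sketch (the table name, path, and schema are hypothetical):

CREATE TABLE hudi_mor_sink (
  id INT PRIMARY KEY NOT ENFORCED,
  name VARCHAR(20)
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi_mor_sink',  -- hypothetical path
  'table.type' = 'MERGE_ON_READ',
  'hoodie.log.index.enabled' = 'true'             -- enable the log index
);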

Adjust Operator Parallelism to Improve Performance

  • You can set the parallelism parameters of the read and write operators to improve Hudi read and write performance (see the sketch after this list).

    The read.tasks parameter sets the parallelism of the read operator.

    The write.tasks parameter sets the parallelism of the write operator.

  • When state indexes are used and the job is restarted from scratch (rather than resumed from a checkpoint), the target table must be read to rebuild the indexes. You can increase the parallelism of this operator to improve performance.

    The write.index_bootstrap.tasks parameter controls the parallelism for loading indexes.

  • When state indexes are used for writes, the bucket assign operator checks primary key uniqueness and assigns each record to a specific file. Increase its parallelism for better performance.

    The write.bucket_assign.tasks parameter controls the parallelism of the bucket assign operator. The default value is the parallelism of the execution environment.
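
The following sketch combines these options in a Hudi table definition; the table name, path, and parallelism values are illustrative assumptions, not tuned recommendations:

CREATE TABLE hudi_sink (
  id INT PRIMARY KEY NOT ENFORCED,
  name VARCHAR(20)
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi_sink',  -- hypothetical path
  'table.type' = 'MERGE_ON_READ',
  'read.tasks' = '4',                    -- parallelism of the read operator
  'write.tasks' = '4',                   -- parallelism of the write operator
  'write.index_bootstrap.tasks' = '4',   -- parallelism for loading indexes
  'write.bucket_assign.tasks' = '4'      -- parallelism of the bucket assign operator
);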

Optimize Resources to Improve the Performance of Stateless Computing

Flink computing operations are classified into the following types:

  • Stateless computing: These operators (such as filter, union all, and lookup join) do not need to save computing states.
  • Stateful computing: These operators (such as join, union, window, group by, and aggregation operators) compute based on data state changes.

For stateless computing, you can adjust the TaskManager heap size and network memory to optimize performance.

For example, if a job only reads and writes data, the TaskManager does not need extra vCores. With the default 1 GB each for off-heap and overhead memory, the memory is mainly allocated to the heap and the network.
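
A minimal flink-conf.yaml sketch for such a stateless job, assuming the 1 GB defaults mentioned above (the sizes are illustrative):

# Total process memory; most of it goes to the heap and the network.
taskmanager.memory.process.size: 8g
# A stateless job does not use managed memory, so it can be returned to the heap.
taskmanager.memory.managed.size: 0
# Give the network a larger share for a pure read/write workload.
taskmanager.memory.network.fraction: 0.2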

Optimize Resources to Improve the Performance of Stateful Computing

If the SQL logic contains many stateful operations such as joins and aggregations, tune the state backend, vCores, and managed memory.

For example, if a job joins more than three tables and the performance requirement is high, add six extra vCores to each TaskManager, increase the off-heap and overhead memory to 5 GB, and set the managed memory used for Flink state management to 9.6 GB.
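
A hedged flink-conf.yaml sketch of this example; the vCore baseline and the exact keys are assumptions that depend on your deployment mode:

# Six extra vCores per TaskManager (YARN deployment; assumes a baseline of 2).
yarn.containers.vcores: 8
# Increase off-heap and overhead memory to 5 GB.
taskmanager.memory.task.off-heap.size: 5g
taskmanager.memory.jvm-overhead.max: 5g
# Managed memory used for Flink state management (about 9.6 GB).
taskmanager.memory.managed.size: 9830m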

Optimize State Backends Through Table-Level TTL

This suggestion is available for MRS 3.3.0 or later.

When you join two Flink streams, data in one table may change rapidly (needing a short TTL) while data in the other changes slowly (needing a long TTL). Flink itself supports only job-level TTL, so to keep the join accurate you would have to set the TTL to the longer time; a large amount of expired data then accumulates in the state backends, putting them under heavy pressure. To reduce the pressure, you can use hints to set different expiration times for the left and right tables. The WHERE clause is not supported.

For example, set the TTL of the left table (state.ttl.left) to 60 seconds and that of the right table (state.ttl.right) to 120 seconds.

  • Use Hints in the following format:
    table_path /*+ OPTIONS(key=val [, key=val]*) */  
    
    key:
         stringLiteral 
    val:
         stringLiteral
  • The following is a configuration example with a SQL statement:
    CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
      'connector' = 'kafka',
      'topic' = 'user_info_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    CREATE TABLE print (
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `score` INT
    ) WITH ('connector' = 'print');
    CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
      'connector' = 'kafka',
      'topic' = 'user_score_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    INSERT INTO
      print
    SELECT
      t.user_id,
      t.user_name,
      d.score
    FROM
      user_info as t
      LEFT JOIN 
      -- Set different TTLs for left and right tables.
      /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */
      user_score as d ON t.user_id = d.user_id;

Optimize the State Backend Through Table-Level JTL

This suggestion is available for MRS 3.3.0 or later.

In a Flink dual-stream inner join, if a record can be deleted from the state backend once it has been joined, you can use this feature.

This feature is available for inner joins of streams only.

You can use hints to set different join times (JTL) for the left and right tables.

  • Use Hints in the following format:
    table_path /*+ OPTIONS(key=val [, key=val]*) */  
    
    key:
         stringLiteral 
    val:
         stringLiteral
  • The following is a configuration example with a SQL statement:
    CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
      'connector' = 'kafka',
      'topic' = 'user_info_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    CREATE TABLE print (
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `score` INT
    ) WITH ('connector' = 'print');
    CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
      'connector' = 'kafka',
      'topic' = 'user_score_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    INSERT INTO
      print
    SELECT
      t.user_id,
      t.user_name,
      d.score
    FROM
      user_info as t
      JOIN 
      -- Set different JTL join times for left and right tables.
      /*+ OPTIONS('eliminate-state.left.threshold'='1','eliminate-state.right.threshold'='1') */
      user_score as d ON t.user_id = d.user_id;

Number of TM Slots Should Be a Multiple of the Number of TM CPUs

In Flink, each task is divided into subtasks. A subtask is an execution thread that runs on a TaskManager (TM). If Slot Sharing Group is disabled, one subtask is deployed per slot; even when Slot Sharing Group is enabled, the subtasks in a slot are load balanced in most cases. The number of slots on a TM therefore indicates the number of concurrently running task threads.

As a baseline, the number of slots should match the number of CPU cores; when hyper-threading is used, each slot can occupy two or more hardware threads.

[Example] Set the number of TM slots to 2 to 4 times the number of CPU cores.

taskmanager.numberOfTaskSlots: 4
taskmanager.cpu.cores: 2

Adjust Network Memory When Shuffle Is Enabled, Data Volume Is Large, and Concurrency Is High

When concurrency is high and the data volume is large, shuffle produces massive network I/O. Increasing the network cache memory increases the amount of data read at a time, thereby improving I/O speed.

[Example]

# Ratio of the network memory to the total Flink memory
taskmanager.memory.network.fraction: 0.6
# Minimum size of the network cache memory
taskmanager.memory.network.min: 1g
# Maximum size of the network cache memory. (In MRS 3.3.1 and later versions, you do not need to change this value; the default is Long#MAX_VALUE.)
taskmanager.memory.network.max: 20g

Choose Data Types Such as POJO and Avro Based on Serialization Performance

When writing Flink programs against the APIs, consider how Java objects are serialized. In most cases, Flink handles serialization efficiently. SQL data is ROW data, and SQL uses Flink's built-in, efficient serializer.

Table 1 Serialization performance

Serializer                   Opts/s
PojoSerializer               813
Kryo                         294
Avro (Reflect API)           114
Avro (SpecificRecord API)    632
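
As the table shows, the POJO serializer is the fastest option. Flink applies it automatically to classes that follow its POJO rules: a public class with a public no-argument constructor whose fields are public or reachable through getters and setters. A minimal sketch with a hypothetical UserEvent type:

// Flink recognizes this class as a POJO and uses PojoSerializer
// instead of falling back to the slower Kryo serializer.
public class UserEvent {
    public String userId;   // fields must be public or have getters/setters
    public long eventTime;
    public int score;

    // A public no-argument constructor is required by Flink's POJO rules.
    public UserEvent() {}

    public UserEvent(String userId, long eventTime, int score) {
        this.userId = userId;
        this.eventTime = eventTime;
        this.score = score;
    }
}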

Network Communication Optimization

Flink communication mainly depends on Netty. Netty settings are especially important for Flink application execution, because the network determines the data exchange speed and task execution efficiency.

[Example]

# Number of Netty server threads. The value -1 means the default, numOfSlot.
taskmanager.network.netty.server.numThreads: -1
# Number of Netty client threads. The value -1 means the default, numOfSlot.
taskmanager.network.netty.client.numThreads: -1
# Timeout interval for the Netty client connection.
taskmanager.network.netty.client.connectTimeoutSec: 120s
# Size of the Netty send and receive buffers. (0 means the Netty default, 4 MB.)
taskmanager.network.netty.sendReceiveBufferSize: 0
# Netty transport mode. The default option selects the mode based on the platform.
taskmanager.network.netty.transport: auto

Overall Memory Optimization

Flink uses both heap memory and off-heap memory. The Java heap size is fixed when the JVM process starts, and the heap is the part of memory where the JVM automatically triggers GC. Off-heap memory falls into two groups: memory the JVM can manage and memory it cannot. Managed Memory and Direct Memory, which the JVM can manage, are the focus of optimization; JVM Metaspace and JVM Overhead, which the JVM cannot manage, are native memory.

Figure 1 Memory
Table 2 Related parameters

  • Total memory
    taskmanager.memory.flink.size (default: none): Total memory managed by Flink. Metaspace and Overhead are not included. Set this parameter in standalone mode.
    taskmanager.memory.process.size (default: none): Memory used by the entire Flink process. Set this parameter when containers are used.
  • Framework: memory occupied by the Flink runtime. The footprint is relatively fixed, so you generally do not need to change these values.
    taskmanager.memory.framework.heap.size (default: 128mb): Heap memory occupied by the runtime.
    taskmanager.memory.framework.off-heap.size (default: 128mb): Off-heap memory occupied by the runtime.
  • Task: memory for operator logic in regular objects of user code (such as UDFs).
    taskmanager.memory.task.heap.size (default: none): Computed as flink.size minus the framework, managed, and network memory.
    taskmanager.memory.task.off-heap.size (default: 0): Off-heap memory used by tasks.
  • Managed memory: used for intermediate result caching, sorting, and hashing (batch computing) and by RocksDB state backends (stream computing). Batch jobs allocate a fixed size up front; streaming jobs allocate on demand.
    taskmanager.memory.managed.fraction (default: 0.4): Ratio of managed memory to taskmanager.memory.flink.size.
    taskmanager.memory.managed.size (default: 0): Absolute size of managed memory. Generally left unset; the size is calculated from taskmanager.memory.managed.fraction. If specified, it overrides the fraction.
  • Network: memory for shuffle and broadcast between TaskManagers and for network buffers.
    taskmanager.memory.network.min (default: 64mb): Minimum network memory.
    taskmanager.memory.network.max (default: 1gb): Maximum network memory. (In MRS 3.3.1 and later versions, you do not need to change this value; the default is Long#MAX_VALUE.)
    taskmanager.memory.network.fraction (default: 0.1): Fraction of taskmanager.memory.flink.size used as network memory, clamped between network.min and network.max.
  • Others: native memory that the JVM does not manage.
    taskmanager.memory.jvm-metaspace.size (default: 256M): Maximum size of the metaspace.
    taskmanager.memory.jvm-overhead.min (default: 192M): Minimum extra JVM overhead.
    taskmanager.memory.jvm-overhead.max (default: 1G): Maximum extra JVM overhead.
    taskmanager.memory.jvm-overhead.fraction (default: 0.1): Ratio of the extra JVM overhead to taskmanager.memory.process.size, clamped between jvm-overhead.min and jvm-overhead.max.

NOTE:

In MRS 3.3.1 and later versions, you do not need to change the value of taskmanager.memory.network.max.
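
A hedged flink-conf.yaml sketch tying these parameters together (the sizes are illustrative only):

# Total process memory (container mode); flink.size is derived from it.
taskmanager.memory.process.size: 8g
# Managed memory: 40% of the total Flink memory.
taskmanager.memory.managed.fraction: 0.4
# Network memory: 10% of the total Flink memory, clamped to [min, max].
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
# Native memory outside JVM management.
taskmanager.memory.jvm-metaspace.size: 256m
# The task heap is not set explicitly: it is flink.size minus the
# framework, managed, and network memory.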

Reduce Shuffled Data As Much As Possible If Broadcast Join Cannot Be Used

If broadcast join is not supported, shuffling will occur. You can use methods such as predicate pushdown and runtime filters to reduce the amount of shuffled data.

[Example]

# Enable the runtime filter.
table.exec.runtime-filter.enabled: true
# Enable predicate pushdown to sources.
table.optimizer.source.predicate-pushdown-enabled: true

Use a Local-Global Optimization Policy When Data Skew Occurs

[Example]

# Enable mini-batch optimization.
table.exec.mini-batch.enabled: true
# Maximum waiting time for a batch.
table.exec.mini-batch.allow-latency: 20ms
# Maximum number of cached records per batch.
table.exec.mini-batch.size: 8000
# Enable two-phase aggregation.
table.optimizer.agg-phase-strategy: TWO_PHASE

Use MiniBatch Aggregation to Increase Throughput

The core idea of MiniBatch aggregation is to cache a group of input records in a buffer inside the aggregation operator. When processing is triggered, each key needs only one state access, which greatly reduces state overhead and improves throughput. The cost is higher latency, because records are buffered instead of processed immediately; this is a trade-off between throughput and latency. This function is disabled by default.

  • Configure with APIs:
    // Instantiate the table environment.
    TableEnvironment tEnv = ...;
    // Access the Flink configuration.
    Configuration configuration = tEnv.getConfig().getConfiguration();
    // Set low-level key-value options.
    configuration.setString("table.exec.mini-batch.enabled", "true");      // Enable mini-batch optimization.
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // Buffer input records for up to 5 seconds.
    configuration.setString("table.exec.mini-batch.size", "5000");         // Maximum number of records buffered per aggregate operator task.
  • Configure in the resource file (flink-conf.yaml):
    table.exec.mini-batch.enabled: true
    table.exec.mini-batch.allow-latency: 5 s
    table.exec.mini-batch.size: 5000

Use Local-Global Two-Phase Aggregation to Reduce Data Skew

Local-global aggregation is designed to solve the data skew problem. The aggregation is divided into two phases: local aggregation in the upstream and global aggregation in the downstream, which is similar to Combine + Reduce in MapReduce.

Records in a data stream may be skewed, so some instances of an aggregation operator must process far more records than others, which creates hotspots. Local aggregation accumulates a batch of input records with the same key into a single accumulator; global aggregation then receives only the reduced accumulators instead of the large volume of original input, which greatly reduces network shuffle and state access. The number of records accumulated by each local aggregation is based on the mini-batch interval, so local-global aggregation depends on mini-batch optimization.

  • Configure with APIs:
    // Instantiate the table environment.
    TableEnvironment tEnv = ...;
    // Access the Flink configuration.
    Configuration configuration = tEnv.getConfig().getConfiguration();
    // Set low-level key-value options.
    configuration.setString("table.exec.mini-batch.enabled", "true");      // Local-global aggregation requires mini-batch to be enabled.
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
    configuration.setString("table.exec.mini-batch.size", "5000");
    configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // Enable two-phase, i.e. local-global, aggregation.
  • Configure in the resource file:
    table.exec.mini-batch.enabled: true
    table.exec.mini-batch.allow-latency: 5 s
    table.exec.mini-batch.size: 5000
    table.optimizer.agg-phase-strategy: TWO_PHASE

Use Multiple Disks to Improve I/O Performance When RocksDB Is the State Backend

RocksDB stores data in both memory and on disk. When the state is large, disk space usage is high, and frequent reads from RocksDB make disk I/O the bottleneck for Flink tasks. For example, when a TaskManager contains three slots, the concurrent tasks read and write the same server disk, contend for its I/O, and the throughput of all three slots decreases. You can specify multiple disks to reduce this I/O contention.

[Example] Configure the RocksDB local directories on different disks (flink-conf.yaml).

state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb

Replace ValueState Storage Containers with MapState or ListState When RocksDB Is the State Backend

RocksDB is an embedded KV database that stores data as key-value pairs. For map data, if ValueState is used, the whole map is stored as a single RocksDB record whose value is the entire map; every add, delete, or modification then serializes the whole map. If MapState is used, each map entry is stored as its own RocksDB record, so only a small part of the data is serialized during queries or modifications. For list data, ListState can be used to append elements without serializing the whole list.

In addition, Flink state supports TTL. TTL encapsulates a timestamp together with the userValue. The TTL of ValueState applies to the entire key, whereas the TTL of MapState<UK, UV> applies to each UK, which gives a finer granularity and supports richer TTL semantics.
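
A minimal sketch of MapState in a KeyedProcessFunction; Event and Result are hypothetical placeholder types. With RocksDB, each map entry is stored as its own record, so a put or get touches only that entry, unlike a ValueState holding a whole Map, which is serialized in full on every access:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Event and Result are hypothetical types used only for illustration.
public class ScoreFunction extends KeyedProcessFunction<String, Event, Result> {
    // One RocksDB record per map entry; only the touched entry is
    // serialized or deserialized on access.
    private transient MapState<String, Integer> scores;

    @Override
    public void open(Configuration parameters) {
        scores = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("scores", String.class, Integer.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        // Updates a single entry instead of rewriting the whole map.
        scores.put(event.item, event.score);
    }
}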

Configure Compression to Reduce the Checkpoint Size

In I/O-intensive applications, you can enable checkpoint compression to improve I/O performance at a small CPU cost.

[Example] Enable compression in the checkpoint configuration (flink-conf.yaml).

execution.checkpointing.snapshot-compression: true

Recover Large-State Checkpoints from Local State

For fast recovery, each task writes checkpoint data to both the local disk and the distributed remote storage, so each piece of state has two copies. When an application needs to recover, the system first checks whether the local checkpoint data is usable; if it is, state is restored from the local copy, which avoids fetching it from remote storage and speeds up recovery.

[Example] Configure checkpoints to be preferentially restored from the local host (flink-conf.yaml):

state.backend.local-recovery: true
