Compute
Elastic Cloud Server
Huawei Cloud Flexus
Bare Metal Server
Auto Scaling
Image Management Service
Dedicated Host
FunctionGraph
Cloud Phone Host
Huawei Cloud EulerOS
Networking
Virtual Private Cloud
Elastic IP
Elastic Load Balance
NAT Gateway
Direct Connect
Virtual Private Network
VPC Endpoint
Cloud Connect
Enterprise Router
Enterprise Switch
Global Accelerator
Management & Governance
Cloud Eye
Identity and Access Management
Cloud Trace Service
Resource Formation Service
Tag Management Service
Log Tank Service
Config
OneAccess
Resource Access Manager
Simple Message Notification
Application Performance Management
Application Operations Management
Organizations
Optimization Advisor
IAM Identity Center
Cloud Operations Center
Resource Governance Center
Migration
Server Migration Service
Object Storage Migration Service
Cloud Data Migration
Migration Center
Cloud Ecosystem
KooGallery
Partner Center
User Support
My Account
Billing Center
Cost Center
Resource Center
Enterprise Management
Service Tickets
HUAWEI CLOUD (International) FAQs
ICP Filing
Support Plans
My Credentials
Customer Operation Capabilities
Partner Support Plans
Professional Services
Analytics
MapReduce Service
Data Lake Insight
CloudTable Service
Cloud Search Service
Data Lake Visualization
Data Ingestion Service
GaussDB(DWS)
DataArts Studio
Data Lake Factory
DataArts Lake Formation
IoT
IoT Device Access
Others
Product Pricing Details
System Permissions
Console Quick Start
Common FAQs
Instructions for Associating with a HUAWEI CLOUD Partner
Message Center
Security & Compliance
Security Technologies and Applications
Web Application Firewall
Host Security Service
Cloud Firewall
SecMaster
Anti-DDoS Service
Data Encryption Workshop
Database Security Service
Cloud Bastion Host
Data Security Center
Cloud Certificate Manager
Edge Security
Managed Threat Detection
Blockchain
Blockchain Service
Web3 Node Engine Service
Media Services
Media Processing Center
Video On Demand
Live
SparkRTC
MetaStudio
Storage
Object Storage Service
Elastic Volume Service
Cloud Backup and Recovery
Storage Disaster Recovery Service
Scalable File Service Turbo
Scalable File Service
Volume Backup Service
Cloud Server Backup Service
Data Express Service
Dedicated Distributed Storage Service
Containers
Cloud Container Engine
SoftWare Repository for Container
Application Service Mesh
Ubiquitous Cloud Native Service
Cloud Container Instance
Databases
Relational Database Service
Document Database Service
Data Admin Service
Data Replication Service
GeminiDB
GaussDB
Distributed Database Middleware
Database and Application Migration UGO
TaurusDB
Middleware
Distributed Cache Service
API Gateway
Distributed Message Service for Kafka
Distributed Message Service for RabbitMQ
Distributed Message Service for RocketMQ
Cloud Service Engine
Multi-Site High Availability Service
EventGrid
Dedicated Cloud
Dedicated Computing Cluster
Business Applications
Workspace
ROMA Connect
Message & SMS
Domain Name Service
Edge Data Center Management
Meeting
AI
Face Recognition Service
Graph Engine Service
Content Moderation
Image Recognition
Optical Character Recognition
ModelArts
ImageSearch
Conversational Bot Service
Speech Interaction Service
Huawei HiLens
Video Intelligent Analysis Service
Developer Tools
SDK Developer Guide
API Request Signing Guide
Terraform
Koo Command Line Interface
Content Delivery & Edge Computing
Content Delivery Network
Intelligent EdgeFabric
CloudPond
Intelligent EdgeCloud
Solutions
SAP Cloud
High Performance Computing
Developer Services
ServiceStage
CodeArts
CodeArts PerfTest
CodeArts Req
CodeArts Pipeline
CodeArts Build
CodeArts Deploy
CodeArts Artifact
CodeArts TestPlan
CodeArts Check
CodeArts Repo
Cloud Application Engine
MacroVerse aPaaS
KooMessage
KooPhone
KooDrive

Flink Performance Tuning Suggestions

Updated on 2024-08-16 GMT+08:00

Memory Configuration

Flink depends on in-memory computing. If memory is insufficient during computing, the Flink execution efficiency will be adversely affected. You can determine whether memory becomes a performance bottleneck by monitoring garbage collection (GC) and evaluating memory usage, and take performance optimization measures.

If full GC frequently occurs in the YARN container GC logs of monitoring node processes, optimize GC.

NOTE:

GC configuration: Add the following parameters to the env.java.opts configuration item in the conf/flink-conf.yaml file on the client: "-Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M". GC logs are added by default.

  • GC optimization

    To optimize GC, adjust the ratio of the old generation to the young generation. In the conf/flink-conf.yaml configuration file on the client, add the -XX:NewRatio parameter to the env.java.opts configuration item. For example, -XX:NewRatio=2 indicates that the ratio of the old generation to the young generation 2:1, the new generation occupies 1/3 of the entire heap space, and the old generation occupies 2/3.

  • When developing Flink applications, optimize the data partitioning or grouping of DataStream.
    • If partitioning causes data skew, partitioning needs to be optimized.
    • Avoid unparallel operations, because some operations on DataStream, for example, WindowAll, cause parallelism failure.
    • Do not use a string for keyBy.

Configuring DOP

The degree of parallelism (DOP) indicates the number of tasks to be executed concurrently. It determines the number of data blocks after splitting. Adjust the DOP to maximize the number of tasks, the volume of data processed in each task, and the data processing capabilities the machines.

Query CPU and memory usage. If data and tasks are not evenly distributed among nodes, increase the DOP. Increasing the DOP makes full use of computing capabilities of machines in the cluster.

You can specify and adjust the DOP at one of the following levels (the priorities of which are in descending order) based on the actual memory, CPU, data, and application logic conditions.

  • Operator

    Invoke the setParallelism() method to specify the DOP of an operator, data source, and data sink. Example:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> text = [...]
    DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new LineSplitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1).setParallelism(5);
    wordCounts.print();
    env.execute("Word Count Example");
  • Execution environment

    A Flink program runs in the execution environment. In the execution environment, a default DOP is defined for all executed operators, data sources, and data sinks.

    You can specify the default DOP by invoking the setParallelism() method. Example:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);
    DataStream<String> text = [...]
    DataStream<Tuple2<String, Integer>> wordCounts = [...]
    wordCounts.print();
    env.execute("Word Count Example");
  • Client

    You can specify the DOP when submitting jobs to Flink on the client. For a CLI client, you can specify the DOP using the -p parameter. Example:

    ./bin/flink run -p 10 ../examples/*WordCount-java*.jar
  • System

    At the system level, you can modify parallelism.default in the flink-conf.yaml file in the conf directory of the Flink client to specify the default DOP for all execution environments.

Configuring Process Parameters

In Flink on YARN mode, there are two processes: JobManager and TaskManager. JobManagers and TaskManagers shoulder major responsibilities during task scheduling and running.

Parameter configurations of JobManagers and TaskManagers significantly affect the execution performance of Flink applications. You can perform the following operations to optimize Flink cluster performance.

  1. Configure JobManager memory.

    JobManagers are responsible for task scheduling and message communications between the TaskManager and ResourceManager. JobManager memory needs to be added as tasks and the DOP increase.

    You can set proper JobManager memory based on the number of tasks.

    • When running the yarn-session command, add the -jm MEM parameter to configure memory.
    • When running the yarn-cluster command, add the -yjm MEM parameter to configure memory.

  2. Configure the number of TaskManagers.

    Each core of a TaskManager can process a task at the same time. Increasing the number of TaskManager has the same effect as increasing the DOP. Therefore, you can increase the number of TaskManagers to improve efficiency when there are sufficient resources.

    • When running the yarn-session command, add the -n NUM parameter to configure the number of TaskManagers.
    • When running the yarn-cluster command, add the -yn NUM parameter to configure the number of TaskManagers.

  3. Configure the number of TaskManager slots.

    Multiple cores of a TaskManager can process multiple tasks at the same time. This has the same effect as increasing the DOP. However, the number of cores and the memory must be balanced, because all cores share the memory of the TaskManager.

    • When running the yarn-session command, add the -s NUM parameter to configure the number of slots.
    • When running the yarn-cluster command, add the -ys NUM parameter to configure the number of slots.

  4. Configure TaskManager memory.

    The memory of a TaskManager is used for task execution and communications. A large-size task requires more resources. In this case, you can increase memory.

    • When running the yarn-session command, add the -tm MEM parameter to configure memory.
    • When running the yarn-cluster command, add the -ytm MEM parameter to configure memory.

Partitioning Design Methods

A proper partitioning design can optimize task splitting. Ensure even partitioning during programming to prevent data skew in tasks. Otherwise, long-time execution of a task will delay the whole task.

Partitioning methods are as follows:

  • Random partitioning: Partitions elements randomly.
    dataStream.shuffle();
  • Rebalancing (Round-robin partitioning): Partitions elements round-robin, creating equal load per partition. This is useful for performance optimization in the presence of data skew.
    dataStream.rebalance();
  • Rescaling: Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur.
    dataStream.rescale();
  • Broadcasting: Broadcasts elements to every partition.
    dataStream.broadcast();
  • Custom partitioning: Uses a user-defined Partitioner to select the target task for each element. Custom partitioning allows users to partition data based on a certain feature to optimize task execution.

    The following is an example:

    // Use fromElements to construct a simple Tuple2 flow.
    DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("hello",1), Tuple2.of("test",2), Tuple2.of("world",100));
    // Define a key value used for partitioning. The return value is the partition to which the key belongs. The value plus 1 is the ID of the corresponding subtask.
    Partitioner<Tuple2<String, Integer>> strPartitioner = new Partitioner<Tuple2<String, Integer>>() {
        @Override
        public int partition(Tuple2<String, Integer> key, int numPartitions) {
            return (key.f0.length() + key.f1) % numPartitions;
        }
    };
    // Indicates the key value for partitioning using Tuple2.
    dataStream.partitionCustom(strPartitioner, new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
            return value;
        }
    }).print();

Configuring the Netty Network Communication

Flink communications depend on a Netty network. Netty network configuration is critical to Flink application execution, because the network performance determines data exchange speed and task execution efficiency.

The following parameters allow for advanced tuning in the conf/flink-conf.yaml configuration file on the client. The default values are sufficient. Exercise caution when changing the default values, preventing performance deterioration.

  • taskmanager.network.netty.num-arenas: Number of Netty arenas. The default value is the value of taskmanager.numberOfTaskSlots.
  • taskmanager.network.netty.server.numThreads and taskmanager.network.netty.client.numThreads: Number of Netty server and client threads, respectively. The default values are the value of taskmanager.numberOfTaskSlots.
  • taskmanager.network.netty.client.connectTimeoutSec: Netty client connection timeout. The default value is 120s.
  • taskmanager.network.netty.sendReceiveBufferSize: Netty send and receive buffer size. This defaults to the system buffer size (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MB in modern Linux.
  • taskmanager.network.netty.transport: Netty transport type, either nio or epoll. The default value is nio.

Experience Summary

Avoiding Data Skew

If data skew occurs (certain data volume is extremely large), the execution time of tasks is inconsistent even though no GC is performed.

  • Redefine the keys. Use keys of smaller granularity to optimize task sizes.
  • Modify the DOP.
  • Call the rebalance operation to balance data partitions.

Setting Buffer Timeout

  • During the execution of tasks, data is exchanged through network. You can configure the setBufferTimeout parameter to specify a buffer timeout interval for data exchanging among different servers.
  • If setBufferTimeout is set to -1, the refreshing operation is performed when the buffer is full, maximizing the throughput. If setBufferTimeout is set to 0, the refreshing operation is performed each time data is received, minimizing the delay. If setBufferTimeout is set to a value greater than 0, the refreshing operation is performed after the butter times out.

    The following is an example.

    env.setBufferTimeout(timeoutMillis);
    env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

We use cookies to improve our site and your experience. By continuing to browse our site you accept our cookie policy. Find out more

Feedback

Feedback

Feedback

0/500

Selected Content

Submit selected content with the feedback