计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
弹性伸缩 AS
镜像服务 IMS
专属主机 DeH
函数工作流 FunctionGraph
云手机服务器 CPH
Huawei Cloud EulerOS
网络
虚拟私有云 VPC
弹性公网IP EIP
虚拟专用网络 VPN
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
VPC终端节点 VPCEP
云连接 CC
企业路由器 ER
企业交换机 ESW
全球加速 GA
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
边缘安全 EdgeSec
威胁检测服务 MTD
CDN与智能边缘
内容分发网络 CDN
CloudPond云服务
智能边缘云 IEC
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
云搜索服务 CSS
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
数据可视化 DLV
数据湖工厂 DLF
湖仓构建 LakeFormation
企业应用
云桌面 Workspace
应用与数据集成平台 ROMA Connect
云解析服务 DNS
专属云
专属计算集群 DCC
IoT物联网
IoT物联网
设备接入 IoTDA
智能边缘平台 IEF
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
国际站常见问题
ICP备案
我的凭证
支持计划
客户运营能力
合作伙伴支持计划
专业服务
区块链
区块链服务 BCS
Web3节点引擎服务 NES
解决方案
SAP
高性能计算 HPC
视频
视频直播 Live
视频点播 VOD
媒体处理 MPC
实时音视频 SparkRTC
数字内容生产线 MetaStudio
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
存储容灾服务 SDRS
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
专属分布式存储服务 DSS
容器
云容器引擎 CCE
容器镜像服务 SWR
应用服务网格 ASM
华为云UCS
云容器实例 CCI
管理与监管
云监控服务 CES
统一身份认证服务 IAM
资源编排服务 RFS
云审计服务 CTS
标签管理服务 TMS
云日志服务 LTS
配置审计 Config
资源访问管理 RAM
消息通知服务 SMN
应用运维管理 AOM
应用性能管理 APM
组织 Organizations
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
应用身份管理服务 OneAccess
数据库
云数据库 RDS
文档数据库服务 DDS
数据管理服务 DAS
数据复制服务 DRS
云数据库 GeminiDB
云数据库 GaussDB
分布式数据库中间件 DDM
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
人脸识别服务 FRS
图引擎服务 GES
图像识别 Image
内容审核 Moderation
文字识别 OCR
AI开发平台ModelArts
图像搜索 ImageSearch
对话机器人服务 CBS
华为HiLens
视频智能分析服务 VIAS
语音交互服务 SIS
应用中间件
分布式缓存服务 DCS
API网关 APIG
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
多活高可用服务 MAS
事件网格 EG
企业协同
华为云会议 Meeting
云通信
消息&短信 MSGSMS
云生态
合作伙伴中心
云商店
开发者工具
SDK开发指南
API签名指南
Terraform
华为云命令行工具服务 KooCLI
其他
产品价格详情
系统权限
管理控制台
客户关联华为云合作伙伴须知
消息中心
公共问题
开发与运维
应用管理与运维平台 ServiceStage
软件开发生产线 CodeArts
需求管理 CodeArts Req
部署 CodeArts Deploy
性能测试 CodeArts PerfTest
编译构建 CodeArts Build
流水线 CodeArts Pipeline
制品仓库 CodeArts Artifact
测试计划 CodeArts TestPlan
代码检查 CodeArts Check
代码托管 CodeArts Repo
云应用引擎 CAE
开天aPaaS
云消息服务 KooMessage
云手机服务 KooPhone
云空间服务 KooDrive

Flink性能调优建议

更新时间:2024-09-10 GMT+08:00

Hudi MOR流表开启log Index特性提升Flink流读Mor表性能

Hudi的Mor表可以通过log index提升读写性能,在Sink和Source表添加属性 'hoodie.log.index.enabled'='true'。

通过调整对应算子并行度提升性能

  • 读写Hudi可以通过配置读写并发提升读写性能。

    读算子的并行度调整参数:read.tasks

    写算子的并行度调整参数:write.tasks

  • 采用状态索引在作业重启的时候(非Checkpoint重启),需要读目标表重建索引,可以增大该算子并行度提升性能。

    加载索引的并行度调整参数:write.index_bootstrap.tasks

  • 采用状态索引写数据需要进行主键唯一性检查,分配具体写入文件,提升该算子并行度提升性能。

    写算子索引检测算子调整参数:write.bucket_assign.tasks

非状态计算提升性能的资源优化

Flink计算操作分为如下两类:

  • 无状态计算操作:该部分算子不需要保存计算状态,例如:filter、union all、lookup join。
  • 有状态计算操作:该部分算子要根据数据前后状态变化进行计算,例如:join,union、window、group by、聚合算子等。

对于非状态计算主要调优为TaskManager的Heap Size与NetWork。

例如作业仅进行数据的读和写,TaskManage无需增加额外的vCore,off-Heap和Overhead默认为1GB,内存主要给Heap和Network。

状态计算提升性能的资源优化

SQL逻辑包含较多join、卷积计算等操作。主要调优状态后端性能、vCore、Manage Memory。

例如作业做三表多表关联,性能要求高,单个TaskManage增加额外的6个vCore,off-Heap和Overhead提高到5GB,用于Flink状态管理的Manage Memory为9.6GB。

通过表级TTL进行状态后端优化

本章节适用于MRS 3.3.0及以后版本。

在Flink双流Join场景下,若Join的左表和右表其中一个表数据变化快,需要较短时间的过期时间,而另一个表数据变化较慢,需要较长时间的过期时间。目前Flink只有表级别的TTL(Time To Live:生存时间),为了保证Join的准确性,需要将表级别的TTL设置为较长时间的过期时间,此时状态后端中保存了大量的已经过期的数据,给状态后端造成了较大的压力。为了减少状态后端的压力,可以单独为左表和右表设置不同的过期时间。不支持where子句。

可通过使用Hint方式单独为左表和右表设置不同的过期时间,如左表(state.ttl.left)设置TTL为60秒,右表(state.ttl.right)设置TTL为120秒:

  • Hint方式格式:
    table_path /*+ OPTIONS(key=val [, key=val]*) */  
    
    key:
         stringLiteral 
    val:
         stringLiteral
  • 在SQL语句中配置示例:
    CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
      'connector' = 'kafka',
      'topic' = 'user_info_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    CREATE table print(
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `score` INT
    ) WITH ('connector' = 'print');
    CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
      'connector' = 'kafka',
      'topic' = 'user_score_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    INSERT INTO
      print
    SELECT
      t.user_id,
      t.user_name,
      d.score
    FROM
      user_info as t
      LEFT JOIN 
      --  为左表和右表设置不同的TTL时间
      /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */
      user_score as d ON t.user_id = d.user_id;

通过表级JTL进行状态后端优化

本章节适用于MRS 3.3.0及以后版本。

在Flink双流inner Join场景下,若Join业务允许join一次就可以剔除后端中的数据时,可以使用该特性。

该特性只适用于流流inner join。

可通过使用Hint方式单独为左表和右表设置不同join次数:

  • Hint方式格式:
    table_path /*+ OPTIONS(key=val [, key=val]*) */  
    
    key:
         stringLiteral 
    val:
         stringLiteral
  • 在SQL语句中配置示例:
    CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
      'connector' = 'kafka',
      'topic' = 'user_info_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    CREATE table print(
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `score` INT
    ) WITH ('connector' = 'print');
    CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
      'connector' = 'kafka',
      'topic' = 'user_score_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    INSERT INTO
      print
    SELECT
      t.user_id,
      t.user_name,
      d.score
    FROM
      user_info as t
      JOIN 
      --  为左表和右表设置不同的JTL关联次数
      /*+ OPTIONS('eliminate-state.left.threshold'='1','eliminate-state.right.threshold'='1') */
      user_score as d ON t.user_id = d.user_id;

TM的Slot数和TM的CPU数成倍数关系

在Flink中,每个Task被分解成SubTask,SubTask作为执行的线程单位运行在TM上,在不开启Slot Sharing Group的情况下,一个SubTask是部署在一个slot上的。即使开启了Slot Sharing Group,大部分情况下Slot中拥有的SubTask也是负载均衡的。所以可以理解为TM上的Slot个数代表了上面运行的任务线程数。

合理的Slots数量应该和CPU核数相同,在使用超线程时,每个Slot将占用2个或更多的硬件线程。

【示例】建议配置TM Slot个数为CPU Core个数的2~4倍:

taskmanager.numberOfTaskSlots: 4
taskmanager.cpu.cores: 2

数据量大并发数高且有Shuffle时可调整网络内存

在并发数高和数据量大时,发生shuffle后会发生大量的网络IO,提升网络缓存内存可以扩大一次性读取的数据量,从而提升IO速度。

【示例】

# 网络占用内存占整个进程内存的比例
taskmanager.memory.network.fraction: 0.6
# 网络缓存内存的最小值
taskmanager.memory.network.min: 1g
# 网络缓存内存的最大值(MRS 3.3.1及之后版本无需修改该值,默认值已为Long#MAX_VALUE)
taskmanager.memory.network.max: 20g

基于序列化性能尽量使用POJO和Avro等简单的数据类型

使用API编写Flink程序时需要考虑Java对象的序列化,大多数情况下Flink都可以高效的处理序列化。SQL中无需考虑,SQL中数据都为ROW类型,都采用了Flink内置的序列化器,能很高效的进行序列化。

表1 序列化

序列化器

Opts/s

PojoSeriallizer

813

Kryo

294

Avro(Reflect API)

114

Avro(SpecificRecord API)

632

网络通信调优

Flink通信主要依赖Netty网络,所以在Flink应用执行过程中,Netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。

【示例】

# netty的服务端线程数目(-1表示默认参数numOfSlot)
taskmanager.network.netty.server.numThreads -1(numOfSlot)
# netty的客户端线程数目(-1表示默认参数numofSlot)
taskmanager.network.netty.client.numThreads : -1
# netty的客户端连接超时时间
taskmanager.network.netty.client.connectTimeoutSec:120s
# netty的发送和接受缓冲区的大小(0表示netty默认参数,4MB)
taskmanager.network.netty.sendReceiveBufferSize: 0
# netty的传输方式,默认方式会根据运行的平台选择合适的方式
taskmanager.network.netty.transport:auto

内存总体调优

Flink内部对内存进行了划分,整体上划分成为了堆内存和堆外内存两部分。Java堆内存是通过Java程序创建时指定的,这也是JVM可自动GC的部分内存。堆外内存可细分为可被JVM管理的和不可被JVM管理的,可被JVM管理的有Managed Memory、Direct Memory,这部分是调优的重点,不可被JVM管理的有JVM Metaspace、JVM Overhead,这部分是native memory。

图1 内存
表2 相关参数

参数

配置

注释

说明

Total Memory

taskmanager.memory.flink.size: none

总体Flink管理的内存大小,没有默认值,不包含Metaspace和Overhead,Standalone模式时设置。

整体内存。

taskmanager.memory.process.size: none

整个Flink进程使用的内存大小,容器模式时设置。

FrameWork

taskmanager.memory.framework.heap.size: 128mb

runtime占用的heap的大小,一般来说不用修改,占用空间相对固定。

RUNTIME底层占用的内存,一般不用做较大改变。

taskmanager.memory.framework.off-heap.size: 128mb

runtime占用的off-heap的大小,一般来说不用修改,占用空间相对固定。

Task

taskmanager.memory.task.heap.size:none

没有默认值,flink.size减去框架、托管、网络等得到。

算子逻辑,用户代码(如UDF)正常对象占用内存的地方。

taskmanager.memory.task.off-heap.size:0

默认值为0,task使用的off heap内存。

Managed Memory

taskmanager.memory.managed.fraction: 0.4

托管内存占taskmanager.memory.flink.size的比例,默认0.4。

managed内存用于中间结果缓存、排序、哈希等(批计算),以及RocksDB state backend(流计算),该内存在批模式下一开始就申请固定大小内存,而流模式下会按需申请。

taskmanager.memory.managed.size: 0

托管内存大小,一般不指定,默认为0,内存大小由上面计算出来。若指定了则覆盖比例计算的内存。

Network

taskmanager.memory.network.min:64mb

网络缓存的最小值。

用于taskmanager之间shuffle、广播以及与network buffer。

taskmanager.memory.network.max:1gb

网络缓存的最大值。(MRS 3.3.1及之后版本无需修改该值,默认值已为Long#MAX_VALUE)

taskmanager.memory.network.fraction:0.1

network memory占用taskmanager.memory.flink.size的大小,默认0.1,会被限制在network.min和network.max之间。

用于taskmanager之间shuffle、广播以及与network buffer。

Others

taskmanager.memory.jvm-metaspace.size:256M

metaspace空间的最大值,默认值256MB。

用户自己管理的内存。

taskmanager.memory.jvm-overhead.min:192M

jvm额外开销的最小值,默认192MB。

taskmanager.memory.jvm-overhead.max:1G

jvm额外开销的最大值,默认1GB。

taskmanager.memory.jvm-overhead.fraction:0.1

jvm额外开销占taskmanager.memory.process.size的比例,默认0.1,算出来后会被限制在jvm-overhead.min和jvm-overhead.max之间。

说明:

3.3.1及之后版本无需修改taskmanager.memory.network.max网络缓存的最大值

如果不能使用broardcast join应该尽量减少shuffle数据

不能broadcast join那么必定会发生shuffle,可通过各种手段来减少发生shuffle的数据量,例如谓词下推,Runtime Filter等等。

【示例】

# Runtime filter配置
table.exec.runtime-filter.enabled: true
# 下推
table.optimizer.source.predicate-pushdown-enabled: true

数据倾斜状态下可以使用localglobal优化策略

【示例】

#开启mini-batch优化
table.exec.mini-batch.enabled:true
#最长等待时间
table.exec.mini-batch.allow-latency: 20ms
#最大缓存记录数
table.exec.mini-batch.size:8000
#开启两阶段聚合
table.optimizer.agg-phase-strategy:TWO_PHASE

吞吐量大场景下使用MiniBatch聚合增加吞吐量

MiniBatch聚合的核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个key只需一个操作即可访问状态,可以很大程度减少状态开销并获得更好的吞吐量。但是可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理,这是吞吐量和延迟之间的权衡。默认未开启该功能。

  • API方式:
    // instantiate table environmentTableEnvironment tEnv = ...
    // access flink configuration
    Configuration configuration = tEnv.getConfig().getConfiguration();
    // set low-level key-value options
    configuration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimizationconfiguration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input recordsconfiguration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
  • 资源文件方式(flink-conf.yaml):
    table.exec.mini-batch.enabled: true
    table.exec.mini-batch.allow-latency : 5 s
    table.exec.mini-batch.size: 5000

使用local-global两阶段聚合减少数据倾斜

Local-Global聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于MapReduce中的 Combine + Reduce模式。

数据流中的记录可能会倾斜,因此某些聚合算子的实例必须比其他实例处理更多的记录,这会产生热点问题。本地聚合可以将一定数量具有相同key的输入数据累加到单个累加器中。全局聚合将仅接收reduce后的累加器,而不是大量的原始输入数据,这可以很大程度减少网络shuffle和状态访问的成本。每次本地聚合累积的输入数据量基于mini-batch间隔,这意味着local-global聚合依赖于启用了mini-batch优化。

  • API方式:
    // instantiate table environmentTableEnvironment tEnv = ...
    // access flink configuration
    Configuration configuration = tEnv.getConfig().getConfiguration();// set low-level key-value options
    configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
    configuration.setString("table.exec.mini-batch.size", "5000");
    configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
  • 资源文件方式:
    table.exec.mini-batch.enabled: true
    table.exec.mini-batch.allow-latency : 5 s
    table.exec.mini-batch.size: 5000
    table.optimizer.agg-phase-strategy: TWO_PHASE

RocksDB作为状态后端时通过多块磁盘提升IO性能

RocksDB使用内存加磁盘的方式存储数据,当状态比较大时,磁盘占用空间会比较大。如果对RocksDB有频繁的读取请求,那么磁盘IO会成为Flink任务瓶颈。当一个 TaskManager包含三个slot时,那么单个服务器上的三个并行度都对磁盘造成频繁读写,从而导致三个并行度的之间相互争抢同一个磁盘IO,导致三个并行度的吞吐量都会下降。可以通过指定多个不同的硬盘从而减少IO竞争。

【示例】Rockdb配置Checkpoint目录放在不同磁盘(flink-conf.yaml):

state.backend.rocksdb.localdir:/data1/flink/rocksdb,/data2/flink/rocksdb

RocksDB作为状态后端时尽量使用MapState或ListState替换ValueState存储容器

RocksDB场景下,由于RocksDB是一个内嵌式的KV数据库,它的数据都是根据key和value进行存放的。对于map类数据,若使用ValueState,在RocksDB中作为一条记录存储,value是整个map,而使用MapState,在RocksDB中作为N条记录存储,这样做的好处是当进行查询或者修改可以只序列化一小部分数据,当将map作为整体存储时每次增删改都会产生很大的序列化开销。对于List数据,使用ListState可以无需序列化动态添加元素。

另外Flink中的State支持设置TTL,TTL实际上是将时间戳与userValue封装起来,ValueState的TTL基于整个Key,MapState<UK, UV>的TTL是基于UK,它的粒度更小,可支持更丰富的TTL语义。

Checkpoint配置压缩减少Checkpoint大小

在IO密集型应用中,可以通过开启Checkpoint压缩,牺牲极小部分CPU性能,提升IO性能。

【示例】配置Checkpoint时开启压缩(flink-conf.yaml):

execution.checkpointing.snapshot-compression: true

大状态Checkpoint优先从本地状态恢复

为了快速的状态恢复,每个task会同时写Checkpoint数据到本地磁盘和远程分布式存储,也就是说这是一份双复制。只要task本地的Checkpoint数据没有被破坏,系统在应用恢复时会首先加载本地的Checkpoint数据,这样就很大程度减少了远程拉取状态数据的过程。

【示例】配置Checkpoint优先从本地恢复(flink-conf.yaml):

state.backend.local-recovery: true

我们使用cookie来确保您的高速浏览体验。继续浏览本站,即表示您同意我们使用cookie。 详情

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容