网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
企业路由器 ER
企业交换机 ESW
全球加速 GA
企业连接 EC
云原生应用网络 ANC
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
威胁检测服务 MTD
态势感知 SA
认证测试中心 CTC
边缘安全 EdgeSec
应用中间件
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
API网关 APIG
分布式缓存服务 DCS
多活高可用服务 MAS
事件网格 EG
管理与监管
统一身份认证服务 IAM
消息通知服务 SMN
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云日志服务 LTS
云审计服务 CTS
标签管理服务 TMS
配置审计 Config
应用身份管理服务 OneAccess
资源访问管理 RAM
组织 Organizations
资源编排服务 RFS
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
解决方案
高性能计算 HPC
SAP
混合云灾备
开天工业工作台 MIW
Haydn解决方案工厂
数字化诊断治理专家服务
云生态
云商店
合作伙伴中心
华为云开发者学堂
华为云慧通差旅
开发与运维
软件开发生产线 CodeArts
需求管理 CodeArts Req
流水线 CodeArts Pipeline
代码检查 CodeArts Check
编译构建 CodeArts Build
部署 CodeArts Deploy
测试计划 CodeArts TestPlan
制品仓库 CodeArts Artifact
移动应用测试 MobileAPPTest
CodeArts IDE Online
开源镜像站 Mirrors
性能测试 CodeArts PerfTest
应用管理与运维平台 ServiceStage
云应用引擎 CAE
开源治理服务 CodeArts Governance
华为云Astro轻应用
CodeArts IDE
Astro工作流 AstroFlow
代码托管 CodeArts Repo
漏洞管理服务 CodeArts Inspector
联接 CodeArtsLink
软件建模 CodeArts Modeling
Astro企业应用 AstroPro
CodeArts盘古助手
华为云Astro大屏应用
计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
云手机服务器 CPH
专属主机 DeH
弹性伸缩 AS
镜像服务 IMS
函数工作流 FunctionGraph
云耀云服务器(旧版)
VR云渲游平台 CVR
Huawei Cloud EulerOS
云化数据中心 CloudDC
网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
企业路由器 ER
企业交换机 ESW
全球加速 GA
企业连接 EC
云原生应用网络 ANC
CDN与智能边缘
内容分发网络 CDN
智能边缘云 IEC
智能边缘平台 IEF
CloudPond云服务
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
威胁检测服务 MTD
态势感知 SA
认证测试中心 CTC
边缘安全 EdgeSec
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
可信智能计算服务 TICS
推荐系统 RES
云搜索服务 CSS
数据可视化 DLV
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
湖仓构建 LakeFormation
智能数据洞察 DataArts Insight
应用中间件
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
API网关 APIG
分布式缓存服务 DCS
多活高可用服务 MAS
事件网格 EG
开天aPaaS
应用平台 AppStage
开天企业工作台 MSSE
开天集成工作台 MSSI
API中心 API Hub
云消息服务 KooMessage
交换数据空间 EDS
云地图服务 KooMap
云手机服务 KooPhone
组织成员账号 OrgID
云空间服务 KooDrive
管理与监管
统一身份认证服务 IAM
消息通知服务 SMN
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云日志服务 LTS
云审计服务 CTS
标签管理服务 TMS
配置审计 Config
应用身份管理服务 OneAccess
资源访问管理 RAM
组织 Organizations
资源编排服务 RFS
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
区块链
区块链服务 BCS
数字资产链 DAC
华为云区块链引擎服务 HBS
解决方案
高性能计算 HPC
SAP
混合云灾备
开天工业工作台 MIW
Haydn解决方案工厂
数字化诊断治理专家服务
价格
成本优化最佳实践
专属云商业逻辑
云生态
云商店
合作伙伴中心
华为云开发者学堂
华为云慧通差旅
其他
管理控制台
消息中心
产品价格详情
系统权限
客户关联华为云合作伙伴须知
公共问题
宽限期保留期
奖励推广计划
活动
云服务信任体系能力说明
开发与运维
软件开发生产线 CodeArts
需求管理 CodeArts Req
流水线 CodeArts Pipeline
代码检查 CodeArts Check
编译构建 CodeArts Build
部署 CodeArts Deploy
测试计划 CodeArts TestPlan
制品仓库 CodeArts Artifact
移动应用测试 MobileAPPTest
CodeArts IDE Online
开源镜像站 Mirrors
性能测试 CodeArts PerfTest
应用管理与运维平台 ServiceStage
云应用引擎 CAE
开源治理服务 CodeArts Governance
华为云Astro轻应用
CodeArts IDE
Astro工作流 AstroFlow
代码托管 CodeArts Repo
漏洞管理服务 CodeArts Inspector
联接 CodeArtsLink
软件建模 CodeArts Modeling
Astro企业应用 AstroPro
CodeArts盘古助手
华为云Astro大屏应用
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
存储容灾服务 SDRS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
云存储网关 CSG
专属分布式存储服务 DSS
数据工坊 DWR
地图数据 MapDS
键值存储服务 KVS
容器
云容器引擎 CCE
云容器实例 CCI
容器镜像服务 SWR
云原生服务中心 OSC
应用服务网格 ASM
华为云UCS
数据库
云数据库 RDS
数据复制服务 DRS
文档数据库服务 DDS
分布式数据库中间件 DDM
云数据库 GaussDB
云数据库 GeminiDB
数据管理服务 DAS
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
AI开发平台ModelArts
华为HiLens
图引擎服务 GES
图像识别 Image
文字识别 OCR
自然语言处理 NLP
内容审核 Moderation
图像搜索 ImageSearch
医疗智能体 EIHealth
企业级AI应用开发专业套件 ModelArts Pro
人脸识别服务 FRS
对话机器人服务 CBS
语音交互服务 SIS
人证核身服务 IVS
视频智能分析服务 VIAS
城市智能体
自动驾驶云服务 Octopus
盘古大模型 PanguLargeModels
IoT物联网
设备接入 IoTDA
全球SIM联接 GSL
IoT数据分析 IoTA
路网数字化服务 DRIS
IoT边缘 IoTEdge
设备发放 IoTDP
企业应用
域名注册服务 Domains
云解析服务 DNS
企业门户 EWP
ICP备案
商标注册
华为云WeLink
华为云会议 Meeting
隐私保护通话 PrivateNumber
语音通话 VoiceCall
消息&短信 MSGSMS
云管理网络
SD-WAN 云服务
边缘数据中心管理 EDCM
云桌面 Workspace
应用与数据集成平台 ROMA Connect
ROMA资产中心 ROMA Exchange
API全生命周期管理 ROMA API
政企自服务管理 ESM
视频
实时音视频 SparkRTC
视频直播 Live
视频点播 VOD
媒体处理 MPC
视频接入服务 VIS
数字内容生产线 MetaStudio
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
专属云
专属计算集群 DCC
开发者工具
SDK开发指南
API签名指南
DevStar
华为云命令行工具服务 KooCLI
Huawei Cloud Toolkit
CodeArts API
云化转型
云架构中心
云采用框架
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
客户运营能力
国际站常见问题
支持计划
专业服务
合作伙伴支持计划
我的凭证
华为云公共事业服务云平台
工业软件
工业数字模型驱动引擎
硬件开发工具链平台云服务
工业数据转换引擎云服务

Flink性能调优建议

更新时间:2024-06-15 GMT+08:00
分享

Hudi MOR流表开启log Index特性提升Flink流读Mor表性能

Hudi的Mor表可以通过log index提升读写性能,在Sink和Source表添加属性 'hoodie.log.index.enabled'='true'。

通过调整对应算子并行度提升性能

  • 读写Hudi可以通过配置读写并发提升读写性能。

    读算子的并行度调整参数:read.tasks

    写算子的并行度调整参数:write.tasks

  • 采用状态索引在作业重启的时候(非Checkpoint重启),需要读目标表重建索引,可以增大该算子并行度提升性能。

    加载索引的并行度调整参数:write.index_bootstrap.tasks

  • 采用状态索引写数据需要进行主键唯一性检查,分配具体写入文件,提升该算子并行度提升性能。

    写算子索引检测算子调整参数:write.bucket_assign.tasks

非状态计算提升性能的资源优化

Flink计算操作分为如下两类:

  • 无状态计算操作:该部分算子不需要保存计算状态,例如:filter、union all、lookup join。
  • 有状态计算操作:该部分算子要根据数据前后状态变化进行计算,例如:join,union、window、group by、聚合算子等。

对于非状态计算主要调优为TaskManager的Heap Size与NetWork。

例如作业仅进行数据的读和写,TaskManage无需增加额外的vCore,off-Heap和Overhead默认为1GB,内存主要给Heap和Network。

状态计算提升性能的资源优化

SQL逻辑包含较多join、卷积计算等操作。主要调优状态后端性能、vCore、Manage Memory。

例如作业做三表多表关联,性能要求高,单个TaskManage增加额外的6个vCore,off-Heap和Overhead提高到5GB,用于Flink状态管理的Manage Memory为9.6GB。

通过表级TTL进行状态后端优化

本章节适用于MRS 3.3.0及以后版本。

在Flink双流Join场景下,若Join的左表和右表其中一个表数据变化快,需要较短时间的过期时间,而另一个表数据变化较慢,需要较长时间的过期时间。目前Flink只有表级别的TTL(Time To Live:生存时间),为了保证Join的准确性,需要将表级别的TTL设置为较长时间的过期时间,此时状态后端中保存了大量的已经过期的数据,给状态后端造成了较大的压力。为了减少状态后端的压力,可以单独为左表和右表设置不同的过期时间。不支持where子句。

可通过使用Hint方式单独为左表和右表设置不同的过期时间,如左表(state.ttl.left)设置TTL为60秒,右表(state.ttl.right)设置TTL为120秒:

  • Hint方式格式:
    table_path /*+ OPTIONS(key=val [, key=val]*) */  
    
    key:
         stringLiteral 
    val:
         stringLiteral
  • 在SQL语句中配置示例:
    CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
      'connector' = 'kafka',
      'topic' = 'user_info_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    CREATE table print(
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `score` INT
    ) WITH ('connector' = 'print');
    CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
      'connector' = 'kafka',
      'topic' = 'user_score_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    INSERT INTO
      print
    SELECT
      t.user_id,
      t.user_name,
      d.score
    FROM
      user_info as t
      LEFT JOIN 
      --  为左表和右表设置不同的TTL时间
      /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */
      user_score as d ON t.user_id = d.user_id;

通过表级JTL进行状态后端优化

本章节适用于MRS 3.3.0及以后版本。

在Flink双流inner Join场景下,若Join业务允许join一次就可以剔除后端中的数据时,可以使用该特性。

该特性只适用于流流inner join。

可通过使用Hint方式单独为左表和右表设置不同join次数:

  • Hint方式格式:
    table_path /*+ OPTIONS(key=val [, key=val]*) */  
    
    key:
         stringLiteral 
    val:
         stringLiteral
  • 在SQL语句中配置示例:
    CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
      'connector' = 'kafka',
      'topic' = 'user_info_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    CREATE table print(
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `score` INT
    ) WITH ('connector' = 'print');
    CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
      'connector' = 'kafka',
      'topic' = 'user_score_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    INSERT INTO
      print
    SELECT
      t.user_id,
      t.user_name,
      d.score
    FROM
      user_info as t
      JOIN 
      --  为左表和右表设置不同的JTL关联次数
      /*+ OPTIONS('eliminate-state.left.threshold'='1','eliminate-state.right.threshold'='1') */
      user_score as d ON t.user_id = d.user_id;

TM的Slot数和TM的CPU数成倍数关系

在Flink中,每个Task被分解成SubTask,SubTask作为执行的线程单位运行在TM上,在不开启Slot Sharing Group的情况下,一个SubTask是部署在一个slot上的。即使开启了Slot Sharing Group,大部分情况下Slot中拥有的SubTask也是负载均衡的。所以可以理解为TM上的Slot个数代表了上面运行的任务线程数。

合理的Slots数量应该和CPU核数相同,在使用超线程时,每个Slot将占用2个或更多的硬件线程。

【示例】建议配置TM Slot个数为CPU Core个数的2~4倍:

taskmanager.numberOfTaskSlots: 4
taskmanager.cpu.cores: 2

数据量大并发数高且有Shuffle时可调整网络内存

在并发数高和数据量大时,发生shuffle后会发生大量的网络IO,提升网络缓存内存可以扩大一次性读取的数据量,从而提升IO速度。

【示例】

# 网络占用内存占整个进程内存的比例
taskmanager.memory.network.fraction: 0.6
# 网络缓存内存的最小值
taskmanager.memory.network.min: 1g
# 网络缓存内存的最大值(MRS 3.3.1及之后版本无需修改该值,默认值已为Long#MAX_VALUE)
taskmanager.memory.network.max: 20g

基于序列化性能尽量使用POJO和Avro等简单的数据类型

使用API编写Flink程序时需要考虑Java对象的序列化,大多数情况下Flink都可以高效的处理序列化。SQL中无需考虑,SQL中数据都为ROW类型,都采用了Flink内置的序列化器,能很高效的进行序列化。

表1 序列化

序列化器

Opts/s

PojoSeriallizer

813

Kryo

294

Avro(Reflect API)

114

Avro(SpecificRecord API)

632

网络通信调优

Flink通信主要依赖Netty网络,所以在Flink应用执行过程中,Netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。

【示例】

# netty的服务端线程数目(-1表示默认参数numOfSlot)
taskmanager.network.netty.server.numThreads -1(numOfSlot)
# netty的客户端线程数目(-1表示默认参数numofSlot)
taskmanager.network.netty.client.numThreads : -1
# netty的客户端连接超时时间
taskmanager.network.netty.client.connectTimeoutSec:120s
# netty的发送和接受缓冲区的大小(0表示netty默认参数,4MB)
taskmanager.network.netty.sendReceiveBufferSize: 0
# netty的传输方式,默认方式会根据运行的平台选择合适的方式
taskmanager.network.netty.transport:auto

内存总体调优

Flink内部对内存进行了划分,整体上划分成为了堆内存和堆外内存两部分。Java堆内存是通过Java程序创建时指定的,这也是JVM可自动GC的部分内存。堆外内存可细分为可被JVM管理的和不可被JVM管理的,可被JVM管理的有Managed Memory、Direct Memory,这部分是调优的重点,不可被JVM管理的有JVM Metaspace、JVM Overhead,这部分是native memory。

图1 内存
表2 相关参数

参数

配置

注释

说明

Total Memory

taskmanager.memory.flink.size: none

总体Flink管理的内存大小,没有默认值,不包含Metaspace和Overhead,Standalone模式时设置。

整体内存。

taskmanager.memory.process.size: none

整个Flink进程使用的内存大小,容器模式时设置。

FrameWork

taskmanager.memory.framework.heap.size: 128mb

runtime占用的heap的大小,一般来说不用修改,占用空间相对固定。

RUNTIME底层占用的内存,一般不用做较大改变。

taskmanager.memory.framework.off-heap.size: 128mb

runtime占用的off-heap的大小,一般来说不用修改,占用空间相对固定。

Task

taskmanager.memory.task.heap.size:none

没有默认值,flink.size减去框架、托管、网络等得到。

算子逻辑,用户代码(如UDF)正常对象占用内存的地方。

taskmanager.memory.task.off-heap.size:0

默认值为0,task使用的off heap内存。

Managed Memory

taskmanager.memory.managed.fraction: 0.4

托管内存占taskmanager.memory.flink.size的比例,默认0.4。

managed内存用于中间结果缓存、排序、哈希等(批计算),以及RocksDB state backend(流计算),该内存在批模式下一开始就申请固定大小内存,而流模式下会按需申请。

taskmanager.memory.managed.size: 0

托管内存大小,一般不指定,默认为0,内存大小由上面计算出来。若指定了则覆盖比例计算的内存。

Network

taskmanager.memory.network.min:64mb

网络缓存的最小值。

用于taskmanager之间shuffle、广播以及与network buffer。

taskmanager.memory.network.max:1gb

网络缓存的最大值。(MRS 3.3.1及之后版本无需修改该值,默认值已为Long#MAX_VALUE)

taskmanager.memory.network.fraction:0.1

network memory占用taskmanager.memory.flink.size的大小,默认0.1,会被限制在network.min和network.max之间。

用于taskmanager之间shuffle、广播以及与network buffer。

Others

taskmanager.memory.jvm-metaspace.size:256M

metaspace空间的最大值,默认值256MB。

用户自己管理的内存。

taskmanager.memory.jvm-overhead.min:192M

jvm额外开销的最小值,默认192MB。

taskmanager.memory.jvm-overhead.max:1G

jvm额外开销的最大值,默认1GB。

taskmanager.memory.jvm-overhead.fraction:0.1

jvm额外开销占taskmanager.memory.process.size的比例,默认0.1,算出来后会被限制在jvm-overhead.min和jvm-overhead.max之间。

说明:

3.3.1及之后版本无需修改taskmanager.memory.network.max网络缓存的最大值

如果不能使用broardcast join应该尽量减少shuffle数据

不能broadcast join那么必定会发生shuffle,可通过各种手段来减少发生shuffle的数据量,例如谓词下推,Runtime Filter等等。

【示例】

# Runtime filter配置
table.exec.runtime-filter.enabled: true
# 下推
table.optimizer.source.predicate-pushdown-enabled: true

数据倾斜状态下可以使用localglobal优化策略

【示例】

#开启mini-batch优化
table.exec.mini-batch.enabled:true
#最长等待时间
table.exec.mini-batch.allow-latency: 20ms
#最大缓存记录数
table.exec.mini-batch.size:8000
#开启两阶段聚合
table.optimizer.agg-phase-strategy:TWO_PHASE

吞吐量大场景下使用MiniBatch聚合增加吞吐量

MiniBatch聚合的核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个key只需一个操作即可访问状态,可以很大程度减少状态开销并获得更好的吞吐量。但是可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理,这是吞吐量和延迟之间的权衡。默认未开启该功能。

  • API方式:
    // instantiate table environmentTableEnvironment tEnv = ...
    // access flink configuration
    Configuration configuration = tEnv.getConfig().getConfiguration();
    // set low-level key-value options
    configuration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimizationconfiguration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input recordsconfiguration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
  • 资源文件方式(flink-conf.yaml):
    table.exec.mini-batch.enabled: true
    table.exec.mini-batch.allow-latency : 5 s
    table.exec.mini-batch.size: 5000

使用local-global两阶段聚合减少数据倾斜

Local-Global聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于MapReduce中的 Combine + Reduce模式。

数据流中的记录可能会倾斜,因此某些聚合算子的实例必须比其他实例处理更多的记录,这会产生热点问题。本地聚合可以将一定数量具有相同key的输入数据累加到单个累加器中。全局聚合将仅接收reduce后的累加器,而不是大量的原始输入数据,这可以很大程度减少网络shuffle和状态访问的成本。每次本地聚合累积的输入数据量基于mini-batch间隔,这意味着local-global聚合依赖于启用了mini-batch优化。

  • API方式:
    // instantiate table environmentTableEnvironment tEnv = ...
    // access flink configuration
    Configuration configuration = tEnv.getConfig().getConfiguration();// set low-level key-value options
    configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
    configuration.setString("table.exec.mini-batch.size", "5000");
    configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
  • 资源文件方式:
    table.exec.mini-batch.enabled: true
    table.exec.mini-batch.allow-latency : 5 s
    table.exec.mini-batch.size: 5000
    table.optimizer.agg-phase-strategy: TWO_PHASE

RocksDB作为状态后端时通过多块磁盘提升IO性能

RocksDB使用内存加磁盘的方式存储数据,当状态比较大时,磁盘占用空间会比较大。如果对RocksDB有频繁的读取请求,那么磁盘IO会成为Flink任务瓶颈。当一个 TaskManager包含三个slot时,那么单个服务器上的三个并行度都对磁盘造成频繁读写,从而导致三个并行度的之间相互争抢同一个磁盘IO,导致三个并行度的吞吐量都会下降。可以通过指定多个不同的硬盘从而减少IO竞争。

【示例】Rockdb配置Checkpoint目录放在不同磁盘(flink-conf.yaml):

state.backend.rocksdb.localdir:/data1/flink/rocksdb,/data2/flink/rocksdb

RocksDB作为状态后端时尽量使用MapState或ListState替换ValueState存储容器

RocksDB场景下,由于RocksDB是一个内嵌式的KV数据库,它的数据都是根据key和value进行存放的。对于map类数据,若使用ValueState,在RocksDB中作为一条记录存储,value是整个map,而使用MapState,在RocksDB中作为N条记录存储,这样做的好处是当进行查询或者修改可以只序列化一小部分数据,当将map作为整体存储时每次增删改都会产生很大的序列化开销。对于List数据,使用ListState可以无需序列化动态添加元素。

另外Flink中的State支持设置TTL,TTL实际上是将时间戳与userValue封装起来,ValueState的TTL基于整个Key,MapState<UK, UV>的TTL是基于UK,它的粒度更小,可支持更丰富的TTL语义。

Checkpoint配置压缩减少Checkpoint大小

在IO密集型应用中,可以通过开启Checkpoint压缩,牺牲极小部分CPU性能,提升IO性能。

【示例】配置Checkpoint时开启压缩(flink-conf.yaml):

execution.checkpointing.snapshot-compression: true

大状态Checkpoint优先从本地状态恢复

为了快速的状态恢复,每个task会同时写Checkpoint数据到本地磁盘和远程分布式存储,也就是说这是一份双复制。只要task本地的Checkpoint数据没有被破坏,系统在应用恢复时会首先加载本地的Checkpoint数据,这样就很大程度减少了远程拉取状态数据的过程。

【示例】配置Checkpoint优先从本地恢复(flink-conf.yaml):

state.backend.local-recovery: true
提示

您即将访问非华为云网站,请注意账号财产安全

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容