计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
弹性伸缩 AS
镜像服务 IMS
专属主机 DeH
函数工作流 FunctionGraph
云手机服务器 CPH
Huawei Cloud EulerOS
网络
虚拟私有云 VPC
弹性公网IP EIP
虚拟专用网络 VPN
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
VPC终端节点 VPCEP
云连接 CC
企业路由器 ER
企业交换机 ESW
全球加速 GA
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
边缘安全 EdgeSec
威胁检测服务 MTD
CDN与智能边缘
内容分发网络 CDN
CloudPond云服务
智能边缘云 IEC
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
云搜索服务 CSS
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
数据可视化 DLV
数据湖工厂 DLF
湖仓构建 LakeFormation
企业应用
云桌面 Workspace
应用与数据集成平台 ROMA Connect
云解析服务 DNS
专属云
专属计算集群 DCC
IoT物联网
IoT物联网
设备接入 IoTDA
智能边缘平台 IEF
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
国际站常见问题
ICP备案
我的凭证
支持计划
客户运营能力
合作伙伴支持计划
专业服务
区块链
区块链服务 BCS
Web3节点引擎服务 NES
解决方案
SAP
高性能计算 HPC
视频
视频直播 Live
视频点播 VOD
媒体处理 MPC
实时音视频 SparkRTC
数字内容生产线 MetaStudio
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
存储容灾服务 SDRS
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
专属分布式存储服务 DSS
容器
云容器引擎 CCE
容器镜像服务 SWR
应用服务网格 ASM
华为云UCS
云容器实例 CCI
管理与监管
云监控服务 CES
统一身份认证服务 IAM
资源编排服务 RFS
云审计服务 CTS
标签管理服务 TMS
云日志服务 LTS
配置审计 Config
资源访问管理 RAM
消息通知服务 SMN
应用运维管理 AOM
应用性能管理 APM
组织 Organizations
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
应用身份管理服务 OneAccess
数据库
云数据库 RDS
文档数据库服务 DDS
数据管理服务 DAS
数据复制服务 DRS
云数据库 GeminiDB
云数据库 GaussDB
分布式数据库中间件 DDM
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
人脸识别服务 FRS
图引擎服务 GES
图像识别 Image
内容审核 Moderation
文字识别 OCR
AI开发平台ModelArts
图像搜索 ImageSearch
对话机器人服务 CBS
华为HiLens
视频智能分析服务 VIAS
语音交互服务 SIS
应用中间件
分布式缓存服务 DCS
API网关 APIG
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
多活高可用服务 MAS
事件网格 EG
企业协同
华为云会议 Meeting
云通信
消息&短信 MSGSMS
云生态
合作伙伴中心
云商店
开发者工具
SDK开发指南
API签名指南
Terraform
华为云命令行工具服务 KooCLI
其他
产品价格详情
系统权限
管理控制台
客户关联华为云合作伙伴须知
消息中心
公共问题
开发与运维
应用管理与运维平台 ServiceStage
软件开发生产线 CodeArts
需求管理 CodeArts Req
部署 CodeArts Deploy
性能测试 CodeArts PerfTest
编译构建 CodeArts Build
流水线 CodeArts Pipeline
制品仓库 CodeArts Artifact
测试计划 CodeArts TestPlan
代码检查 CodeArts Check
代码托管 CodeArts Repo
云应用引擎 CAE
开天aPaaS
云消息服务 KooMessage
云手机服务 KooPhone
云空间服务 KooDrive

Flink任务开发建议

更新时间:2024-11-06 GMT+08:00

高可用性下考虑提高Checkpoint保存数

Checkpoint保存数默认是1,也就是只保存最新的Checkpoint的状态文件,当进行状态恢复时,如果最新的Checkpoint文件不可用(比如HDFS文件所有副本都损坏或者其他原因),那么状态恢复就会失败。如果设置Checkpoint保存数为2,即使最新的Checkpoint恢复失败,那么Flink会回滚到之前那一次Checkpoint的状态文件进行恢复。所以可以增加Checkpoint保存数。

【示例】配置Checkpoint文件保存数为2:

state.checkpoints.num-retained: 2

生产环境使用增量Rocksdb作为State Backend

Flink提供了三种状态后端:MemoryStateBackend,FsStateBackend,和RocksDBStateBackend。

  • MemoryStateBackend是将state存储在JobManager的Java堆上,每个状态的大小不能超过akka帧的大小,且总量不能超过JobManager的堆内存大小。所以只适合于本地开发调试,或状态大小有限的一些小状态的场景。
  • FsStateBackend是文件系统状态后端,正常情况下将state存储在TaskManager堆内存中,当Checkpoint时将state存储在文件系统上,而JobManager内存中存储极少的元数据(高可用场景下存储在ZooKeeper)。因为文件系统的存储空间足够,适合于大状态,长窗口,或大键值状态的有状态处理任务,也适合于高可用方案。
  • RocksDBStateBackend是内嵌数据库后端,正常情况下state存储在RocksDB数据库中,该数据库数据放在本地磁盘上,在Checkpoint时将state存储在配置的文件系统上而JobManager内存中存储极少的元数据(高可用场景下存储在ZooKeeper),同时是唯一一个可以增量Checkpoint的状态后端,除了适合于FsStateBackend的场景,还适用于超大状态的场景。
表1 Flink状态后端

类别

MemoryStateBackend

FsStateBackend

RocksDBStateBackend

方式

Checkpoint数据直接返回给Master节点,不落盘

数据写入文件,将文件路径传给Master

数据写入文件,将文件路径传给Master

存储

堆内存

堆内存

Rocksdb(本地磁盘)

性能

相比最好(一般不用)

性能好

性能不好

缺点

数据量小、易丢失

容易OOM风险

需要读写、序列化、IO等耗时

是否支持增量

不支持

不支持

支持

【示例】配置RockDBStateBackend(flink-conf.yaml):

state.backend: rocksdb
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

使用EXACTLY ONCE流处理语义保证端到端的一致性

流处理语义有三种:EXACTLY ONCE、AT LEAST ONCE、AT MOST ONCE。

  • AT MOST ONCE:无法保证数据处理的完整性,但性能相比最好。
  • AT LEAST ONCE:可以保证数据处理的完整性,但无法保证数据处理的准确性,性能适中。
  • EXACTLY ONCE:可以保证数据处理的准确性,但性能最差。

首先需要确认能否保证EXACTLY_ONCE(严格一次),因为端到端EXACTLY ONCE语义需要输入数据源的可回放(例如Kafka可回放数据),输出数据源的事务性(例如MySQL可原子性写入数据)。在无法满足这些条件的情况下,可以视情况将其降级为AT LEAST ONCE或者AT MOST ONCE。

  • 在无法满足输入源的可回放时,只能保证AT MOST ONCE。
  • 在无法满足输出目的的原子性写入时,只能保证AT LEAST ONCE。

【示例】API方式设置Exactly once语义:

env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

【示例】资源文件方式设置Exactly once语义:

# checkpoint的语义
execution.checkpointing.mode: EXACTLY_ONCE

通过查看监控信息定位Back Pressure点

Flink提供了很多的监控指标,根据这些指标可以分析任务过程中的性能状况及瓶颈。

【示例】配置采样的样本数和时间间隔:

# 有效的反压结果被废弃并重新进行采样的时间,单位ms
web.backpressure.refresh-interval: 60000
# 用于确定反压采样的样本数
web.backpressure.num-samples: 100
# 用于确定反压采样的间隔时间,单位ms
web.backpressure.delay-between-samples: 50

可以在Job的Overview选项卡后面查看BackPressure,如下图表示采样进行中,默认情况下,大约需要5秒完成采样。

图1 采样进行中

如下图显示“OK”表示没有反压,“HIGH”表示对应SubTask被反压。

图2 无反压状态
图3 反压状态

使用Hive SQL时如果Flink语法不兼容则可切换Hive方言

当前Flink支持的SQL语法解析引擎有default和Hive两种,第一种为Flink原生SQL语言,第二种是Hive SQL语言。因为部分Hive语法的DDL和DML无法用Flink SQL运行,所以遇到这种SQL可直接切换成Hive的dialect。使用Hive dialect需要注意:

  • Hive dialect只能用于操作Hive表,不能用于普通表。Hive方言应与HiveCatalog一起使用。
  • 虽然所有Hive版本都支持相同的语法,但是是否有特定功能仍然取决于使用的Hive版本。例如仅在Hive-2.7.0或更高版本中支持更新数据库位置。
  • Hive和Calcite具有不同的保留关键字。例如default在Calcite中是保留关键字,在Hive中是非保留关键字。所以在使用Hive dialect时,必须使用反引号(`)引用此类关键字,才能将其用作标识符。
  • 在Hive中不能查询在Flink中创建的视图。

【示例】修改SQL解析为Hive语法(sql-submit-defaults.yaml):

configuration:  table.sql-dialect: hive

中小规模数据量维度表可以采用内存维度表(如Hudi)

  • 内存维度表:将维度数据加载到内存当中,每个TM都会加载全量的数据,在内存内实现数据点查关联。若数据量过大,需要给TM分配大的内存空间,否则容易导致作业异常。
  • 外置维度表:将维度数据存在高速的K-V数据库中,通过远程的K-V查询实现点查关联,常用的开源K-V库有HBase。
  • 状态维度表:将维度表数据当做流表,实时读入到流式作业当中,通过数据的回撤流能力实现维度更新和数据不对齐场景下的数据一致性保证。维度表保存时间比较长,当前Flink on Hudi能力可以针对Hudi作为维度表单独设置TTL时长。
表2 维度表实现方式对比

维度

内存维度表(hive/hudi表)

外置维度表(HBase)

状态维度表

性能

非常高(毫秒内)

中(毫秒级)

高(毫秒内~毫秒级)

数据量

小,建议单个TM保持1GB以内

大,TB级

中,GB级

存储资源

内存消耗大,单个TM全量存储

外置存储,无存储资源消耗

各TM分散存储,内存+磁盘存储

时效性

周期性数据加载,时效低

相对高

关联数据结果

-

大数据量的维度表建议采用HBase

数据量比较大,而且不要数据高一致的场景,可以采用HBase类的KV库提供维度表点查关联能力。

由于K-V库的数据需由另外的作业写入,与当前的Flink作业会存在一定的时差,容易导致当前Flink作业查询K-V库时不是最新的数据,且由于lookup查询不支持回撤,关联的结果存在一致性问题。

维度表要求高数据一致性采用流表作为维度表

基于Hudi作为维度source表,可以实现维度表单独设置TTL时长,不跟随作业的整体TTL时间进行数据老化,从而保证维度数据可以长期保存在状态后端中。而且基于流表作为维度表可以基于Flink回撤机制实现数据的一致性。

我们使用cookie来确保您的高速浏览体验。继续浏览本站,即表示您同意我们使用cookie。 详情

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容