Flink性能调优建议
Hudi MOR流表开启log Index特性提升Flink流读Mor表性能
Hudi的Mor表可以通过log index提升读写性能,在Sink和Source表添加属性 'hoodie.log.index.enabled'='true'。
通过调整对应算子并行度提升性能
非状态计算提升性能的资源优化
Flink计算操作分为如下两类:
- 无状态计算操作:该部分算子不需要保存计算状态,例如:filter、union all、lookup join。
- 有状态计算操作:该部分算子要根据数据前后状态变化进行计算,例如:join,union、window、group by、聚合算子等。
对于非状态计算主要调优为TaskManager的Heap Size与NetWork。
例如作业仅进行数据的读和写,TaskManage无需增加额外的vCore,off-Heap和Overhead默认为1GB,内存主要给Heap和Network。
状态计算提升性能的资源优化
SQL逻辑包含较多join、卷积计算等操作。主要调优状态后端性能、vCore、Manage Memory。
例如作业做三表多表关联,性能要求高,单个TaskManage增加额外的6个vCore,off-Heap和Overhead提高到5GB,用于Flink状态管理的Manage Memory为9.6GB。
通过表级TTL进行状态后端优化
本章节适用于MRS 3.3.0及以后版本。
在Flink双流Join场景下,若Join的左表和右表其中一个表数据变化快,需要较短时间的过期时间,而另一个表数据变化较慢,需要较长时间的过期时间。目前Flink只有表级别的TTL(Time To Live:生存时间),为了保证Join的准确性,需要将表级别的TTL设置为较长时间的过期时间,此时状态后端中保存了大量的已经过期的数据,给状态后端造成了较大的压力。为了减少状态后端的压力,可以单独为左表和右表设置不同的过期时间。不支持where子句。
可通过使用Hint方式单独为左表和右表设置不同的过期时间,如左表(state.ttl.left)设置TTL为60秒,右表(state.ttl.right)设置TTL为120秒:
- Hint方式格式:
table_path /*+ OPTIONS(key=val [, key=val]*) */ key: stringLiteral val: stringLiteral
- 在SQL语句中配置示例:
CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH ( 'connector' = 'kafka', 'topic' = 'user_info_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); CREATE table print( `user_id` VARCHAR, `user_name` VARCHAR, `score` INT ) WITH ('connector' = 'print'); CREATE TABLE user_score (user_id VARCHAR, score INT) WITH ( 'connector' = 'kafka', 'topic' = 'user_score_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); INSERT INTO print SELECT t.user_id, t.user_name, d.score FROM user_info as t LEFT JOIN -- 为左表和右表设置不同的TTL时间 /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */ user_score as d ON t.user_id = d.user_id;
通过表级JTL进行状态后端优化
本章节适用于MRS 3.3.0及以后版本。
在Flink双流inner Join场景下,若Join业务允许join一次就可以剔除后端中的数据时,可以使用该特性。
该特性只适用于流流inner join。
可通过使用Hint方式单独为左表和右表设置不同join次数:
- Hint方式格式:
table_path /*+ OPTIONS(key=val [, key=val]*) */ key: stringLiteral val: stringLiteral
- 在SQL语句中配置示例:
CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH ( 'connector' = 'kafka', 'topic' = 'user_info_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); CREATE table print( `user_id` VARCHAR, `user_name` VARCHAR, `score` INT ) WITH ('connector' = 'print'); CREATE TABLE user_score (user_id VARCHAR, score INT) WITH ( 'connector' = 'kafka', 'topic' = 'user_score_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); INSERT INTO print SELECT t.user_id, t.user_name, d.score FROM user_info as t JOIN -- 为左表和右表设置不同的JTL关联次数 /*+ OPTIONS('eliminate-state.left.threshold'='1','eliminate-state.right.threshold'='1') */ user_score as d ON t.user_id = d.user_id;
TM的Slot数和TM的CPU数成倍数关系
在Flink中,每个Task被分解成SubTask,SubTask作为执行的线程单位运行在TM上,在不开启Slot Sharing Group的情况下,一个SubTask是部署在一个slot上的。即使开启了Slot Sharing Group,大部分情况下Slot中拥有的SubTask也是负载均衡的。所以可以理解为TM上的Slot个数代表了上面运行的任务线程数。
合理的Slots数量应该和CPU核数相同,在使用超线程时,每个Slot将占用2个或更多的硬件线程。
【示例】建议配置TM Slot个数为CPU Core个数的2~4倍:
taskmanager.numberOfTaskSlots: 4 taskmanager.cpu.cores: 2
数据量大并发数高且有Shuffle时可调整网络内存
在并发数高和数据量大时,发生shuffle后会发生大量的网络IO,提升网络缓存内存可以扩大一次性读取的数据量,从而提升IO速度。
【示例】
# 网络占用内存占整个进程内存的比例 taskmanager.memory.network.fraction: 0.6 # 网络缓存内存的最小值 taskmanager.memory.network.min: 1g # 网络缓存内存的最大值(MRS 3.3.1及之后版本无需修改该值,默认值已为Long#MAX_VALUE) taskmanager.memory.network.max: 20g
基于序列化性能尽量使用POJO和Avro等简单的数据类型
使用API编写Flink程序时需要考虑Java对象的序列化,大多数情况下Flink都可以高效的处理序列化。SQL中无需考虑,SQL中数据都为ROW类型,都采用了Flink内置的序列化器,能很高效的进行序列化。
序列化器 |
Opts/s |
---|---|
PojoSeriallizer |
813 |
Kryo |
294 |
Avro(Reflect API) |
114 |
Avro(SpecificRecord API) |
632 |
网络通信调优
Flink通信主要依赖Netty网络,所以在Flink应用执行过程中,Netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。
【示例】
# netty的服务端线程数目(-1表示默认参数numOfSlot) taskmanager.network.netty.server.numThreads -1(numOfSlot) # netty的客户端线程数目(-1表示默认参数numofSlot) taskmanager.network.netty.client.numThreads : -1 # netty的客户端连接超时时间 taskmanager.network.netty.client.connectTimeoutSec:120s # netty的发送和接受缓冲区的大小(0表示netty默认参数,4MB) taskmanager.network.netty.sendReceiveBufferSize: 0 # netty的传输方式,默认方式会根据运行的平台选择合适的方式 taskmanager.network.netty.transport:auto
内存总体调优
Flink内部对内存进行了划分,整体上划分成为了堆内存和堆外内存两部分。Java堆内存是通过Java程序创建时指定的,这也是JVM可自动GC的部分内存。堆外内存可细分为可被JVM管理的和不可被JVM管理的,可被JVM管理的有Managed Memory、Direct Memory,这部分是调优的重点,不可被JVM管理的有JVM Metaspace、JVM Overhead,这部分是native memory。
参数 |
配置 |
注释 |
说明 |
---|---|---|---|
Total Memory |
taskmanager.memory.flink.size: none |
总体Flink管理的内存大小,没有默认值,不包含Metaspace和Overhead,Standalone模式时设置。 |
整体内存。 |
taskmanager.memory.process.size: none |
整个Flink进程使用的内存大小,容器模式时设置。 |
||
FrameWork |
taskmanager.memory.framework.heap.size: 128mb |
runtime占用的heap的大小,一般来说不用修改,占用空间相对固定。 |
RUNTIME底层占用的内存,一般不用做较大改变。 |
taskmanager.memory.framework.off-heap.size: 128mb |
runtime占用的off-heap的大小,一般来说不用修改,占用空间相对固定。 |
||
Task |
taskmanager.memory.task.heap.size:none |
没有默认值,flink.size减去框架、托管、网络等得到。 |
算子逻辑,用户代码(如UDF)正常对象占用内存的地方。 |
taskmanager.memory.task.off-heap.size:0 |
默认值为0,task使用的off heap内存。 |
||
Managed Memory |
taskmanager.memory.managed.fraction: 0.4 |
托管内存占taskmanager.memory.flink.size的比例,默认0.4。 |
managed内存用于中间结果缓存、排序、哈希等(批计算),以及RocksDB state backend(流计算),该内存在批模式下一开始就申请固定大小内存,而流模式下会按需申请。 |
taskmanager.memory.managed.size: 0 |
托管内存大小,一般不指定,默认为0,内存大小由上面计算出来。若指定了则覆盖比例计算的内存。 |
||
Network |
taskmanager.memory.network.min:64mb |
网络缓存的最小值。 |
用于taskmanager之间shuffle、广播以及与network buffer。 |
taskmanager.memory.network.max:1gb |
网络缓存的最大值。(MRS 3.3.1及之后版本无需修改该值,默认值已为Long#MAX_VALUE) |
||
taskmanager.memory.network.fraction:0.1 |
network memory占用taskmanager.memory.flink.size的大小,默认0.1,会被限制在network.min和network.max之间。 |
用于taskmanager之间shuffle、广播以及与network buffer。 |
|
Others |
taskmanager.memory.jvm-metaspace.size:256M |
metaspace空间的最大值,默认值256MB。 |
用户自己管理的内存。 |
taskmanager.memory.jvm-overhead.min:192M |
jvm额外开销的最小值,默认192MB。 |
||
taskmanager.memory.jvm-overhead.max:1G |
jvm额外开销的最大值,默认1GB。 |
||
taskmanager.memory.jvm-overhead.fraction:0.1 |
jvm额外开销占taskmanager.memory.process.size的比例,默认0.1,算出来后会被限制在jvm-overhead.min和jvm-overhead.max之间。 |
3.3.1及之后版本无需修改taskmanager.memory.network.max网络缓存的最大值
如果不能使用broardcast join应该尽量减少shuffle数据
不能broadcast join那么必定会发生shuffle,可通过各种手段来减少发生shuffle的数据量,例如谓词下推,Runtime Filter等等。
【示例】
# Runtime filter配置 table.exec.runtime-filter.enabled: true # 下推 table.optimizer.source.predicate-pushdown-enabled: true
数据倾斜状态下可以使用localglobal优化策略
【示例】
#开启mini-batch优化 table.exec.mini-batch.enabled:true #最长等待时间 table.exec.mini-batch.allow-latency: 20ms #最大缓存记录数 table.exec.mini-batch.size:8000 #开启两阶段聚合 table.optimizer.agg-phase-strategy:TWO_PHASE
吞吐量大场景下使用MiniBatch聚合增加吞吐量
MiniBatch聚合的核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个key只需一个操作即可访问状态,可以很大程度减少状态开销并获得更好的吞吐量。但是可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理,这是吞吐量和延迟之间的权衡。默认未开启该功能。
- API方式:
// instantiate table environmentTableEnvironment tEnv = ... // access flink configuration Configuration configuration = tEnv.getConfig().getConfiguration(); // set low-level key-value options configuration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimizationconfiguration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input recordsconfiguration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
- 资源文件方式(flink-conf.yaml):
table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency : 5 s table.exec.mini-batch.size: 5000
使用local-global两阶段聚合减少数据倾斜
Local-Global聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于MapReduce中的 Combine + Reduce模式。
数据流中的记录可能会倾斜,因此某些聚合算子的实例必须比其他实例处理更多的记录,这会产生热点问题。本地聚合可以将一定数量具有相同key的输入数据累加到单个累加器中。全局聚合将仅接收reduce后的累加器,而不是大量的原始输入数据,这可以很大程度减少网络shuffle和状态访问的成本。每次本地聚合累积的输入数据量基于mini-batch间隔,这意味着local-global聚合依赖于启用了mini-batch优化。
- API方式:
// instantiate table environmentTableEnvironment tEnv = ... // access flink configuration Configuration configuration = tEnv.getConfig().getConfiguration();// set low-level key-value options configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); configuration.setString("table.exec.mini-batch.size", "5000"); configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
- 资源文件方式:
table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency : 5 s table.exec.mini-batch.size: 5000 table.optimizer.agg-phase-strategy: TWO_PHASE
RocksDB作为状态后端时通过多块磁盘提升IO性能
RocksDB使用内存加磁盘的方式存储数据,当状态比较大时,磁盘占用空间会比较大。如果对RocksDB有频繁的读取请求,那么磁盘IO会成为Flink任务瓶颈。当一个 TaskManager包含三个slot时,那么单个服务器上的三个并行度都对磁盘造成频繁读写,从而导致三个并行度的之间相互争抢同一个磁盘IO,导致三个并行度的吞吐量都会下降。可以通过指定多个不同的硬盘从而减少IO竞争。
【示例】Rockdb配置Checkpoint目录放在不同磁盘(flink-conf.yaml):
state.backend.rocksdb.localdir:/data1/flink/rocksdb,/data2/flink/rocksdb
RocksDB作为状态后端时尽量使用MapState或ListState替换ValueState存储容器
RocksDB场景下,由于RocksDB是一个内嵌式的KV数据库,它的数据都是根据key和value进行存放的。对于map类数据,若使用ValueState,在RocksDB中作为一条记录存储,value是整个map,而使用MapState,在RocksDB中作为N条记录存储,这样做的好处是当进行查询或者修改可以只序列化一小部分数据,当将map作为整体存储时每次增删改都会产生很大的序列化开销。对于List数据,使用ListState可以无需序列化动态添加元素。
另外Flink中的State支持设置TTL,TTL实际上是将时间戳与userValue封装起来,ValueState的TTL基于整个Key,MapState<UK, UV>的TTL是基于UK,它的粒度更小,可支持更丰富的TTL语义。
Checkpoint配置压缩减少Checkpoint大小
在IO密集型应用中,可以通过开启Checkpoint压缩,牺牲极小部分CPU性能,提升IO性能。
【示例】配置Checkpoint时开启压缩(flink-conf.yaml):
execution.checkpointing.snapshot-compression: true
大状态Checkpoint优先从本地状态恢复
为了快速的状态恢复,每个task会同时写Checkpoint数据到本地磁盘和远程分布式存储,也就是说这是一份双复制。只要task本地的Checkpoint数据没有被破坏,系统在应用恢复时会首先加载本地的Checkpoint数据,这样就很大程度减少了远程拉取状态数据的过程。
【示例】配置Checkpoint优先从本地恢复(flink-conf.yaml):
state.backend.local-recovery: true