Flink作业如何进行性能调优
概念说明及监控查看
- 消费组积压
消费组积压可通过topic最新数据offset减去该消费组已提交最大offset计算得出,说明的是该消费组当前待消费的数据总量。
如果Flink作业对接的是kafka专享版,则可通过云监控服务(CES)进行查看。具体可选择“云服务监控 > 分布式消息服务 > kafka专享版” ,单击“kafka实例名称 > 消费组” ,选择具体的消费组名称,查看消费组的指标信息。
图1 消费组
- 反压状态
反压状态是通过周期性对taskManager线程的栈信息采样,计算被阻塞在请求输出Buffer的线程比率来确定,默认情况下,比率在0.1以下为OK,0.1到0.5为LOW,超过0.5则为HIGH。
- 时延
Source端会周期性地发送带当前时间戳的LatencyMarker,下游算子接收到该标记后,通过当前时间减去标记中带的时间戳的方式,计算时延指标。算子的反压状态和时延可以通过Flink UI或者作业任务列表查看,一般情况下反压和高时延成对出现:
图2 反压状态和时延
性能分析
由于Flink的反压机制,流作业在存在性能问题的情况下,会导致数据源消费速率跟不上生产速率,从而引起Kafka消费组的积压。在这种情况下,可以通过算子的反压和时延,确定算子的性能瓶颈点。
- 作业最后一个算子(Sink)反压正常(绿色),前面算子反压高(红色)
该场景说明性能瓶颈点在sink,此时需要根据具体数据源具体优化,比如对于JDBC数据源,可以通过调整写出批次(connector.write.flush.max-rows)、JDBC参数重写(rewriteBatchedStatements=true)等进行优化。
- 作业非倒数第二个算子反压高(红色)
该场景说明性能瓶颈点在Vertex2算子,可以通过查看该算子描述,确认该算子具体功能,以进行下一步优化。
- 所有算子反压都正常(绿色),但存在数据堆积
该场景说明性能瓶颈点在Source,主要是受数据读取速度影响,此时可以通过增加Kafka分区数并增加source并发解决。
- 作业一个算子反压高(红色),而其后续的多个并行算子都不存在反压(绿色)
该场景说明性能瓶颈在Vertex2或者Vertex3,为了进一步确定具体瓶颈点算子,可以在FlinkUI页面开启inPoolUsage监控。如果某个算子并发对应的inPoolUsage长时间为100%,则该算子大概率为性能瓶颈点,需分析该算子以进行下一步优化。
图3 inPoolUsage监控
性能调优
- rocksdb状态调优
topN排序、窗口聚合计算以及流流join等都涉及大量的状态操作,因而如果发现这类算子存在性能瓶颈,可以尝试优化状态操作的性能。主要可以尝试通过如下方式优化:
- 增加状态操作内存,降低磁盘IO
- 增加单slot cu资源数
- 配置优化参数:
- taskmanager.memory.managed.fraction=xx
- state.backend.rocksdb.block.cache-size=xx
- state.backend.rocksdb.writebuffer.size=xx
- 开启微批模式,避免状态频繁操作
- table.exec.mini-batch.enabled=true
- table.exec.mini-batch.allow-latency=xx
- table.exec.mini-batch.size=xx
- 使用超高IO本地盘规格机型,加速磁盘操作
- 增加状态操作内存,降低磁盘IO
- group agg单点及数据倾斜调优
按天聚合计算或者group by key不均衡场景下,group聚合计算存在单点或者数据倾斜问题,此时,可以通过将聚合计算拆分成Local-Global进行优化。配置方式为设置调优参数: table.optimizer.aggphase-strategy=TWO_PHASE
- count distinct优化
- 在count distinct关联key比较稀疏场景下,即使使用Local-Global,单点问题依然非常严重,此时可以通过配置以下调优参数进行分桶拆分优化:
- table.optimizer.distinct-agg.split.enabled=true
- table.optimizer.distinct-agg.split.bucket-num=xx
- 使用filter替换case when:
COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone')THEN user_id ELSE NULL END) AS app_uv
可调整为
COUNT(DISTINCT user_id) FILTER(WHERE flag IN ('android', 'iphone')) AS app_uv
- 在count distinct关联key比较稀疏场景下,即使使用Local-Global,单点问题依然非常严重,此时可以通过配置以下调优参数进行分桶拆分优化:
- 维表join优化
维表join根据左表进入的每条记录join关联键,先在缓存中匹配,如果匹配不到,则从远程拉取。因而,可以通过如下方式优化:
- 增加JVM内存并增加缓存记录条数
- 维表设置索引,加快查询速度