更新时间:2022-11-09 GMT+08:00

Flink作业如何进行性能调优

概念说明及监控查看

  • 消费组积压

    消费组积压可通过topic最新数据offset减去该消费组已提交最大offset计算得出,说明的是该消费组当前待消费的数据总量。

    如果Flink作业对接的是kafka专享版,则可通过云监控服务(CES)进行查看。具体可选择“云服务监控 > 分布式消息服务 > kafka专享版” ,单击“kafka实例名称 > 消费组” ,选择具体的消费组名称,查看消费组的指标信息。

    图1 消费组
  • 反压状态

    反压状态是通过周期性对taskManager线程的栈信息采样,计算被阻塞在请求输出Buffer的线程比率来确定,默认情况下,比率在0.1以下为OK,0.1到0.5为LOW,超过0.5则为HIGH。

  • 时延

    Source端会周期性地发送带当前时间戳的LatencyMarker,下游算子接收到该标记后,通过当前时间减去标记中带的时间戳的方式,计算时延指标。算子的反压状态和时延可以通过Flink UI或者作业任务列表查看,一般情况下反压和高时延成对出现:

    图2 反压状态和时延

性能分析

由于Flink的反压机制,流作业在存在性能问题的情况下,会导致数据源消费速率跟不上生产速率,从而引起Kafka消费组的积压。在这种情况下,可以通过算子的反压和时延,确定算子的性能瓶颈点。

  • 作业最后一个算子(Sink)反压正常(绿色),前面算子反压高(红色)

    该场景说明性能瓶颈点在sink,此时需要根据具体数据源具体优化,比如对于JDBC数据源,可以通过调整写出批次(connector.write.flush.max-rows)、JDBC参数重写(rewriteBatchedStatements=true)等进行优化。

  • 作业非倒数第二个算子反压高(红色)

    该场景说明性能瓶颈点在Vertex2算子,可以通过查看该算子描述,确认该算子具体功能,以进行下一步优化。

  • 所有算子反压都正常(绿色),但存在数据堆积

    该场景说明性能瓶颈点在Source,主要是受数据读取速度影响,此时可以通过增加Kafka分区数并增加source并发解决。

  • 作业一个算子反压高(红色),而其后续的多个并行算子都不存在反压(绿色)

    该场景说明性能瓶颈在Vertex2或者Vertex3,为了进一步确定具体瓶颈点算子,可以在FlinkUI页面开启inPoolUsage监控。如果某个算子并发对应的inPoolUsage长时间为100%,则该算子大概率为性能瓶颈点,需分析该算子以进行下一步优化。

    图3 inPoolUsage监控

性能调优

  • rocksdb状态调优

    topN排序、窗口聚合计算以及流流join等都涉及大量的状态操作,因而如果发现这类算子存在性能瓶颈,可以尝试优化状态操作的性能。主要可以尝试通过如下方式优化:

    • 增加状态操作内存,降低磁盘IO
      • 增加单slot cu资源数
      • 配置优化参数:
        • taskmanager.memory.managed.fraction=xx
        • state.backend.rocksdb.block.cache-size=xx
        • state.backend.rocksdb.writebuffer.size=xx
    • 开启微批模式,避免状态频繁操作

      配置参数:

      • table.exec.mini-batch.enabled=true
      • table.exec.mini-batch.allow-latency=xx
      • table.exec.mini-batch.size=xx
    • 使用超高IO本地盘规格机型,加速磁盘操作
  • group agg单点及数据倾斜调优

    按天聚合计算或者group by key不均衡场景下,group聚合计算存在单点或者数据倾斜问题,此时,可以通过将聚合计算拆分成Local-Global进行优化。配置方式为设置调优参数: table.optimizer.aggphase-strategy=TWO_PHASE

  • count distinct优化
    • 在count distinct关联key比较稀疏场景下,即使使用Local-Global,单点问题依然非常严重,此时可以通过配置以下调优参数进行分桶拆分优化:
      • table.optimizer.distinct-agg.split.enabled=true
      • table.optimizer.distinct-agg.split.bucket-num=xx
    • 使用filter替换case when:

      例如:

      COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone')THEN user_id ELSE NULL END) AS app_uv

      可调整为

      COUNT(DISTINCT user_id) FILTER(WHERE flag IN ('android', 'iphone')) AS app_uv
  • 维表join优化

    维表join根据左表进入的每条记录join关联键,先在缓存中匹配,如果匹配不到,则从远程拉取。因而,可以通过如下方式优化:

    • 增加JVM内存并增加缓存记录条数
    • 维表设置索引,加快查询速度

性能调优 所有常见问题

more