What should I do if, with Hudi as the destination, checkpoint timeouts cause the job to retry and, after multiple retries, the job fails with errors containing "Checkpoint expired before completing" or "Checkpoint Coordinator is suspending"?
Problem Description
A running real-time migration job triggers retry alarms, its synchronization rate is low, and it shows sustained backpressure; eventually the job fails. Keywords: "Checkpoint expired before completing", "Checkpoint Coordinator is suspending".
Error details:
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2159) [flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_422]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_422]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_422]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_422]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_422]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_422]
or
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1920) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at org.apache.flink.runtime.scheduler.SchedulerBase.stopCheckpointScheduler(SchedulerBase.java:962) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at org.apache.flink.runtime.scheduler.SchedulerBase.triggerSavepoint(SchedulerBase.java:905) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:835) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_422]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_422]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_422]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_422]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304) ~[flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302) ~[flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
Cause Analysis
The default checkpoint timeout is 10 minutes. A job checkpoint may time out for the following reasons:
- Too many data files are written out in a single flush. I/O limits cause the flush to time out, and the checkpoint eventually expires.

  Search the TaskManager logs for the keyword "bucket number:" and check the "cost:" timing in the same entries. If a single flush covers more than 100 buckets, or the cost approaches 10 minutes, the checkpoint timeout can be attributed to flushing too many data files (see the log-scanning sketch after this list).

  Figure 1 TaskManager log
- The Hudi table has too many partitions (commonly seen with partitioning finer than hourly). Scheduling compaction during the commit phase then has to scan a large number of partition directories, which is very time-consuming, so the checkpoint cannot complete in time.

  Search the JobManager logs for the keyword "[commit] schedule compaction for table {}, cost: {}ms" to see how long scheduling compaction takes. If the cost approaches 10 minutes, the checkpoint timeout can be attributed to too many partitions being involved when scheduling compaction (see the log-scanning sketch after this list).

  Figure 2 JobManager log
- Insufficient memory makes data writes extremely slow, and the checkpoint eventually times out.

  On the data development console, go to "Job Monitoring -> Real-Time Integration Job Monitoring -> locate the job -> open its monitoring information -> view monitoring metrics" to see memory consumption in real time. If the "job memory usage" metric repeatedly reaches 100%, the checkpoint timeout or job failure can be attributed to insufficient resources.
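For the two log checks above, the following is a minimal diagnostic sketch rather than part of the product: it scans locally downloaded TaskManager and JobManager log files for the keywords named in the cause analysis and flags entries whose cost approaches the 10-minute checkpoint timeout. The file names, and the assumption that "cost:" is reported in milliseconds, are placeholders to adapt to your environment.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public class CheckpointTimeoutLogScan {
    // Default checkpoint timeout is 10 minutes; flag anything above 80% of it.
    private static final long CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000L;
    private static final Pattern BUCKETS = Pattern.compile("bucket number:\\s*(\\d+)");
    private static final Pattern COST = Pattern.compile("cost:\\s*(\\d+)");

    public static void main(String[] args) throws IOException {
        // Cause 1: a single flush that covers too many buckets or takes too long.
        try (Stream<String> lines = Files.lines(Paths.get("taskmanager.log"))) {
            lines.filter(l -> l.contains("bucket number:")).forEach(l -> {
                Matcher b = BUCKETS.matcher(l);
                Matcher c = COST.matcher(l);
                boolean manyBuckets = b.find() && Long.parseLong(b.group(1)) > 100;
                boolean slowFlush = c.find() && Long.parseLong(c.group(1)) > 0.8 * CHECKPOINT_TIMEOUT_MS;
                if (manyBuckets || slowFlush) {
                    System.out.println("Suspicious flush: " + l.trim());
                }
            });
        }

        // Cause 2: commit-phase compaction scheduling that scans too many partitions.
        try (Stream<String> lines = Files.lines(Paths.get("jobmanager.log"))) {
            lines.filter(l -> l.contains("schedule compaction for table")).forEach(l -> {
                Matcher c = COST.matcher(l);
                if (c.find() && Long.parseLong(c.group(1)) > 0.8 * CHECKPOINT_TIMEOUT_MS) {
                    System.out.println("Slow compaction scheduling: " + l.trim());
                }
            });
        }
    }
}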
 
Solution
- Increase the number of concurrent flush tasks within the job's writer: hoodie.sink.flush.tasks = 20 (default 2).
- Increase the parallelism of partition scanning during the job's commit phase: hoodie.context.flatmap.parallelism = 50 (default 2).
- At the same time, scale up the job's resources and increase the job parallelism.
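The two parameters above are Hudi writer options applied to the job. Purely as an illustration, not the migration service's actual configuration interface, the sketch below shows how the same options could be supplied to a self-managed Flink Hudi sink through the open-source HoodiePipeline API. The table name, schema, and path are placeholders, and hoodie.sink.flush.tasks may be a vendor-specific option that community Hudi builds do not recognize.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

public class HudiSinkTuning {
    // Builds a Hudi sink for an existing RowData stream with the tuning options applied.
    static void buildHudiSink(DataStream<RowData> rows) {
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), "obs://bucket/path/to/hudi_table"); // placeholder path
        // More flush tasks per writer, so one checkpoint can flush many buckets in
        // parallel instead of serializing on the I/O limit (default 2).
        options.put("hoodie.sink.flush.tasks", "20");
        // Higher commit-phase scan parallelism, so scheduling compaction over many
        // (e.g. sub-hourly) partitions finishes well within the checkpoint timeout (default 2).
        options.put("hoodie.context.flatmap.parallelism", "50");

        HoodiePipeline.builder("hudi_table")   // placeholder table name and schema
                .column("id INT")
                .column("name STRING")
                .pk("id")
                .options(options)
                .sink(rows, false);            // bounded = false for a streaming job
    }
}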