Updated on: 2025-11-03 GMT+08:00

What Should I Do If, with Hudi as the Destination, a Checkpoint Timeout Causes the Job to Retry, and After Multiple Retries the Job Fails with Errors Containing "Checkpoint expired before completing" and "Checkpoint Coordinator is suspending"?

Symptom

A running real-time migration job reports retry alarms, its synchronization rate is low, continuous backpressure is observed, and the job eventually fails. The error logs contain the keywords "Checkpoint expired before completing" and "Checkpoint Coordinator is suspending".

Error details:

org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2159) [flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_422]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_422]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_422]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_422]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_422]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_422]

org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1920) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.runtime.scheduler.SchedulerBase.stopCheckpointScheduler(SchedulerBase.java:962) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.runtime.scheduler.SchedulerBase.triggerSavepoint(SchedulerBase.java:905) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:835) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_422]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_422]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_422]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_422]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304) ~[flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302) ~[flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_ae4cd5c5-3624-42d3-8c01-cbc8c5e08edf.jar:1.15.0-h0.cbu.dli.330.20241021.r34]

Possible Causes

The default checkpoint timeout is 10 minutes. A checkpoint timeout can be caused by any of the following:

  • Too many data files are written in a single flush. I/O limits make flushing the data time out, and the checkpoint eventually expires.

    Search the TaskManager logs for the keyword "bucket number:" and check the "cost:" value on the same line. If more than 100 buckets are flushed at a time, or the cost is close to 10 minutes, the checkpoint timeout is caused by flushing too many data files (see the log-scan sketch after this list).

    Figure 1 TaskManager log
  • The Hudi table has too many partitions (typically partitions finer than hourly). Scheduling compaction in the commit phase then has to scan a large number of partition directories, which takes so long that the checkpoint cannot complete.

    Search the JobManager logs for the keyword "[commit] schedule compaction for table {}, cost: {}ms" to check how long scheduling compaction takes. If it is close to 10 minutes, the checkpoint timeout is caused by the large number of partitions scanned when scheduling compaction.

    Figure 2 JobManager log
  • Insufficient memory makes writing data extremely slow, and the checkpoint eventually expires.

    On the data development page, choose "Job Monitoring > Real-Time Integration Job Monitoring", locate the job, open its monitoring information, and view the monitoring metrics. The page shows real-time memory consumption. If the "job memory usage" metric repeatedly reaches 100%, the checkpoint timeout or job failure is caused by insufficient resources.
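
If you have exported the TaskManager and JobManager logs, the keyword checks above can be scripted. The following is a minimal sketch, assuming plain-text log files named taskmanager.log and jobmanager.log in the current directory and that the "cost" values are printed in milliseconds; adjust the paths and patterns to your actual log format:

import re

# Hypothetical local copies of the job logs; replace with your actual files.
TASKMANAGER_LOG = "taskmanager.log"
JOBMANAGER_LOG = "jobmanager.log"

CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000  # default checkpoint timeout: 10 minutes

def check_flush(path):
    """Flag flushes that write too many buckets or take close to the timeout."""
    with open(path, encoding="utf-8") as f:
        for line in f:
            if "bucket number:" not in line:
                continue
            buckets = re.search(r"bucket number:\s*(\d+)", line)
            cost = re.search(r"cost:\s*(\d+)", line)  # assumed to be milliseconds
            if buckets and int(buckets.group(1)) > 100:
                print("Too many buckets per flush:", line.strip())
            elif cost and int(cost.group(1)) > 0.8 * CHECKPOINT_TIMEOUT_MS:
                print("Flush cost close to checkpoint timeout:", line.strip())

def check_compaction(path):
    """Flag compaction scheduling that takes close to the timeout."""
    with open(path, encoding="utf-8") as f:
        for line in f:
            if "schedule compaction for table" not in line:
                continue
            cost = re.search(r"cost:\s*(\d+)\s*ms", line)
            if cost and int(cost.group(1)) > 0.8 * CHECKPOINT_TIMEOUT_MS:
                print("Slow compaction scheduling:", line.strip())

check_flush(TASKMANAGER_LOG)
check_compaction(JOBMANAGER_LOG)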

Solution

  • Increase the number of concurrent flush tasks within the job threads: hoodie.sink.flush.tasks = 20 (default: 2). A summary of both attributes follows this list.
  • Increase the parallelism for concurrently scanning partitions in the commit phase: hoodie.context.flatmap.parallelism = 50 (default: 2).
  • At the same time, increase the job resources and the job parallelism.
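
For reference, the two Hudi attributes above can be kept together as a small set of overrides. How they are passed to the job depends on where your migration job accepts custom Hudi sink attributes, so treat the following only as an illustrative summary of the recommended values:

# Values recommended in this FAQ; both attributes default to 2.
HUDI_SINK_OVERRIDES = {
    # More concurrent flush tasks, so one checkpoint does not stall on
    # flushing a large number of data files over limited I/O.
    "hoodie.sink.flush.tasks": "20",
    # Higher parallelism when scanning partitions while scheduling
    # compaction in the commit phase, for heavily partitioned tables.
    "hoodie.context.flatmap.parallelism": "50",
}

for key, value in HUDI_SINK_OVERRIDES.items():
    print(f"{key} = {value}")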
