更新时间:2024-12-25 GMT+08:00
分享

Hudi数据表Compaction规范

mor表更新数据以行存log的形式写入,log读取时需要按主键合并,并且是行存的,导致log读取效率比parquet低很多。为了解决log读取的性能问题,Hudi通过compaction将log压缩成parquet文件,大幅提升读取性能。

规则

  • 有数据持续写入的表,24小时内至少执行一次compaction。

    对于MOR表,不管是流式写入还是批量写入,需要保证每天至少完成1次Compaction操作。如果长时间不做compaction,Hudi表的log将会越来越大,这必将会出现以下问题:

    • Hudi表读取很慢,且需要很大的资源。 这是由于读MOR表涉及到log合并,大log合并需要消耗大量的资源并且速度很慢。
    • 长时间进行一次Compaction需要耗费很多资源才能完成,且容易出现OOM。
    • 阻塞Clean,如果没有Compaction操作来产生新版本的Parquet文件,那旧版本的文件就不能被Clean清理,增加存储压力。
  • 提交Spark jar作业时,CPU与内存比例为1:4~1:8。

    Compaction作业是将存量的parquet文件内的数据与新增的log中的数据进行合并,需要消耗较高的内存资源,按照之前的表设计规范以及实际流量的波动结合考虑,建议Compaction作业CPU与内存的比例按照1:4~1:8配置,保证Compaction作业稳定运行。当Compaction出现OOM问题,可以通过调大内存占比解决。

建议

  • 通过增加并发数提升Compaction性能。

    CPU和内存比例配置合理会保证Compaction作业是稳定的,实现单个Compaction task的稳定运行。但是Compaction整体的运行时长取决于本次Compaction处理文件数以及分配的cpu核数(并发能力),因此可以通过增加Compaciton作业的CPU核的个数来提升Compaction性能(注意增加cpu也要保证CPU与内存的比例)。

  • Hudi表采用异步Compaction。

    为了保证流式入库作业的稳定运行,就需要保证流式作业不在实时入库的过程中做其它任务,比如Flink写Hudi的同时会做Compaction。这看似是一个不错的方案,即完成了入库又完成Compaction。但是Compaction操作是非常消耗内存和IO的,它会给流式入库作业带来以下影响:

    • 增加端到端时延:Compaction会放大写入时延,因为Compaction比入库更耗时。
    • 作业不稳定:Compaction会给入库作业带来更多的不稳定性,Compaction OOM将会导致整个作业直接失败。
  • 建议2~4小时进行一次compaction。

    Compaction是MOR表非常重要且必须执行的维护手段,对于实时任务来说,要求Compaction执行合并的过程必须和实时任务解耦,通过周期调度Spark任务来完成异步Compaction,这个方案的关键之处在于如何合理的设置这个周期,周期如果太短意味着Spark任务可能会空跑,周期如果太长可能会积压太多的Compaction Plan没有去执行而导致Spark任务耗时长并且也会导致下游的读作业时延高。对此场景,在这里给出以下建议:按照集群资源使用情况,可以每2小时或每4个小时去调度执行一次异步Compaction作业,这是一个基本的维护MOR表的方案。

  • 采用Spark异步执行Compaction,不采用Flink进行Compaction。

    Flink写hudi建议的方案是Flink只负责写数据和生成Compaction计划。由单独的队列提交Spark SQL或Spark jar作业异步执行compaction、clean和archive。Compaction计划的生成是轻量级的对Flink写入作业影响可以忽略。

    上述方案落地的具体步骤参考如下:

    • Flink只负责写数据和生成Compaction计划

      Flink流任务建表语句/SQL hints中添加如下参数,控制Flink任务写Hudi时只会生成Compaction plan。

      'compaction.async.enabled' = 'false'      // 关闭Flink 执行Compaction任务
      'compaction.schedule.enabled' = 'true'    // 开启Compaction计划生成
      'compaction.delta_commits' = '5'          // MOR表默认5次checkpoint尝试生成compaction plan,该参数需要根据具体业务调整
      'clean.async.enabled' = 'false'           // 关闭Clean操作
      'hoodie.archive.automatic' = 'false'      // 关闭Archive操作
    • Spark离线完成Compaction计划的执行,以及Clean和Archive操作

      在调度平台(可以使用华为的DataArts)运行一个定时调度的离线任务来让Spark完成Hudi表的Compaction计划执行以及Clean和Archive操作。

      以SQL作业为例,在配置中添加:

      hoodie.archive.automatic = false;
      hoodie.clean.automatic = false;
      hoodie.compact.inline = true;
      hoodie.run.compact.only.inline=true;
      hoodie.cleaner.commits.retained = 500;  // clean保留timeline上最新的500个deltacommit对应的数据文件,之前的deltacommit所对应的旧版本文件会被清理。该值需要大于compaction.delta_commits设置的值,需要根据具体业务调整。
      hoodie.keep.max.commits = 700;  // timeline最多保留700个deltacommit
      hoodie.keep.min.commits = 501;  // timeline最少保留500个deltacommit。该值需要大于hoodie.cleaner.commits.retained设置的值,需要根据具体业务调整。

      随后保持上述配置按顺序调度执行以下SQL:

      run compaction on <database name>. <table name>;   // 执行Compaction计划
      run clean on <database name>. <table name>;        // 执行Clean操作
      run archivelog on <database name>.<table name>;    // 执行Archive操作
  • 异步Compaction可以将多个表串行到一个作业,资源配置相近的表放到一组,该组作业的资源需求为最大消耗资源的表所需的资源

    对于在•Hudi表采用异步Compaction•采用Spark异步执行Compaction,不...中提到的异步Compaction任务,这里给出以下开发建议:

    • 不需要对每张Hudi表都开发异步Compaction任务,这样会导致作业开发成本上升。
    • 异步Compaction任务可以通过提交Spark SQL作业来完成,也可以在Spark jar任务中处理多张表的compaction,clean,archive:
      hoodie.clean.async = true;
      hoodie.clean.automatic = false;
      hoodie.compact.inline = true;
      hoodie.run.compact.only.inline=true;
      hoodie.cleaner.commits.retained = 500;
      hoodie.keep.min.commits = 501;
      hoodie.keep.max.commits = 700;
      需要按顺序调度执行以下SQL:
      run compaction on <database name>. <table1>;
      run clean on <database name>. <table1>;
      run archivelog on <database name>.<table1>;
      run compaction on <database name>.<table2>;
      run clean on <database name>.<table2>;
      run archivelog on <database name>.<table2>;

相关文档