更新时间:2024-12-25 GMT+08:00
分享

Hudi Compaction操作说明

什么是Compaction

Compaction用于合并mor表Base和Log文件,Compaction包含两个过程Schedule和Run。Schedule过程会在TimeLine里生成一个Compaction Plan,这个Compaction Plan会记录哪些parquet文件将会与哪些log文件进行合并,但是仅仅是一个Plan,没有去合并。Run过程会将TimeLine里的所有Compaction Plan一个一个去执行,一直到全部都执行完。

对于Merge-On-Read表,数据使用列式Parquet文件和行式Avro文件存储,更新被记录到增量文件,然后进行同步/异步compaction生成新版本的列式文件。Merge-On-Read表可减少数据摄入延迟,因而进行不阻塞摄入的异步Compaction很有意义。

如何执行Compaction

  1. 仅执行Schedule
    • Spark SQL(设置如下参数,写数据时触发)
      hoodie.compact.inline=true
      hoodie.schedule.compact.only.inline=true
      hoodie.run.compact.only.inline=false
      hoodie.compact.inline.max.delta.commits=5 // 默认值为5,根据业务场景指定

      随后执行任意写入SQL时,在满足条件后(同一个file slice下存在5个 delta log文件),会触发compaction。

    • Spark SQL(设置如下参数,手动触发1次)
      hoodie.compact.inline=true
      hoodie.schedule.compact.only.inline=true
      hoodie.run.compact.only.inline=false
      hoodie.compact.inline.max.delta.commits=5 // 默认值为5,根据业务场景指定

      随后手动执行SQL:

      schedule compaction on ${table_name}
    • SparkDataSource(option里设置如下参数,写数据时触发)

      hoodie.compact.inline=true

      hoodie.schedule.compact.only.inline=true

      hoodie.run.compact.only.inline=false

      hoodie.compact.inline.max.delta.commits=5 // 默认值为5,根据业务场景指定

    • Flink(with属性里设置如下参数,写数据时触发)

      compaction.async.enabled=false

      compaction.schedule.enabled=true

      compaction.delta_commits=5 // 默认值为5,根据业务场景指定

  2. 仅执行Run
    • Spark SQL(设置如下参数,手动触发1次)
      hoodie.compact.inline=true
      hoodie.schedule.compact.only.inline=false
      hoodie.run.compact.only.inline=true

      随后执行如下SQL

      run compaction on ${table_name}
  3. Schedule+Run同时执行

    如果TimeLine中没有Compaction Plan,就尝试生成一个Compaction Plan去执行。

    • Spark SQL(设置如下参数,随后执行任意写入SQL时,在满足条件时触发)
      hoodie.compact.inline=true
      hoodie.schedule.compact.only.inline=false
      hoodie.run.compact.only.inline=false
      hoodie.compact.inline.max.delta.commits=5 // 默认值为5,根据业务场景指定
    • SparkDataSource(option里设置如下参数,写数据时触发)

      hoodie.compact.inline=true

      hoodie.schedule.compact.only.inline=false

      hoodie.run.compact.only.inline=false

      hoodie.compact.inline.max.delta.commits=5 // 默认值为5,根据业务场景指

    • Flink(with属性里设置如下参数,写数据时触发)

      compaction.async.enabled=true

      compaction.schedule.enabled=false

      compaction.delta_commits=5 // 默认值为5,根据业务场景指定

  4. 推荐方案
    • Spark/Flink流任务仅执行Schedule,然后另起一个Spark SQL任务定时仅执行Run。
    • Spark批任务可以直接同时执行Schedule + Run。

      为了保证入湖的最高效率,推荐使用同步产生compaction调度计划,异步执行compaction调度计划。

相关文档