Hudi数据表Compaction规范
mor表更新数据以行存log的形式写入,log读取时需要按主键合并,并且是行存的,导致log读取效率比parquet低很多。为了解决log读取的性能问题,Hudi通过compaction将log压缩成parquet文件,大幅提升读取性能。
规则
- 有数据持续写入的表,24小时内至少执行一次compaction。
对于MOR表,不管是流式写入还是批量写入,需要保证每天至少完成1次Compaction操作。如果长时间不做compaction,Hudi表的log将会越来越大,这必将会出现以下问题:
- Hudi表读取很慢,且需要很大的资源。 这是由于读MOR表涉及到log合并,大log合并需要消耗大量的资源并且速度很慢。
- 长时间进行一次Compaction需要耗费很多资源才能完成,且容易出现OOM。
- 阻塞Clean,如果没有Compaction操作来产生新版本的Parquet文件,那旧版本的文件就不能被Clean清理,增加存储压力。
- CPU与内存比例为1:4~1:8。
Compaction作业是将存量的parquet文件内的数据与新增的log中的数据进行合并,需要消耗较高的内存资源,按照之前的表设计规范以及实际流量的波动结合考虑,建议Compaction作业CPU与内存的比例按照1:4~1:8配置,保证Compaction作业稳定运行。当Compaction出现OOM问题,可以通过调大内存占比解决。
建议
- 通过增加并发数提升Compaction性能。
CPU和内存比例配置合理会保证Compaction作业是稳定的,实现单个Compaction task的稳定运行。但是Compaction整体的运行时长取决于本次Compaction处理文件数以及分配的cpu核数(并发能力),因此可以通过增加Compaciton作业的CPU核的个数来提升Compaction性能(注意增加cpu也要保证CPU与内存的比例)。
- Hudi表采用异步Compaction。
为了保证流式入库作业的稳定运行,就需要保证流式作业不在实时入库的过程中做其它任务,比如Flink写Hudi的同时会做Compaction。这看似是一个不错的方案,即完成了入库又完成Compaction。但是Compaction操作是非常消耗内存和IO的,它会给流式入库作业带来以下影响:
- 增加端到端时延:Compaction会放大写入时延,因为Compaction比入库更耗时。
- 作业不稳定:Compaction会给入库作业带来更多的不稳定性,Compaction OOM将会导致整个作业直接失败。
- 建议2~4小时进行一次compaction。
Compaction是MOR表非常重要且必须执行的维护手段,对于实时任务来说,要求Compaction执行合并的过程必须和实时任务解耦,通过周期调度Spark任务来完成异步Compaction,这个方案的关键之处在于如何合理的设置这个周期,周期如果太短意味着Spark任务可能会空跑,周期如果太长可能会积压太多的Compaction Plan没有去执行而导致Spark任务耗时长并且也会导致下游的读作业时延高。对此场景,在这里给出以下建议:按照集群资源使用情况,可以每2小时或每4个小时去调度执行一次异步Compaction作业,这是一个基本的维护MOR表的方案。
- 采用Spark异步执行Compaction,不采用Flink进行Compaction。
Flink写hudi建议的方案是Flink只负责写数据和生成Compaction计划,由单独的Spark作业异步执行compaction、clean和archive。Compaction计划的生成是轻量级的对Flink写入作业影响可以忽略。
上述方案落地的具体步骤参考如下:
- Flink只负责写数据和生成Compaction计划
Flink流任务建表语句中添加如下参数,控制Flink任务写Hudi时只会生成Compaction plan
'compaction.async.enabled' = 'false' -- 关闭Flink 执行Compaction任务 'compaction.schedule.enabled' = 'true' -- 开启Compaction计划生成 'compaction.delta_commits' = '5' -- MOR表默认5次checkpoint尝试生成compaction plan,该参数需要根据具体业务调整 'clean.async.enabled' = 'false' -- 关闭Clean操作 'hoodie.archive.automatic' = 'false' -- 关闭Archive操作
- Spark离线完成Compaction计划的执行,以及Clean和Archive操作
在调度平台(可以使用华为的DataArts)运行一个定时调度的离线任务来让Spark完成Hudi表的Compaction计划执行以及Clean和Archive操作。
set hoodie.archive.automatic = false; set hoodie.clean.automatic = false; set hoodie.compact.inline = true; set hoodie.run.compact.only.inline=true; set hoodie.cleaner.commits.retained = 500; -- clean保留timeline上最新的500个deltacommit对应的数据文件,之前的deltacommit所对应的旧版本文件会被清理。该值需要大于compaction.delta_commits设置的值,需要根据具体业务调整。 set hoodie.keep.max.commits = 700; -- timeline最多保留700个deltacommit set hoodie.keep.min.commits = 501; -- timeline最少保留500个deltacommit。该值需要大于hoodie.cleaner.commits.retained设置的值,需要根据具体业务调整。 run compaction on <database name>. <table name>; -- 执行Compaction计划 run clean on <database name>. <table name>; -- 执行Clean操作 run archivelog on <database name>.<table name>; -- 执行Archive操作
- Flink只负责写数据和生成Compaction计划
- 异步Compaction可以将多个表串行到一个作业,资源配置相近的表放到一组,该组作业的资源配置为最大消耗资源的表所需的资源
对于在•Hudi表采用异步Compaction和•采用Spark异步执行Compaction,不...中提到的异步Compaction任务,这里给出以下开发建议:
- 不需要对每张Hudi表都开发异步Compaction任务,这样会导致作业开发成本高,集群作业爆炸,集群资源不能有效的利用和释放。
- 异步Compaction任务可以通过执行SparkSQL来完成,多个Hudi表的Compaction、Clean和Archive可以放在同一个任务来执行,比如对table1和table2用同一个任务来执行异步维护操作:
set hoodie.clean.async = true; set hoodie.clean.automatic = false; set hoodie.compact.inline = true; set hoodie.run.compact.only.inline=true; set hoodie.cleaner.commits.retained = 500; set hoodie.keep.min.commits = 501; set hoodie.keep.max.commits = 700; run compaction on <database name>. <table1>; run clean on <database name>. <table1>; run archivelog on <database name>.<table1>; run compaction on <database name>.<table2>; run clean on <database name>.<table2>; run archivelog on <database name>.<table2>;