更新时间:2025-09-04 GMT+08:00
分享

MRS Kafka到MRS Hudi参数调优

源端优化

Kafka抽取优化。

可通过在源端配置中单击“Kafka源端属性配置”来添加Kafka优化配置。

图1 添加自定义属性

可使用的调优参数具体如下:

表1 全量阶段优化参数

参数名

类型

默认值

说明

properties.fetch.max.bytes

int

57671680

消费Kafka时每次fetch请求返回的最大字节数。Kafka单条消息大的场景,可以适当调高每次获取的数据量,以提高性能。

properties.max.partition.fetch.bytes

int

1048576

消费Kafka时服务器将返回的每个分区的最大字节数。Kafka单条消息大的场景,可以适当调高每次获取的数据量,以提高性能。

properties.max.poll.records

int

500

消费者每次poll时返回的最大消息条数。Kafka单条消息大的场景,可以适当调高每次获取的数据量,以提高性能。

目的端优化

Hudi写入优化。

Hudi表写入性能慢,优先审视表设计是否合理,建议使用Hudi Bucket索引的MOR表,并根据实际数据量配置Bucket桶数,以达到Migration写入性能最佳。

  • 使用Bucket索引:通过在“Hudi表属性全局配置”或在映射后的单表“表属性编辑”中配置index.type和hoodie.bucket.index.num.buckets属性可进行配置。
  • 判断使用分区表还是非分区表。

    根据表的使用场景一般将表分为事实表和维度表:

    • 事实表通常整表数据规模较大,以新增数据为主,更新数据占比小,且更新数据大多落在近一段时间范围内(年或月或天),下游读取该表进行ETL计算时通常会使用时间范围进行裁剪(例如最近一天、一月、一年),这种表通常可以通过数据的创建时间来做分区以保证最佳读写性能。
    • 维度表数据量一般整表数据规模较小,以更新数据为主,新增较少,表数据量比较稳定,且读取时通常需要全量读取做join之类的ETL计算,因此通常使用非分区表性能更好。
  • 确认表内桶数。
    使用Hudi BUCKET表时需要设置Bucket桶数,桶数设置关系到表的性能,需要格外引起注意。
    • 非分区表桶数 = MAX(CEIL(单表数据量大小(GB)/1GB), 4)。
    • 分区表桶数 = MAX(CEIL(单分区数据量大小(GB)/1GB), 1)。

    其中,要注意的是:

    • 需要使用的是表的总数据大小,而不是压缩以后的文件大小。
    • 桶的设置以偶数最佳,非分区表最小桶数请设置4个,分区表最小桶数请设置1个。

同时,可通过在Hudi的目的端配置中单击“Hudi表属性全局配置”或在映射后的单表“表属性编辑”中,添加优化参数。

图2 添加自定义属性
表2 Hudi写入优化参数

参数名

类型

默认值

说明

hoodie.sink.flush.tasks

int

1

Hudi flush数据时的并发数,默认为1,即顺序写入。当Hud单次commit涉及FleGroup较多时(如源端表较多更新历史数据的场景),考虑增大该值。

已知单线程flush的FileGroup的数据 = 单次Commit的FileGroup数量 / 作业并发数。

单线程flush的FileGroup的数量 <= 5,推荐值2。

单线程flush的FileGroup的数量 <= 10,推荐值5。

单线程flush的FileGroup的数量 <= 25,推荐值10。

单线程flush的FileGroup的数量 <= 50,推荐值20。

单线程flush的FileGroup的数量 > 50,推荐值30。

flush的并发数越大,flush时内存会相应升高,请结合实时处理集成作业内存监控适当调整该值。

hoodie.context.flatmap.parallelism

int

1

Hudi在commit时,会进行分区扫描操作,默认是单并发操作,当Hudi单次commit涉及的分区较多时,考虑增大该值以提升commit速度。

单次Commit的分区数量 <= 10,推荐值5。

单次Commit的分区数量 <= 25,推荐值10。

单次Commit的分区数量 <= 50,推荐值20。

单次Commit的分区数量 > 50,推荐值30。

compaction.async.enabled

boolean

true

是否开启compaction,默认为true,即默认开启hudi的compaction操作。compaction操作一定程度会影响实时任务的写入性能,为了保证Migration作业的稳定性可以考虑设置为false关闭compaction操作,将Hudi Compaction单独拆成Spark作业交由MRS执行,具体可以参考如何配置Hudi Compaction的Spark周期任务?

compaction.delta_commits

int

5

实时处理集成生成compaction request的频率,默认为5时,即每5次commit生成一个compaction request。compaction request生成频率降低可以使得compaction频率降低从而提升作业性能。如果hudi增量数据较小。可以考虑增大该值。

相关文档