更新时间:2024-12-25 GMT+08:00
分享

Hudi Clustering操作说明

什么是Clustering

即数据布局,该服务可重新组织数据以提高查询性能,也不会影响摄取速度。

Clustering架构

Hudi通过其写入客户端API提供了不同的操作,如insert/upsert/bulk_insert来将数据写入Hudi表。为了能够在文件大小和入湖速度之间进行权衡,Hudi提供了一个hoodie.parquet.small.file.limit配置来设置最小文件大小。用户可以将该配置设置为“0”,以强制新数据写入新的文件组,或设置为更高的值以确保新数据被“填充”到现有小的文件组中,直到达到指定大小为止,但其会增加摄取延迟。

为能够支持快速摄取的同时不影响查询性能,引入了Clustering服务来重写数据以优化Hudi数据湖文件的布局。

Clustering服务可以异步或同步运行,Clustering会添加了一种新的REPLACE操作类型,该操作类型将在Hudi元数据时间轴中标记Clustering操作。

Clustering服务基于Hudi的MVCC设计,允许继续插入新数据,而Clustering操作在后台运行以重新格式化数据布局,从而确保并发读写者之间的快照隔离。

总体而言Clustering分为两个部分:

  • 调度Clustering:使用可插拔的Clustering策略创建Clustering计划。
    1. 识别符合Clustering条件的文件:根据所选的Clustering策略,调度逻辑将识别符合Clustering条件的文件。
    2. 根据特定条件对符合Clustering条件的文件进行分组。每个组的数据大小应为targetFileSize的倍数。分组是计划中定义的"策略"的一部分。此外还有一个选项可以限制组大小,以改善并行性并避免混排大量数据。
    3. 将Clustering计划以avro元数据格式保存到时间线。
  • 执行Clustering:使用执行策略处理计划以创建新文件并替换旧文件。
    1. 读取Clustering计划,并获得ClusteringGroups,其标记了需要进行Clustering的文件组。
    2. 对于每个组使用strategyParams实例化适当的策略类(例如:sortColumns),然后应用该策略重写数据。
    3. 创建一个REPLACE提交,并更新HoodieReplaceCommitMetadata中的元数据。

如何执行Clustering

  1. Spark SQL(设置如下参数,写数据时触发)
    hoodie.clustering.inline=true  // 默认值 false,即默认为关闭状态
    hoodie.clustering.inline.max.commits=4 // 默认值为4,根据业务场景指定
    hoodie.clustering.plan.strategy.max.bytes.per.group=2147483648 // 默认值为2G,根据业务场景指定。一般不需要指定,因为正常每个file group下的数据量不会超过2G
    hoodie.clustering.plan.strategy.max.num.groups=30 // 默认值为30,根据业务场景指定。一般通过调整这个参数来调整每次Clustering计划合并的数据量(max.bytes.per.group * max.num.groups)。
    hoodie.clustering.plan.strategy.partition.regex.pattern=${正则表达式} // 无默认值,不指定该参数的时候Clustering会对所有分区下的数据进行重组。
    hoodie.clustering.plan.strategy.small.file.limit=314572800 // 默认值是300M,根据业务场景指定。每个分区下,小于300M的文件会被筛选出来做Clustering。
    hoodie.clustering.plan.strategy.sort.columns=${排序列1,......,排序列n} // 无默认值,根据业务场景指定。指定为查询业务经常使用且不包含null的列。
    hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824 // 默认值为1G,根据业务场景指定。用来设置Clustering产生的文件的最大size。
  2. SparkDataSource(option里设置如下参数,写数据时触发)

    option("hoodie.clustering.inline", "true").

    option("hoodie.clustering.inline.max.commits", 4).

    option("hoodie.clustering.plan.strategy.max.bytes.per.group", 2147483648).

    option("hoodie.clustering.plan.strategy.max.num.groups", 30).

    option("hoodie.clustering.plan.strategy.partition.regex.pattern", "${正则表达式}").

    option("hoodie.clustering.plan.strategy.small.file.limit", 314572800).

    option("hoodie.clustering.plan.strategy.sort.columns", "${排序列1,......,排序列n}").

    option("hoodie.clustering.plan.strategy.target.file.max.bytes", 1073741824).

  3. 手动触发1次Clustering
    • Spark SQL(设置如下参数,手动触发1次)
    hoodie.clustering.inline=true
    hoodie.clustering.inline.max.commits=4 // 默认值为4,根据业务场景指定
    hoodie.clustering.plan.strategy.max.bytes.per.group=2147483648 // 默认值为2G,根据业务场景指定。一般不需要指定,因为正常每个file group下的数据量不会超过2G
    hoodie.clustering.plan.strategy.max.num.groups=30 // 默认值为30,根据业务场景指定。一般通过调整这个参数来调整每次Clustering计划合并的数据量(max.bytes.per.group * max.num.groups)。
    hoodie.clustering.plan.strategy.partition.regex.pattern=${正则表达式} // 无默认值,不指定该参数的时候Clustering会对所有分区下的数据进行重组。
    hoodie.clustering.plan.strategy.small.file.limit=314572800 // 默认值是300M,根据业务场景指定。每个分区下,小于300M的文件会被筛选出来做Clustering。
    hoodie.clustering.plan.strategy.sort.columns=${排序列1,......,排序列n} // 无默认值,根据业务场景指定。指定为查询业务经常使用且不包含null的列。
    hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824 // 默认值为1G,根据业务场景指定。用来设置Clustering产生的文件的最大size。

    随后执行SQL

    call run_clustering(table => '${表名}')
  1. Clustering的排序列不允许值存在null,这是Spark RDD的限制。
  2. 当target.file.max.bytes的值较大时,启动Clustering执行需要提高--executor-memory,否则会导致executor内存溢出。
  3. Clean不支持清理Clustering失败后的残留文件。
  4. Clustering后产生的新文件大小不等,这可能引起数据倾斜。
  5. Clustering不支持和Upsert(写操作更新待Clustering的文件)并发,如果Clustering处于inflight状态,该FileGroup下的文件不支持被更新。
  6. 如果存在未完成的Clustering计划,后续写入触发生成Compaction调度计划时会报错失败,需要及时执行Clustering计划。

相关文档