更新时间:2024-11-29 GMT+08:00

Clustering

Clustering简介

Clustering即数据布局,该服务可重新组织数据以提高查询性能,也不会影响摄取速度。

Hudi通过其写入客户端API提供了不同的操作,如insert/upsert/bulk_insert来将数据写入Hudi表。为了能够在文件大小和入湖速度之间进行权衡,Hudi提供了一个hoodie.parquet.small.file.limit配置来设置最小文件大小。用户可以将该配置设置为“0”,以强制新数据写入新的文件组,或设置为更高的值以确保新数据被“填充”到现有小的文件组中,直到达到指定大小为止,但其会增加摄取延迟。

为能够支持快速摄取的同时不影响查询性能,引入了Clustering服务来重写数据以优化Hudi数据湖文件的布局。

Clustering服务可以异步或同步运行,Clustering会添加了一种新的REPLACE操作类型,该操作类型将在Hudi元数据时间轴中标记Clustering操作。

Clustering服务基于Hudi的MVCC设计,允许继续插入新数据,而Clustering操作在后台运行以重新格式化数据布局,从而确保并发读写者之间的快照隔离。

总体而言Clustering分为两个部分:

  • 调度Clustering:使用可插拔的Clustering策略创建Clustering计划。
    1. 识别符合Clustering条件的文件:根据所选的Clustering策略,调度逻辑将识别符合Clustering条件的文件。
    2. 根据特定条件对符合Clustering条件的文件进行分组。每个组的数据大小应为targetFileSize的倍数。分组是计划中定义的"策略"的一部分。此外还有一个选项可以限制组大小,以改善并行性并避免混排大量数据。
    3. 将Clustering计划以avro元数据格式保存到时间线。
  • 执行Clustering:使用执行策略处理计划以创建新文件并替换旧文件。
    1. 读取Clustering计划,并获得ClusteringGroups,其标记了需要进行Clustering的文件组。
    2. 对于每个组使用strategyParams实例化适当的策略类(例如:sortColumns),然后应用该策略重写数据。
    3. 创建一个REPLACE提交,并更新HoodieReplaceCommitMetadata中的元数据。

使用Clustering

  1. 同步执行Clustering配置。

    在写入时加上配置参数:

    option("hoodie.clustering.inline", "true").

    option("hoodie.clustering.inline.max.commits", "4").

    option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").

    option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").

    option("hoodie.clustering.plan.strategy.sort.columns", "column1,column2").

  2. 异步执行Clustering:

    通过spark-sql命令来执行clustering,具体可以参考CLUSTERING章节。

    异步执行clustering需要通过set命令设置以下参数:

    set hoodie.clustering.async.enabled = true;

    set hoodie.clustering.async.max.commits = 4;

  3. 指定clustering的排序方式和排序列:

    当前clustering支持linear、z-order、hilbert 三种排序方式,可以通过option方式或者set方式来设置。

    • linear:普通排序,默认排序,适合排序一个字段, 或者多个低级字段。
    • z-order和hilbert:多维排序,需要指定“hoodie.layout.optimize.strategy”为z-order或者hilbert。

      适合排序多个字段,例如查询条件中涉及到多个字段。推荐排序字段的个数2到4个。

      hilbert多维排序效果比z-order好,但是排序效率没z-order高。

详细配置请参考Hudi常用参数

  1. Clustering的排序列不允许值存在null,是Spark RDD的限制。
  2. 当target.file.max.bytes的值较大时,启动Clustering执行需要提高--spark-memory,否则会导致executor内存溢出。
  3. 当前Clean不支持清理Clustering失败后的垃圾文件。
  4. Clustering后可能出现新文件大小不等引起数据倾斜的情况。
  5. Cluster不支持和upsert并发。
  6. 如果Clustering处于inflight状态,该FileGroup下的文件不支持Update操作。
  7. 如果存在未完成的Clustering计划,后续写入触发生成compaction调度计划时会报错失败,需要及时执行Clustering计划。