Clustering
Clustering简介
Clustering即数据布局,该服务可重新组织数据以提高查询性能,也不会影响摄取速度。
Hudi通过其写入客户端API提供了不同的操作,如insert/upsert/bulk_insert来将数据写入Hudi表。为了能够在文件大小和入湖速度之间进行权衡,Hudi提供了一个hoodie.parquet.small.file.limit配置来设置最小文件大小。用户可以将该配置设置为“0”,以强制新数据写入新的文件组,或设置为更高的值以确保新数据被“填充”到现有小的文件组中,直到达到指定大小为止,但其会增加摄取延迟。
为能够支持快速摄取的同时不影响查询性能,引入了Clustering服务来重写数据以优化Hudi数据湖文件的布局。
Clustering服务可以异步或同步运行,Clustering会添加了一种新的REPLACE操作类型,该操作类型将在Hudi元数据时间轴中标记Clustering操作。
Clustering服务基于Hudi的MVCC设计,允许继续插入新数据,而Clustering操作在后台运行以重新格式化数据布局,从而确保并发读写者之间的快照隔离。
总体而言Clustering分为两个部分:
- 调度Clustering:使用可插拔的Clustering策略创建Clustering计划。
- 识别符合Clustering条件的文件:根据所选的Clustering策略,调度逻辑将识别符合Clustering条件的文件。
- 根据特定条件对符合Clustering条件的文件进行分组。每个组的数据大小应为targetFileSize的倍数。分组是计划中定义的"策略"的一部分。此外还有一个选项可以限制组大小,以改善并行性并避免混排大量数据。
- 将Clustering计划以avro元数据格式保存到时间线。
- 执行Clustering:使用执行策略处理计划以创建新文件并替换旧文件。
- 读取Clustering计划,并获得ClusteringGroups,其标记了需要进行Clustering的文件组。
- 对于每个组使用strategyParams实例化适当的策略类(例如:sortColumns),然后应用该策略重写数据。
- 创建一个REPLACE提交,并更新HoodieReplaceCommitMetadata中的元数据。
使用Clustering
- 同步执行Clustering配置。
option("hoodie.clustering.inline", "true").
option("hoodie.clustering.inline.max.commits", "4").
option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
option("hoodie.clustering.plan.strategy.sort.columns", "column1,column2").
- 异步执行Clustering:
通过spark-sql命令来执行clustering,具体可以参考CLUSTERING章节。
异步执行clustering需要通过set命令设置以下参数:
set hoodie.clustering.async.enabled = true;
set hoodie.clustering.async.max.commits = 4;
- 指定clustering的排序方式和排序列:
当前clustering支持linear、z-order、hilbert 三种排序方式,可以通过option方式或者set方式来设置。
详细配置请参考Hudi常用参数。
- Clustering的排序列不允许值存在null,是Spark RDD的限制。
- 当target.file.max.bytes的值较大时,启动Clustering执行需要提高--spark-memory,否则会导致executor内存溢出。
- 当前Clean不支持清理Clustering失败后的垃圾文件。
- Clustering后可能出现新文件大小不等引起数据倾斜的情况。
- Cluster不支持和upsert并发。
- 如果Clustering处于inflight状态,该FileGroup下的文件不支持Update操作。
- 如果存在未完成的Clustering计划,后续写入触发生成compaction调度计划时会报错失败,需要及时执行Clustering计划。