更新时间:2026-06-11 GMT+08:00
分享

写入分布模式

基本语法

Iceberg的默认Spark写入器要求每个Spark Task中的数据需按分区值聚簇。这种分布方式可最大限度减少写入时保持打开的文件句柄数量。默认情况下会要求Spark对写入数据进行预排序,以适配该分布方式,通过表属性“write.distribution-mode”配置,默认值为“hash”。在Spark 3.5.0之前版本中,CTAS和RTAS操作不支持分布模式,Spark不会遵循“write.distribution-mode”配置。

以下示例为理解写入分布模式的作用:

创建表:

CREATE TABLE prod.db.sample (id bigint, data string, category string, ts timestamp) USING iceberg PARTITIONED BY (days(ts), category);

向该表写入数据时,需确保数据按days(ts)和category排序(默认的hash分布模式会自动处理,无需手动排序)。例如:

INSERT INTO prod.db.sample
SELECT id, data, category, ts FROM another_table;

“write.distribution-mode”支持以下3种配置,分别对应不同的Spark数据分片和排序策略:

  • none(Iceberg 1.2.0之前版本默认值)
    • 核心逻辑:不要求Spark自动执行任何Shuffle(数据重分区)或排序操作。
    • 数据要求:需手动确保数据按分区值排序(排序可在单个Spark Task内完成,也可对全量数据集全局排序)。全局排序能最大限度减少输出文件数量。
    • 特殊处理:如果不想排序,可启用Spark的write fanout属性,但这会导致所有文件句柄保持打开状态,直到每个写入Task完成,可能增加资源占用。
    • 适用场景:数据已提前按分区值排序(例如上游任务输出已满足顺序),无需额外Shuffle开销的场景。
  • hash(Iceberg 1.2.0及之后版本默认值)
    • 核心逻辑:要求Spark通过基于哈希的交换对写入数据进行Shuffle,再执行写入。
    • 具体过程:对每一行数据,根据其分区值计算哈希值,再按哈希值将数据分配到对应的Spark Task中。如果启用Spark自适应查询计划,Task可能会进一步拆分或合并,以优化性能。
    • 优势:自动完成数据按分区值聚簇,无需手动排序,平衡性能与资源开销。
    • 适用场景:适用于大多数常规写入场景,尤其是数据未提前排序、需自动适配分区分布的情况。
  • range(范围分布)
    • 核心逻辑:要求Spark通过基于范围的交换对数据进行Shuffle,写入前会对数据全局排序。
    • 具体过程(两阶段):
      1. 采样阶段:基于分区列和排序列,对写入数据进行采样,确定数据的范围分布规则。
      2. Shuffle阶段:根据采样得到的范围信息,将数据分配到不同Spark Task,每个Task处理一段唯一的数据范围,最终实现 “按分区聚簇 + 全局排序”。
    • 特点:
      • 开销高于“hash”模式,即需额外采样和全局排序。
      • 全局排序可优化读性能:如果查询时常用排序列过滤或排序,预排序的数据能减少读取时的计算开销。
    • 默认触发:如果表创建时指定了sort-order(排序规则),“write.distribution-mode”会自动设置为“range”。
    • 适用场景:表有明确排序需求(如按时间列排序),且读性能优先级高于写入开销的场景。

相关文档