更新时间:2025-12-10 GMT+08:00
分享

使用Spark向Iceberg表中写入数据

Iceberg采用Apache Spark的DataSourceV2 API实现数据源和目录写入功能,不同Spark版本中支持的DataSourceV2 API请参见表1,且部分功能仅在Spark 3中使用Iceberg SQL扩展时可用。

表1 Spark支持DataSourceV2 API介绍

功能

说明

使用要求

INSERT INTO

用于向表中追加新数据的SQL语句。

需要设置spark.sql.storeAssignmentPolicy=ANSI,Spark 3.0及之后版本默认为ANSI。

MERGE INTO

用于行级数据更新操作的SQL语句。

需要在Iceberg SQL扩展中使用。

INSERT OVERWRITE

用于使用查询结果替换表中的数据的SQL语句。

需要设置spark.sql.storeAssignmentPolicy=ANSI,Spark 3.0及之后版本默认为ANSI。

DELETE FROM

用于从表中删除满足条件的记录的SQL语句。

行级删除操作需要在Iceberg SQL扩展中使用。

UPDATE

用于更新查询通过过滤器匹配需要更新的行,并对这些行执行字段更新操作的SQL语句。

需要在Iceberg SQL扩展中使用。

DataFrame追加

可使用append将DataFrame追加到Iceberg表中。

-

DataFrame覆盖

可使用overwritePartitions()动态覆盖分区数据。

-

DataFrame CTAS和RTAS

可使用createreplacecreateOrReplace操作运行CTAS或RTAS,以执行创建和替换操作。

需要在DataSourceV2 API中使用。

使用SQL写入数据

Spark 3支持INSERT INTOMERGE INTOINSERT OVERWRITE的SQL写入语句,以及新的DataFrameWriterV2 API。

  • INSERT INTO

    可使用INSERT INTO向表中追加新数据:

    创建表:

    CREATE TABLE prod.db.table (id bigint, data string) USING iceberg;

    写入数据:

    • 示例一:
      INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b');
    • 示例二:
      INSERT INTO prod.db.table SELECT ...;
  • MERGE INTO

    Spark 3新增了MERGE INTO查询,可用于行级数据更新操作。

    Iceberg通过重写包含需要更新的行的数据文件(以覆盖提交方式)来支持MERGE INTO。推荐使用MERGE INTO而非INSERT OVERWRITE,因为Iceberg仅需替换受影响的数据文件,且动态覆盖所覆盖的数据可能会因表分区变更而改变。

    MERGE INTO通过另一个查询提供的更新集更新目标表,目标表中每行的更新通过ON子句匹配:

    MERGE INTO prod.db.target t
    USING (SELECT ...) s
    ON t.id = s.id
    WHEN ...;

    目标表中匹配的行可通过WHEN MATCHED ... THEN ... 定义更新操作,可添加多个带条件的MATCHED子句,仅第一个匹配的表达式会被执行,例如:

    • 示例一:
      WHEN MATCHED AND s.op = 'delete' THEN DELETE
    • 示例二:
      WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
    • 示例三:
      WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
    源数据中未匹配到目标表的行可通过WHEN NOT MATCHED插入到目标表:
    WHEN NOT MATCHED THEN INSERT *

    插入操作也支持附加条件:

    WHEN NOT MATCHED AND s.event_time > still_valid_threshold THEN INSERT (id, count) VALUES (s.id, 1)

    源数据中最多只能有一条记录更新目标表对应的一行,否则会报错。

  • INSERT OVERWRITE

    INSERT OVERWRITE可使用查询结果替换表中的数据,对于Iceberg表,覆盖操作是原子性的。

    INSERT OVERWRITE会基于Spark的分区覆盖模式和表的分区方式替换对应分区。MERGE INTO仅重写受影响的数据文件,行为更易理解,因此推荐优先使用 MERGE INTO而非INSERT OVERWRITE

    • 覆盖行为

      Spark默认使用静态覆盖模式,但写入Iceberg表时推荐使用动态覆盖模式。静态覆盖模式通过将PARTITION子句转换为过滤器来确定要覆盖的分区,但 PARTITION子句只能引用表的列。

      动态覆盖模式可通过设置spark.sql.sources.partitionOverwriteMode=dynamic启用。

      以下示例为动态和静态覆盖不同行为:

      创建logs表,定义如下:

      CREATE TABLE prod.my_app.logs (uuid string NOT NULL,  level string NOT NULL, ts timestamp NOT NULL, message string) USING iceberg PARTITIONED BY (level, hours(ts));
      • 动态覆盖:

        当Spark的覆盖模式为动态时,查询结果中包含行的分区将被替换。

        例如,以下查询为从logs表中移除重复的日志事件:

        INSERT OVERWRITE prod.my_app.logs SELECT uuid, first(level), first(ts), first(message) FROM prod.my_app.logs WHERE cast(ts as date) = '2020-07-01' GROUP BY uuid;

        在动态模式下,此操作将仅替换查询结果中包含行的分区。由于所有行的日期都限制在7月1日,因此仅当天的小时分区会被替换。

      • 静态覆盖:

        当Spark的覆盖模式为静态时,PARTITION子句会被转换为过滤器,用于从表中删除数据。若省略PARTITION子句,所有分区都将被替换。

        在以上示例中,对于不含PARTITION子句的查询,在静态模式下会删除表中所有现有行,仅写入7月1日的日志。

        如果需要仅覆盖加载的分区,需要添加与SELECT查询过滤器匹配的PARTITION子句:

        INSERT OVERWRITE prod.my_app.logs PARTITION (level = 'INFO') SELECT uuid, first(level), first(ts), first(message) FROM prod.my_app.logs WHERE level = 'INFO' GROUP BY uuid;

        需注意,静态模式无法像动态模式替换小时级分区,因为PARTITION子句只能引用表的显式列,不能引用隐藏分区。

  • DELETE FROM

    Spark 3新增支持DELETE FROM查询,用于从表中删除数据。

    删除查询通过过滤器(WHERE子句)匹配需要删除的行,仅删除满足条件的记录,且操作对Iceberg表是原子性的。

    示例一:

    DELETE FROM prod.db.table WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00';

    示例二:

    DELETE FROM prod.db.all_events WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events);

    示例三:

    DELETE FROM prod.db.orders AS t1 WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid);

    如果删除过滤器匹配表的整个分区,Iceberg无需重写数据文件,仅通过更新元数据即可完成操作,性能极高。

    如果删除过滤器仅匹配表中的部分行,Iceberg会仅重写包含这些行的数据文件,即读取旧文件、过滤掉需删除的行、写入新文件,最后更新元数据指向新文件,最大限度减少数据处理量。

  • UPDATE

    UPDATE用于更新查询通过过滤器匹配需要更新的行,并对这些行执行字段更新操作。

    示例一:

    UPDATE prod.db.table SET c1 = 'update_c1', c2 = 'update_c2' WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00';

    示例二:

    UPDATE prod.db.all_events SET session_time = 0, ignored = true WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events);

    示例三:

    UPDATE prod.db.orders AS t1 SET order_status = 'returned' WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid);

    对于基于输入数据的更复杂行级更新,可参考MERGE INTO,能更灵活地处理多条件组合的更新需求。

写入分支数据

在执行写入操作前,目标分支必须已存在,写入操作不会自动创建不存在的分支。创建分支操作可参考Branch和Tag DDL语法

  • 通过SQL语句写入分支

    分支写入可通过在操作中指定分支标识符branch_yourBranch实现。也可通过指定spark.wap.branch配置,在写入-审计-发布(WAP)工作流中执行分支写入。需注意,不能同时指定WAP分支和分支标识符。

    MERGE INTO分支数据:

    MERGE INTO prod.db.table.branch_audit t USING (SELECT ...) s  ON t.id = s.id  WHEN ...;

    UPDATE分支数据:

    UPDATE prod.db.table.branch_audit AS t1 SET val = 'c';

    删除分支数据:

    DELETE FROM prod.dbl.table.branch_audit WHERE id = 2;

    WAP写入分支:

    1. 配置spark.wap.branch
      SET spark.wap.branch = audit-branch;
    2. 插入数据:
      INSERT INTO prod.db.table VALUES (3, 'c');
  • 通过DataFrames写入分支

    通过DataFrames写入分支时,也需要在操作中指定分支标识符branch_yourBranch

    示例一:

    val data: DataFrame = ...
    data.writeTo("prod.db.table.branch_audit").append();

    示例二:

    val data: DataFrame = ...
    data.writeTo("prod.db.table.branch_audit").overwritePartitions();

使用DataFrames写入数据

Spark 3支持DataFrameWriterV2 API,用于通过DataFrames写入表。推荐使用v2 API,原因如下:

  • 支持CTAS、RTAS和按过滤器覆盖。
  • 所有操作始终按名称将列写入表。
  • 支持在partitionedBy中使用隐藏分区表达式。
  • 覆盖行为是显式的,要么是动态的,要么是用户提供的过滤器。
  • 每个操作的行为与SQL语句对应:
    • df.writeTo(t).create()对应CREATE TABLE AS SELECT
    • df.writeTo(t).replace()对应REPLACE TABLE AS SELECT
    • df.writeTo(t).append()对应INSERT INTO
    • df.writeTo(t).overwritePartitions() 等价于动态 INSERT OVERWRITE

v1 DataFrame写入API仍然支持,但不推荐使用。

  • 追加数据

    可使用append将DataFrame追加到Iceberg表中:

    val data: DataFrame = ...
    data.writeTo("prod.db.table").append()
  • 覆盖数据

    可使用overwritePartitions()动态覆盖分区:

    val data: DataFrame = ...
    data.writeTo("prod.db.table").overwritePartitions()

    使用overwrite并提供过滤器可显式覆盖分区:

    data.writeTo("prod.db.table").overwrite($"level" === "INFO")
  • 创建表

    可使用createreplacecreateOrReplace操作运行CTAS或RTAS:

    val data: DataFrame = ...
    data.writeTo("prod.db.table").create()

    如果已用Iceberg的SparkSessionCatalog替换默认的Spark目录(spark_catalog),请执行:

    val data: DataFrame = ...
    data.writeTo("db.table").using("iceberg").create()

    创建和替换操作支持表配置方法,如partitionedBy和tableProperty:

    data.writeTo("prod.db.table").tableProperty("write.format.default", "orc").partitionedBy($"level", days($"ts")).createOrReplace()

    也可以通过location表属性指定Iceberg表的位置:

    data.writeTo("prod.db.table").tableProperty("location", "/path/to/location").createOrReplace()
  • 模式合并

    在插入或更新数据时,Iceberg能够在运行时解决模式不匹配问题。如果配置正确,Iceberg将执行自动模式演进,具体如下:

    • 源表中存在新列但目标表中不存在:新列将添加到目标表,表中已存在的所有行的该列值将设置为NULL。
    • 目标表中存在某列但源表中不存在:插入数据时,目标列值将设置为NULL;更新行时,目标列值保持不变。

    要启用此功能,目标表必须配置“write.spark.accept-any-schema”为“true”为接受任何模式更改:

    ALTER TABLE prod.db.sample SET TBLPROPERTIES ('write.spark.accept-any-schema'='true');

    写入器必须启用mergeSchema选项:

    data.writeTo("prod.db.sample").option("mergeSchema","true").append()

写入分布模式

Iceberg的默认Spark写入器要求每个Spark Task中的数据需按分区值聚簇。这种分布方式可最大限度减少写入时保持打开的文件句柄数量。Iceberg 1.2.0及之后版本,默认情况下还会要求Spark对写入数据进行预排序,以适配该分布方式,通过表属性“write.distribution-mode”配置,默认值为“hash”。在Spark 3.5.0之前版本中,CTAS和RTAS操作不支持分布模式,Spark不会遵循“write.distribution-mode”配置。

以下示例为理解写入分布模式的作用:

创建表:

CREATE TABLE prod.db.sample (id bigint, data string, category string, ts timestamp) USING iceberg PARTITIONED BY (days(ts), category);

向该表写入数据时,需确保数据按days(ts)和category排序(默认的hash分布模式会自动处理,无需手动排序)。例如:

INSERT INTO prod.db.sample
SELECT id, data, category, ts FROM another_table;

“write.distribution-mode”支持以下3种配置,分别对应不同的Spark数据分片和排序策略:

  • none(Iceberg 1.2.0之前版本默认值)
    • 核心逻辑:不要求Spark自动执行任何Shuffle(数据重分区)或排序操作。
    • 数据要求:需手动确保数据按分区值排序(排序可在单个Spark Task内完成,也可对全量数据集全局排序)。全局排序能最大限度减少输出文件数量。
    • 特殊处理:如果不想排序,可启用Spark的write fanout属性,但这会导致所有文件句柄保持打开状态,直到每个写入Task完成,可能增加资源占用。
    • 适用场景:数据已提前按分区值排序(例如上游任务输出已满足顺序),无需额外Shuffle开销的场景。
  • hash(Iceberg 1.2.0及之后版本默认值)
    • 核心逻辑:要求Spark通过基于哈希的交换对写入数据进行Shuffle,再执行写入。
    • 具体过程:对每一行数据,根据其分区值计算哈希值,再按哈希值将数据分配到对应的Spark Task中。如果启用Spark自适应查询计划,Task可能会进一步拆分或合并,以优化性能。
    • 优势:自动完成数据按分区值聚簇,无需手动排序,平衡性能与资源开销。
    • 适用场景:适用于大多数常规写入场景,尤其是数据未提前排序、需自动适配分区分布的情况。
  • range(范围分布)
    • 核心逻辑:要求Spark通过基于范围的交换对数据进行Shuffle,写入前会对数据全局排序。
    • 具体过程(两阶段):
      1. 采样阶段:基于分区列和排序列,对写入数据进行采样,确定数据的范围分布规则。
      2. Shuffle阶段:根据采样得到的范围信息,将数据分配到不同Spark Task,每个Task处理一段唯一的数据范围,最终实现 “按分区聚簇 + 全局排序”。
    • 特点:
      • 开销高于“hash”模式,即需额外采样和全局排序。
      • 全局排序可优化读性能:如果查询时常用排序列过滤或排序,预排序的数据能减少读取时的计算开销。
    • 默认触发:如果表创建时指定了sort-order(排序规则),“write.distribution-mode”会自动设置为“range”。
    • 适用场景:表有明确排序需求(如按时间列排序),且读性能优先级高于写入开销的场景。

控制文件大小

使用Spark向Iceberg写入数据时,需注意:

  • Spark无法写出比单个Spark Task数据量更大的文件,且单个文件不能跨越Iceberg的分区边界。意味着尽管当文件大小达到目标文件大小配置“write.target-file-size-bytes”的值时,Iceberg总会自动滚动生成新文件;但如果Spark Task的数据量未达到该阈值,文件滚动就不会触发。
  • 磁盘上生成的文件大小会远小于Spark Task的数据量。因为磁盘数据采用压缩的列存储格式,而Spark中的数据则是未压缩的行存储格式。例如,一个数据量为100 MB的Spark Task,即便仅向单个Iceberg分区写入数据,生成的磁盘文件也会远小于100 MB;如果该Task需向多个分区写入数据,生成的单个文件会更小。

如果要控制每个Spark Task中的数据内容,可使用写入分布模式或手动对数据进行重分区。

如果要调整Spark Task的大小,需熟悉Spark的各类自适应查询执行(Adaptive Query Execution,即AQE)参数。当“write.distribution-mode”配置不为“none”时,在数据交换过程中,AQE会控制Spark Task的合并与拆分,使得生成大小符合“spark.sql.adaptive.advisoryPartitionSizeInBytes”配置的Task。这些配置同样会影响用户手动执行的重分区或排序操作。还需注意,该参数定义的是Spark内存中行数据的大小,而非磁盘上列存储压缩后的文件大小,因此需将其配置为比目标文件大小更大的值,内存数据大小与磁盘文件大小的比例取决于数据本身的特性。

相关文档