使用Spark向Iceberg表中写入数据
Iceberg采用Apache Spark的DataSourceV2 API实现数据源和目录写入功能,不同Spark版本中支持的DataSourceV2 API请参见表1,且部分功能仅在Spark 3中使用Iceberg SQL扩展时可用。
|
功能 |
说明 |
使用要求 |
|---|---|---|
|
INSERT INTO |
用于向表中追加新数据的SQL语句。 |
需要设置spark.sql.storeAssignmentPolicy=ANSI,Spark 3.0及之后版本默认为ANSI。 |
|
MERGE INTO |
用于行级数据更新操作的SQL语句。 |
需要在Iceberg SQL扩展中使用。 |
|
INSERT OVERWRITE |
用于使用查询结果替换表中的数据的SQL语句。 |
需要设置spark.sql.storeAssignmentPolicy=ANSI,Spark 3.0及之后版本默认为ANSI。 |
|
DELETE FROM |
用于从表中删除满足条件的记录的SQL语句。 |
行级删除操作需要在Iceberg SQL扩展中使用。 |
|
UPDATE |
用于更新查询通过过滤器匹配需要更新的行,并对这些行执行字段更新操作的SQL语句。 |
需要在Iceberg SQL扩展中使用。 |
|
DataFrame追加 |
可使用append将DataFrame追加到Iceberg表中。 |
- |
|
DataFrame覆盖 |
可使用overwritePartitions()动态覆盖分区数据。 |
- |
|
DataFrame CTAS和RTAS |
可使用create、replace或createOrReplace操作运行CTAS或RTAS,以执行创建和替换操作。 |
需要在DataSourceV2 API中使用。 |
使用SQL写入数据
Spark 3支持INSERT INTO、MERGE INTO、INSERT OVERWRITE的SQL写入语句,以及新的DataFrameWriterV2 API。
- INSERT INTO
创建表:
CREATE TABLE prod.db.table (id bigint, data string) USING iceberg;
写入数据:
- 示例一:
INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b');
- 示例二:
INSERT INTO prod.db.table SELECT ...;
- 示例一:
- MERGE INTO
Spark 3新增了MERGE INTO查询,可用于行级数据更新操作。
Iceberg通过重写包含需要更新的行的数据文件(以覆盖提交方式)来支持MERGE INTO。推荐使用MERGE INTO而非INSERT OVERWRITE,因为Iceberg仅需替换受影响的数据文件,且动态覆盖所覆盖的数据可能会因表分区变更而改变。
MERGE INTO通过另一个查询提供的更新集更新目标表,目标表中每行的更新通过ON子句匹配:
MERGE INTO prod.db.target t USING (SELECT ...) s ON t.id = s.id WHEN ...;
目标表中匹配的行可通过WHEN MATCHED ... THEN ... 定义更新操作,可添加多个带条件的MATCHED子句,仅第一个匹配的表达式会被执行,例如:
- 示例一:
WHEN MATCHED AND s.op = 'delete' THEN DELETE
- 示例二:
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
- 示例三:
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
源数据中未匹配到目标表的行可通过WHEN NOT MATCHED插入到目标表:WHEN NOT MATCHED THEN INSERT *
插入操作也支持附加条件:
WHEN NOT MATCHED AND s.event_time > still_valid_threshold THEN INSERT (id, count) VALUES (s.id, 1)
源数据中最多只能有一条记录更新目标表对应的一行,否则会报错。
- 示例一:
- INSERT OVERWRITE
INSERT OVERWRITE可使用查询结果替换表中的数据,对于Iceberg表,覆盖操作是原子性的。
INSERT OVERWRITE会基于Spark的分区覆盖模式和表的分区方式替换对应分区。MERGE INTO仅重写受影响的数据文件,行为更易理解,因此推荐优先使用 MERGE INTO而非INSERT OVERWRITE。
- 覆盖行为
Spark默认使用静态覆盖模式,但写入Iceberg表时推荐使用动态覆盖模式。静态覆盖模式通过将PARTITION子句转换为过滤器来确定要覆盖的分区,但 PARTITION子句只能引用表的列。
动态覆盖模式可通过设置spark.sql.sources.partitionOverwriteMode=dynamic启用。
以下示例为动态和静态覆盖不同行为:
创建logs表,定义如下:
CREATE TABLE prod.my_app.logs (uuid string NOT NULL, level string NOT NULL, ts timestamp NOT NULL, message string) USING iceberg PARTITIONED BY (level, hours(ts));
- 动态覆盖:
当Spark的覆盖模式为动态时,查询结果中包含行的分区将被替换。
例如,以下查询为从logs表中移除重复的日志事件:
INSERT OVERWRITE prod.my_app.logs SELECT uuid, first(level), first(ts), first(message) FROM prod.my_app.logs WHERE cast(ts as date) = '2020-07-01' GROUP BY uuid;
在动态模式下,此操作将仅替换查询结果中包含行的分区。由于所有行的日期都限制在7月1日,因此仅当天的小时分区会被替换。
- 静态覆盖:
当Spark的覆盖模式为静态时,PARTITION子句会被转换为过滤器,用于从表中删除数据。若省略PARTITION子句,所有分区都将被替换。
在以上示例中,对于不含PARTITION子句的查询,在静态模式下会删除表中所有现有行,仅写入7月1日的日志。
如果需要仅覆盖加载的分区,需要添加与SELECT查询过滤器匹配的PARTITION子句:
INSERT OVERWRITE prod.my_app.logs PARTITION (level = 'INFO') SELECT uuid, first(level), first(ts), first(message) FROM prod.my_app.logs WHERE level = 'INFO' GROUP BY uuid;
需注意,静态模式无法像动态模式替换小时级分区,因为PARTITION子句只能引用表的显式列,不能引用隐藏分区。
- 动态覆盖:
- 覆盖行为
- DELETE FROM
Spark 3新增支持DELETE FROM查询,用于从表中删除数据。
删除查询通过过滤器(WHERE子句)匹配需要删除的行,仅删除满足条件的记录,且操作对Iceberg表是原子性的。
示例一:
DELETE FROM prod.db.table WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00';
示例二:
DELETE FROM prod.db.all_events WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events);
示例三:
DELETE FROM prod.db.orders AS t1 WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid);
如果删除过滤器匹配表的整个分区,Iceberg无需重写数据文件,仅通过更新元数据即可完成操作,性能极高。
如果删除过滤器仅匹配表中的部分行,Iceberg会仅重写包含这些行的数据文件,即读取旧文件、过滤掉需删除的行、写入新文件,最后更新元数据指向新文件,最大限度减少数据处理量。
- UPDATE
UPDATE用于更新查询通过过滤器匹配需要更新的行,并对这些行执行字段更新操作。
示例一:
UPDATE prod.db.table SET c1 = 'update_c1', c2 = 'update_c2' WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00';
示例二:
UPDATE prod.db.all_events SET session_time = 0, ignored = true WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events);
示例三:
UPDATE prod.db.orders AS t1 SET order_status = 'returned' WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid);
对于基于输入数据的更复杂行级更新,可参考MERGE INTO,能更灵活地处理多条件组合的更新需求。
写入分支数据
在执行写入操作前,目标分支必须已存在,写入操作不会自动创建不存在的分支。创建分支操作可参考Branch和Tag DDL语法。
- 通过SQL语句写入分支
分支写入可通过在操作中指定分支标识符branch_yourBranch实现。也可通过指定spark.wap.branch配置,在写入-审计-发布(WAP)工作流中执行分支写入。需注意,不能同时指定WAP分支和分支标识符。
MERGE INTO分支数据:
MERGE INTO prod.db.table.branch_audit t USING (SELECT ...) s ON t.id = s.id WHEN ...;
UPDATE分支数据:
UPDATE prod.db.table.branch_audit AS t1 SET val = 'c';
删除分支数据:
DELETE FROM prod.dbl.table.branch_audit WHERE id = 2;
WAP写入分支:
- 配置spark.wap.branch:
SET spark.wap.branch = audit-branch;
- 插入数据:
INSERT INTO prod.db.table VALUES (3, 'c');
- 配置spark.wap.branch:
- 通过DataFrames写入分支
通过DataFrames写入分支时,也需要在操作中指定分支标识符branch_yourBranch。
示例一:
val data: DataFrame = ... data.writeTo("prod.db.table.branch_audit").append();示例二:
val data: DataFrame = ... data.writeTo("prod.db.table.branch_audit").overwritePartitions();
使用DataFrames写入数据
Spark 3支持DataFrameWriterV2 API,用于通过DataFrames写入表。推荐使用v2 API,原因如下:
- 支持CTAS、RTAS和按过滤器覆盖。
- 所有操作始终按名称将列写入表。
- 支持在partitionedBy中使用隐藏分区表达式。
- 覆盖行为是显式的,要么是动态的,要么是用户提供的过滤器。
- 每个操作的行为与SQL语句对应:
- df.writeTo(t).create()对应CREATE TABLE AS SELECT。
- df.writeTo(t).replace()对应REPLACE TABLE AS SELECT。
- df.writeTo(t).append()对应INSERT INTO。
- df.writeTo(t).overwritePartitions() 等价于动态 INSERT OVERWRITE
v1 DataFrame写入API仍然支持,但不推荐使用。
- 追加数据
可使用append将DataFrame追加到Iceberg表中:
val data: DataFrame = ... data.writeTo("prod.db.table").append() - 覆盖数据
可使用overwritePartitions()动态覆盖分区:
val data: DataFrame = ... data.writeTo("prod.db.table").overwritePartitions()使用overwrite并提供过滤器可显式覆盖分区:
data.writeTo("prod.db.table").overwrite($"level" === "INFO") - 创建表
可使用create、replace或createOrReplace操作运行CTAS或RTAS:
val data: DataFrame = ... data.writeTo("prod.db.table").create()如果已用Iceberg的SparkSessionCatalog替换默认的Spark目录(spark_catalog),请执行:
val data: DataFrame = ... data.writeTo("db.table").using("iceberg").create()创建和替换操作支持表配置方法,如partitionedBy和tableProperty:
data.writeTo("prod.db.table").tableProperty("write.format.default", "orc").partitionedBy($"level", days($"ts")).createOrReplace()也可以通过location表属性指定Iceberg表的位置:
data.writeTo("prod.db.table").tableProperty("location", "/path/to/location").createOrReplace() - 模式合并
在插入或更新数据时,Iceberg能够在运行时解决模式不匹配问题。如果配置正确,Iceberg将执行自动模式演进,具体如下:
- 源表中存在新列但目标表中不存在:新列将添加到目标表,表中已存在的所有行的该列值将设置为NULL。
- 目标表中存在某列但源表中不存在:插入数据时,目标列值将设置为NULL;更新行时,目标列值保持不变。
要启用此功能,目标表必须配置“write.spark.accept-any-schema”为“true”为接受任何模式更改:
ALTER TABLE prod.db.sample SET TBLPROPERTIES ('write.spark.accept-any-schema'='true');写入器必须启用mergeSchema选项:
data.writeTo("prod.db.sample").option("mergeSchema","true").append()
写入分布模式
Iceberg的默认Spark写入器要求每个Spark Task中的数据需按分区值聚簇。这种分布方式可最大限度减少写入时保持打开的文件句柄数量。Iceberg 1.2.0及之后版本,默认情况下还会要求Spark对写入数据进行预排序,以适配该分布方式,通过表属性“write.distribution-mode”配置,默认值为“hash”。在Spark 3.5.0之前版本中,CTAS和RTAS操作不支持分布模式,Spark不会遵循“write.distribution-mode”配置。
以下示例为理解写入分布模式的作用:
创建表:
CREATE TABLE prod.db.sample (id bigint, data string, category string, ts timestamp) USING iceberg PARTITIONED BY (days(ts), category);
向该表写入数据时,需确保数据按days(ts)和category排序(默认的hash分布模式会自动处理,无需手动排序)。例如:
INSERT INTO prod.db.sample SELECT id, data, category, ts FROM another_table;
“write.distribution-mode”支持以下3种配置,分别对应不同的Spark数据分片和排序策略:
- none(Iceberg 1.2.0之前版本默认值)
- 核心逻辑:不要求Spark自动执行任何Shuffle(数据重分区)或排序操作。
- 数据要求:需手动确保数据按分区值排序(排序可在单个Spark Task内完成,也可对全量数据集全局排序)。全局排序能最大限度减少输出文件数量。
- 特殊处理:如果不想排序,可启用Spark的write fanout属性,但这会导致所有文件句柄保持打开状态,直到每个写入Task完成,可能增加资源占用。
- 适用场景:数据已提前按分区值排序(例如上游任务输出已满足顺序),无需额外Shuffle开销的场景。
- hash(Iceberg 1.2.0及之后版本默认值)
- 核心逻辑:要求Spark通过基于哈希的交换对写入数据进行Shuffle,再执行写入。
- 具体过程:对每一行数据,根据其分区值计算哈希值,再按哈希值将数据分配到对应的Spark Task中。如果启用Spark自适应查询计划,Task可能会进一步拆分或合并,以优化性能。
- 优势:自动完成数据按分区值聚簇,无需手动排序,平衡性能与资源开销。
- 适用场景:适用于大多数常规写入场景,尤其是数据未提前排序、需自动适配分区分布的情况。
- range(范围分布)
- 核心逻辑:要求Spark通过基于范围的交换对数据进行Shuffle,写入前会对数据全局排序。
- 具体过程(两阶段):
- 采样阶段:基于分区列和排序列,对写入数据进行采样,确定数据的范围分布规则。
- Shuffle阶段:根据采样得到的范围信息,将数据分配到不同Spark Task,每个Task处理一段唯一的数据范围,最终实现 “按分区聚簇 + 全局排序”。
- 特点:
- 开销高于“hash”模式,即需额外采样和全局排序。
- 全局排序可优化读性能:如果查询时常用排序列过滤或排序,预排序的数据能减少读取时的计算开销。
- 默认触发:如果表创建时指定了sort-order(排序规则),“write.distribution-mode”会自动设置为“range”。
- 适用场景:表有明确排序需求(如按时间列排序),且读性能优先级高于写入开销的场景。
控制文件大小
使用Spark向Iceberg写入数据时,需注意:
- Spark无法写出比单个Spark Task数据量更大的文件,且单个文件不能跨越Iceberg的分区边界。意味着尽管当文件大小达到目标文件大小配置“write.target-file-size-bytes”的值时,Iceberg总会自动滚动生成新文件;但如果Spark Task的数据量未达到该阈值,文件滚动就不会触发。
- 磁盘上生成的文件大小会远小于Spark Task的数据量。因为磁盘数据采用压缩的列存储格式,而Spark中的数据则是未压缩的行存储格式。例如,一个数据量为100 MB的Spark Task,即便仅向单个Iceberg分区写入数据,生成的磁盘文件也会远小于100 MB;如果该Task需向多个分区写入数据,生成的单个文件会更小。
如果要控制每个Spark Task中的数据内容,可使用写入分布模式或手动对数据进行重分区。
如果要调整Spark Task的大小,需熟悉Spark的各类自适应查询执行(Adaptive Query Execution,即AQE)参数。当“write.distribution-mode”配置不为“none”时,在数据交换过程中,AQE会控制Spark Task的合并与拆分,使得生成大小符合“spark.sql.adaptive.advisoryPartitionSizeInBytes”配置的Task。这些配置同样会影响用户手动执行的重分区或排序操作。还需注意,该参数定义的是Spark内存中行数据的大小,而非磁盘上列存储压缩后的文件大小,因此需将其配置为比目标文件大小更大的值,内存数据大小与磁盘文件大小的比例取决于数据本身的特性。