写入数据
基本语法
Spark支持INSERT INTO、MERGE INTO、INSERT OVERWRITE的SQL写入语句,以及新的DataFrameWriterV2 API。
- INSERT INTO
创建表:
CREATE TABLE prod.db.table (id bigint, data string) USING iceberg;
写入数据:
- 示例一:
INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b');
- 示例二:
INSERT INTO prod.db.table SELECT ...;
- 示例一:
- MERGE INTO
Spark新增了MERGE INTO查询,可用于行级数据更新操作。
Iceberg通过重写包含需要更新的行的数据文件(以覆盖提交方式)来支持MERGE INTO。推荐使用MERGE INTO而非INSERT OVERWRITE,因为Iceberg仅需替换受影响的数据文件,且动态覆盖所覆盖的数据可能会因表分区变更而改变。
MERGE INTO通过另一个查询提供的更新集更新目标表,目标表中每行的更新通过ON子句匹配:
MERGE INTO prod.db.target t USING (SELECT ...) s ON t.id = s.id WHEN ...;
目标表中匹配的行可通过WHEN MATCHED ... THEN ... 定义更新操作,可添加多个带条件的MATCHED子句,仅第一个匹配的表达式会被执行,例如:
- 示例一:
WHEN MATCHED AND s.op = 'delete' THEN DELETE
- 示例二:
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
- 示例三:
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
源数据中未匹配到目标表的行可通过WHEN NOT MATCHED插入到目标表:WHEN NOT MATCHED THEN INSERT *
插入操作也支持附加条件:
WHEN NOT MATCHED AND s.event_time > still_valid_threshold THEN INSERT (id, count) VALUES (s.id, 1)
源数据中最多只能有一条记录更新目标表对应的一行,否则会报错。
- 示例一:
- INSERT OVERWRITE
INSERT OVERWRITE可使用查询结果替换表中的数据,对于Iceberg表,覆盖操作是原子性的,会基于Spark的分区覆盖模式和表的分区方式替换对应分区。
- 覆盖行为
Spark默认使用静态覆盖模式,但写入Iceberg表时推荐使用动态覆盖模式。静态覆盖模式通过将PARTITION子句转换为过滤器来确定要覆盖的分区,但 PARTITION子句只能引用表的列。
动态覆盖模式可通过设置spark.sql.sources.partitionOverwriteMode=dynamic启用。
以下示例为动态和静态覆盖不同行为:
创建logs表,定义如下:
CREATE TABLE prod.db.logs (uuid string NOT NULL, level string NOT NULL, ts timestamp NOT NULL, message string) USING iceberg PARTITIONED BY (level, hours(ts));
- 动态覆盖:
当Spark的覆盖模式为动态时,查询结果中包含行的分区将被替换。
例如,以下查询为从logs表中移除重复的日志事件:
INSERT OVERWRITE prod.db.logs SELECT uuid, first(level), first(ts), first(message) FROM prod.db.logs WHERE cast(ts as date) = '2020-07-01' GROUP BY uuid;
在动态模式下,此操作将仅替换查询结果中包含行的分区。由于所有行的日期都限制在7月1日,因此仅当天的小时分区会被替换。
- 静态覆盖:
当Spark的覆盖模式为静态时,PARTITION子句会被转换为过滤器,用于从表中删除数据。若省略PARTITION子句,所有分区都将被替换。
在以上示例中,对于不含PARTITION子句的查询,在静态模式下会删除表中所有现有行,仅写入7月1日的日志。
如果需要仅覆盖加载的分区,需要添加与SELECT查询过滤器匹配的PARTITION子句:
INSERT OVERWRITE prod.db.logs PARTITION (level = 'INFO') SELECT uuid, first(level), first(ts), first(message) FROM prod.db.logs WHERE level = 'INFO' GROUP BY uuid;
需注意,静态模式无法像动态模式替换小时级分区,因为PARTITION子句只能引用表的显式列,不能引用隐式分区。
- 动态覆盖:
- 覆盖行为
- DELETE FROM
Spark新增支持DELETE FROM查询,用于从表中删除数据。
删除查询通过过滤器(WHERE子句)匹配需要删除的行,仅删除满足条件的记录,且操作对Iceberg表是原子性的。
示例一:
DELETE FROM prod.db.table WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00';
示例二:
DELETE FROM prod.db.all_events WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events);
示例三:
DELETE FROM prod.db.orders AS t1 WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid);
如果删除过滤器匹配表的整个分区,Iceberg无需重写数据文件,仅通过更新元数据即可完成操作,性能极高。
如果删除过滤器仅匹配表中的部分行,Iceberg会仅重写包含这些行的数据文件,即读取旧文件、过滤掉需删除的行、写入新文件,最后更新元数据指向新文件,最大限度减少数据处理量。
- UPDATE
UPDATE用于更新查询通过过滤器匹配需要更新的行,并对这些行执行字段更新操作。
示例一:
UPDATE prod.db.table SET c1 = 'update_c1', c2 = 'update_c2' WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00';
示例二:
UPDATE prod.db.all_events SET session_time = 0, ignored = true WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events);
示例三:
UPDATE prod.db.orders AS t1 SET order_status = 'returned' WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid);
对于基于输入数据的更复杂行级更新,可参考MERGE INTO,能更灵活地处理多条件组合的更新需求。