Iceberg DDL语法说明
CREATE TABLE
Spark 3可以在任何Iceberg Catalog中创建表,Iceberg会将Spark中的列类型转换为相应的Iceberg类型。命令如下:
SET spark.sql.catalog.prod = org.apache.iceberg.spark.SparkCatalog;
SET spark.sql.catalog.prod.type = hadoop;
SET spark.sql.catalog.prod.warehouse = file:///path/to/local/warehouse;
CREATE DATABASE IF NOT EXISTS prod.db;
CREATE TABLE prod.db.sample (id bigint NOT NULL COMMENT 'unique id', data string) USING iceberg;
其中,需设置“spark.sql.catalog.prod.warehouse”的值为本地目录路径。
创建表命令支持所有范围的Spark create子句,包括:
- PARTITIONED BY:配置分区。
- LOCATION '(fully-qualified-uri)':设置表位置。
- COMMENT 'table documentation':设置表描述。
- TBLPROPERTIES ('key'='value', ...):设置表配置。
PARTITIONED BY
创建分区表的命令为:
CREATE TABLE prod.db.sample (id bigint, data string, category string) USING iceberg PARTITIONED BY (category);
该子句还支持转换表达式来创建隐藏分区:
CREATE TABLE prod.db.sample (id bigint, data string, category string, ts timestamp) USING iceberg PARTITIONED BY (bucket(16, id), days(ts), category);
转换表达式支持的转换范围包括:
- year(ts):按年份进行分区。
- month(ts):按月份进行分区。
- day(ts) or date(ts):等同于dateint分区,即按日期整数格式分区。
- hour(ts) or date_hour(ts):等同于dateint + 小时分区,即按日期整数格式与小时组合分区。
- bucket(N, col):按列(col)的哈希值对N取模后的结果进行分桶分区。
- truncate(L, col):按列(col)值截断至指定长度(L)后进行分区,具体规则如下:
- 字符串类型:截断为指定长度(L)的字符串。
- 整数(Integer)与长整数(Long)类型:按区间截断,例如truncate (10, i)会生成 0、10、20、30……分区区间。
CTAS语法
当使用SparkCatalog时,Iceberg支持将CTAS(CREATE TABLE ... AS SELECT)作为原子操作执行。Iceberg同样支持在SparkSessionCatalog中执行CTAS操作,但该场景下的CTAS不具备原子性。
CREATE TABLE prod.db.sample USING iceberg AS SELECT ...;
通过CTAS操作生成的新表,不会从SELECT语句所指定的源表中集成分区规范和表属性。如果需要为该新建表配置分区规范或表属性,可在CTAS语句中通过PARTITIONED BY子句明确声明分区规范,通过TBLPROPERTIES子句自定义表属性。命令为:
CREATE TABLE prod.db.sample USING iceberg PARTITIONED BY (part) TBLPROPERTIES ('key'='value') AS SELECT ...;
RTAS语法
当使用SparkCatalog时,Iceberg支持将RTAS(REPLACE TABLE ... AS SELECT)作为原子操作执行。在SparkSessionCatalog中同样支持RTAS操作,但该场景下的RTAS不具备原子性。
原子性表替换会基于SELECT查询的结果创建新快照,同时保留表的历史记录。使用方法为:
- 方式一:
REPLACE TABLE prod.db.sample USING iceberg AS SELECT ...;
- 方式二:
REPLACE TABLE prod.db.sample USING iceberg PARTITIONED BY (part) TBLPROPERTIES ('key'='value') AS SELECT ...; - 方式三:
CREATE OR REPLACE TABLE prod.db.sample USING iceberg AS SELECT ...;
DROP TABLE
Iceberg的DROP TABLE操作行为在0.14版本发生变更,具体差异如下:
- 0.14之前版本:执行DROP TABLE会从目录(Catalog)中移除表的元数据,并且删除表的实际数据内容。
- 0.14及之后版本:DROP TABLE仅会从目录中移除表的元数据;如果需要同时删除表的实际数据内容,需使用DROP TABLE PURGE命令。
- DROP TABLE(仅移除元数据)
DROP TABLE prod.db.sample;
- DROP TABLE PURGE(移除元数据并删除数据)
DROP TABLE prod.db.sample PURGE;
- DROP TABLE(仅移除元数据)
ALTER TABLE
在Spark 3中,Iceberg完全支持ALTER TABLE操作,具体包括:
- 重命名表(ALTER TABLE ... RENAME TO):
ALTER TABLE prod.db.sample RENAME TO prod.db.new_name;
需注意,Iceberg的Hadoop类型Catalog不支持表重命名操作。
- 设置与移除表属性(ALTER TABLE ... SET/UNSET TBLPROPERTIES):
- 设置表属性:
ALTER TABLE prod.db.sample SET TBLPROPERTIES ('read.split.target-size'='268435456');SET TBLPROPERTIES命令也可用于设置表的注释(描述信息):
ALTER TABLE prod.db.sample SET TBLPROPERTIES ( 'comment' = 'A table comment.');
- 移除表属性,可使用UNSET命令:
ALTER TABLE prod.db.sample UNSET TBLPROPERTIES ('read.split.target-size');
- 设置表属性:
- 新增、删除和重命名列
- 新增列
- 可使用ALTER TABLE语句的ADD COLUMNS子句向Iceberg表添加列,且不允许通过添加列的方式修改映射(map)的 “键(key)” 列,仅能更新映射的值(value)。示例如下:
- 向“prod.db.sample”表中添加struct类型的“point”列:
ALTER TABLE prod.db.sample ADD COLUMN point struct<x: double, y: double>;
- 向“prod.db.sample”表中添加double类型的“point.z”列:
ALTER TABLE prod.db.sample ADD COLUMN point.z double;
- 向“prod.db.sample”表中添加array类型的“points”列:
ALTER TABLE prod.db.sample ADD COLUMN points array<struct<x: double, y: double>>;
- 向“prod.db.sample”表中添加struct类型的“point”列:
- 也可通过添加FIRST或AFTER子句,在表中的任意位置添加列,示例如下:
- 示例一:
ALTER TABLE prod.db.sample ADD COLUMN new_column bigint AFTER {表中已有字段名(例如id,data等)}; - 示例二:
ALTER TABLE prod.db.sample ADD COLUMN new_column bigint FIRST;
- 示例一:
- 可使用ALTER TABLE语句的ADD COLUMNS子句向Iceberg表添加列,且不允许通过添加列的方式修改映射(map)的 “键(key)” 列,仅能更新映射的值(value)。示例如下:
- 重命名列(ALTER TABLE ... RENAME COLUMN)
Iceberg允许对任意字段进行重命名。如果需要重命名字段,可使用RENAME COLUMN子句:
ALTER TABLE prod.db.sample RENAME COLUMN data TO payload;
- 可使用ALTER TABLE ... DROP COLUMN语句删除列:
ALTER TABLE prod.db.sample DROP COLUMN id;
- 新增列
- 新增、删除和重命名嵌套字段
- 调整顶层列与嵌套结构体字段的顺序
Iceberg允许使用FIRST和AFTER子句调整顶层列或结构体中的列顺序:
ALTER TABLE prod.db.sample ALTER COLUMN id FIRST;
- 拓宽int、float、decimal字段的类型(ALTER TABLE ... ALTER COLUMN)
ALTER COLUMN用于拓宽字段类型,将字段设为可选项、设置注释以及调整字段顺序。
Iceberg允许在类型更新安全的前提下修改列类型,安全的列类型更新包括:
- int到bigint
- float到double
- decimal (P,S)到decimal (P2,S),其中P2 > P,即精度可提高,小数位数不可变更。
例如,执行以下命令将“measurement”列的类型变更为“double”:
ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double;
也可使用ALTER COLUMN更新列注释:ALTER TABLE prod.db.sample ALTER COLUMN {double型字段} COMMENT 'unit is kilobytes per second'; - 非空列(required column)改为可空列(optional column)
ALTER TABLE prod.db.sample ALTER COLUMN id DROP NOT NULL;
- 通过启用SQL扩展,还可支持分区演进与表写入顺序设置功能。
Iceberg SQL扩展DDL语法
在Spark 3中使用Iceberg SQL扩展时,可使用以下命令:
ALTER TABLE ... ADD PARTITION FIELD;
- ADD PARTITION FIELD
Iceberg支持通过ADD PARTITION FIELD向分区规范中添加新的分区字段:
ALTER TABLE prod.db.sample ADD PARTITION FIELD id;
同时支持分区转换操作:
- 示例一:
ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id);
- 示例二:
ALTER TABLE prod.db.sample ADD PARTITION FIELD truncate(4, data);
- 示例三:
ALTER TABLE prod.db.sample ADD PARTITION FIELD year(ts);
- 示例四:
ALTER TABLE prod.db.sample ADD PARTITION FIELD day(ts);
添加分区字段属于元数据操作,不会更改任何现有表数据。新数据将按照新的分区方式写入,但现有数据仍保持原有的分区布局。在元数据表中,旧数据文件的新分区字段值将为null。
当表的分区方式发生变化时,动态分区覆盖行为也会随之改变,因为动态覆盖会隐式替换分区。如果需要显式执行覆盖操作,请使用新的DataFrameWriterV2 API。
- 示例一:
- DROP PARTITION FIELD
可使用DROP PARTITION FIELD移除分区字段。即使分区被移除,对应的列仍会保留在表的Schema 中。
删除分区字段属于元数据操作,不会更改任何现有表数据。新数据将按照新的分区方式写入,但现有数据仍保持原有的分区布局。
- 创建Iceberg表,例如:
CREATE TABLE prod.db.sample (id bigint, data string, category string, ts timestamp, shard int) USING iceberg PARTITIONED BY (category);
- 移除分区字段:
- 示例一:
ALTER TABLE prod.db.sample DROP PARTITION FIELD catalog;
- 示例二:
ALTER TABLE prod.db.sample DROP PARTITION FIELD bucket(16, id);
- 示例三:
ALTER TABLE prod.db.sample DROP PARTITION FIELD truncate(4, data);
- 示例四:
ALTER TABLE prod.db.sample DROP PARTITION FIELD year(ts);
- 示例五:
ALTER TABLE prod.db.sample DROP PARTITION FIELD shard;
- 示例一:
- 创建Iceberg表,例如:
- REPLACE PARTITION FIELD
通过REPLACE PARTITION FIELD可在单次元数据更新中,用新的分区字段替换原有分区字段:
- 示例一:
ALTER TABLE prod.db.sample REPLACE PARTITION FIELD ts_day WITH day(ts);
- 示例二:
ALTER TABLE prod.db.sample REPLACE PARTITION FIELD ts_day WITH day(ts) AS day_of_ts;
- 示例一:
- WRITE ORDERED BY
Iceberg表可配置排序规则,部分引擎会依据该规则,在向表写入数据时自动对数据进行排序。例如,Spark中的MERGE INTO操作会使用表的排序规则。
可使用WRITE ORDERED BY子句为表设置写入排序规则,示例如下:
- 示例一:
ALTER TABLE prod.db.sample WRITE ORDERED BY category, id;
- 示例二:
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC, id DESC;
- 示例三:
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST;
WRITE ORDERED BY用于设置全局排序规则,会对跨任务的行进行排序,类似于在INSERT命令中使用ORDER BY,例如:INSERT INTO prod.db.sample SELECT id, data, category, ts FROM {其他表} ORDER BY ts, category;若只需在每个任务内排序(而非跨任务排序),可使用LOCALLY ORDERED BY:
ALTER TABLE prod.db.sample WRITE LOCALLY ORDERED BY category, id;
可使用UNORDERED取消表的排序顺序:
ALTER TABLE prod.db.sample WRITE UNORDERED;
- 示例一:
- WRITE DISTRIBUTED BY PARTITION
WRITE DISTRIBUTED BY PARTITION用于指定每个分区由单个写入器(writer)处理,其默认实现方式为哈希分布。
ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION;
DISTRIBUTED BY PARTITION可与LOCALLY ORDERED BY结合使用,实现按分区分布数据,同时在每个任务内对行进行本地排序:
ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id;
- SET IDENTIFIER FIELDS
Iceberg支持通过SET IDENTIFIER FIELDS为表规范设置标识字段。如果Spark表配置了标识字段,该表可支持Flink SQL的更新插入(upsert)操作。
- 示例一,需确保字段无空值:
ALTER TABLE prod.db.sample SET IDENTIFIER FIELDS id;
- 示例二:
ALTER TABLE prod.db.sample SET IDENTIFIER FIELDS id, data;
标识字段在创建或添加时,必须是非空列。后续执行的ALTER TABLE ... SET IDENTIFIER FIELDS语句会覆盖之前的标识字段设置。
- 示例一,需确保字段无空值:
- DROP IDENTIFIER FIELDS
可使用DROP IDENTIFIER FIELDS移除标识字段,且即使标识字段被移除,对应的列仍会保留在表的Schema中。
- 示例一,需确保字段无空值:
ALTER TABLE prod.db.sample DROP IDENTIFIER FIELDS id;
- 示例二:
ALTER TABLE prod.db.sample DROP IDENTIFIER FIELDS id, data;
- 示例一,需确保字段无空值:
Branch和Tag DDL语法
- CREATE BRANCH
可通过CREATE BRANCH语句创建分支,该语句支持以下选项:
- IF NOT EXISTS:如果分支已存在,操作不会失败。
- CREATE OR REPLACE:如果分支已存在,则更新该分支。
- 在特定快照处创建分支。
- 创建具有指定保留期的分支:
- 示例一:
ALTER TABLE prod.db.sample CREATE BRANCH `audit-branch`;
- 示例二:
ALTER TABLE prod.db.sample CREATE BRANCH IF NOT EXISTS `audit-branch`;
- 示例三:
ALTER TABLE prod.db.sample CREATE OR REPLACE BRANCH `audit-branch`;
- 示例一:
- CREATE TAG
可通过CREATE TAG语句创建标签,该语句支持以下选项:
- IF NOT EXISTS:如果标签已存在,操作不会失败。
- CREATE OR REPLACE:如果标签已存在,则更新该标签。
- 在特定快照(snapshot)处创建标签。
- 创建具有指定保留期的标签:
- 示例一:
ALTER TABLE prod.db.sample CREATE TAG `historical-tag`;
- 示例二:
ALTER TABLE prod.db.sample CREATE TAG IF NOT EXISTS `historical-tag`;
- 示例三:
ALTER TABLE prod.db.sample CREATE OR REPLACE TAG `historical-tag`;
- 示例一:
- REPLACE BRANCH
可通过REPLACE BRANCH语句更新分支所引用的快照,同时也可更新其保留期:
ALTER TABLE prod.db.sample REPLACE BRANCH `audit-branch` AS OF VERSION {current-snapshot-id} RETAIN 60 DAYS; - REPLACE TAG
可通过REPLACE TAG语句更新标签所引用的快照,同时也可更新其保留期:
ALTER TABLE prod.db.sample REPLACE TAG `historical-tag` AS OF {current-snapshot-id} RETAIN 60 DAYS; - DROP BRANCH
ALTER TABLE prod.db.sample DROP BRANCH `audit-branch`;
- DROP TAG
ALTER TABLE prod.db.sample DROP TAG `historical-tag`;
Iceberg视图DDL语法
Iceberg视图的设计目标是提供引擎无关的视图定义,确保同一视图可以在不同的查询引擎中保持一致的行为和结果,增强了数据查询的灵活性和兼容性。
Iceberg视图是SQL视图的一种通用表示形式,旨在跨多个查询引擎进行解析。以下内容介绍如何在Spark 3.4及之后版本中创建和管理视图。
- 创建视图
CREATE VIEW <viewName> AS SELECT * FROM <tableName>;
使用IF NOT EXISTS可避免因视图已存在而导致语句执行失败:
CREATE VIEW IF NOT EXISTS <viewName> AS SELECT * FROM <tableName>;
创建包含注释的视图,其中可包含与源表不同的列别名和列注释:
CREATE VIEW <viewName> (ID COMMENT 'Unique ID', ZIP COMMENT 'Zipcode') COMMENT 'View Comment' AS SELECT id, zip FROM <tableName>;
- 创建包含属性的视图:
CREATE VIEW <viewName> TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2') AS SELECT * FROM <tableName>;查看视图的属性:
SHOW TBLPROPERTIES <viewName>;
- 删除视图
- 删除已存在的视图:
DROP VIEW <viewName>;
- 如果视图不存在时不希望语句执行失败,可使用IF EXISTS关键字:
DROP VIEW IF EXISTS <viewName>;
- 删除已存在的视图:
- 更新视图
如果需要更新视图的Schema、属性或底层SQL语句,可使用CREATE OR REPLACE语句覆盖原有视图:
CREATE OR REPLACE <viewName> (updated_id COMMENT 'updated ID') TBLPROPERTIES ('key1' = 'new_val1') AS SELECT id FROM <tableName>; - 设置或移除视图属性
- 使用ALTER VIEW ... SET TBLPROPERTIES为已存在的视图设置属性:
ALTER VIEW <viewName> SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2'); - 使用ALTER VIEW ... UNSET TBLPROPERTIES从已存在的视图中移除属性:
ALTER VIEW <viewName> UNSET TBLPROPERTIES ('key1', 'key2');
- 使用ALTER VIEW ... SET TBLPROPERTIES为已存在的视图设置属性:
- 查看视图
查看当前已设置的命名空间(通过USE <namespace>设置)中的所有视图:
SHOW VIEWS;
查看指定目录或命名空间中的所有可用视图:
- 查看指定目录(catalog)中的所有视图:
SHOW VIEWS IN <catalog>;
- 查看指定命名空间(namespace)中的所有视图:
SHOW VIEWS IN <namespace>;
- 查看指定目录和命名空间组合中的所有视图:
SHOW VIEWS IN <catalog>.<namespace>;
- 查看指定目录(catalog)中的所有视图:
- 查看创建视图的语句:
SHOW CREATE TABLE <viewName>;
- 使用DESCRIBE查看视图详情:
DESCRIBE [EXTENDED] <viewName>;