更新时间:2025-12-10 GMT+08:00
分享

Iceberg DDL语法说明

CREATE TABLE

Spark 3可以在任何Iceberg Catalog中创建表,Iceberg会将Spark中的列类型转换为相应的Iceberg类型。命令如下:

SET spark.sql.catalog.prod = org.apache.iceberg.spark.SparkCatalog;

SET spark.sql.catalog.prod.type = hadoop;

SET spark.sql.catalog.prod.warehouse = file:///path/to/local/warehouse;

CREATE DATABASE IF NOT EXISTS prod.db;

CREATE TABLE prod.db.sample (id bigint NOT NULL COMMENT 'unique id', data string) USING iceberg;

其中,需设置“spark.sql.catalog.prod.warehouse”的值为本地目录路径。

创建表命令支持所有范围的Spark create子句,包括:

  • PARTITIONED BY:配置分区。
  • LOCATION '(fully-qualified-uri)':设置表位置。
  • COMMENT 'table documentation':设置表描述。
  • TBLPROPERTIES ('key'='value', ...):设置表配置。

PARTITIONED BY

创建分区表的命令为:

CREATE TABLE prod.db.sample (id bigint, data string, category string) USING iceberg PARTITIONED BY (category);

该子句还支持转换表达式来创建隐藏分区:

CREATE TABLE prod.db.sample (id bigint, data string, category string, ts timestamp) USING iceberg PARTITIONED BY (bucket(16, id), days(ts), category);

转换表达式支持的转换范围包括:

  • year(ts):按年份进行分区。
  • month(ts):按月份进行分区。
  • day(ts) or date(ts):等同于dateint分区,即按日期整数格式分区。
  • hour(ts) or date_hour(ts):等同于dateint + 小时分区,即按日期整数格式与小时组合分区。
  • bucket(N, col):按列(col)的哈希值对N取模后的结果进行分桶分区。
  • truncate(L, col):按列(col)值截断至指定长度(L)后进行分区,具体规则如下:
    • 字符串类型:截断为指定长度(L)的字符串。
    • 整数(Integer)与长整数(Long)类型:按区间截断,例如truncate (10, i)会生成 0、10、20、30……分区区间。

CTAS语法

当使用SparkCatalog时,Iceberg支持将CTAS(CREATE TABLE ... AS SELECT)作为原子操作执行。Iceberg同样支持在SparkSessionCatalog中执行CTAS操作,但该场景下的CTAS不具备原子性。

CREATE TABLE prod.db.sample USING iceberg AS SELECT ...;

通过CTAS操作生成的新表,不会从SELECT语句所指定的源表中集成分区规范和表属性。如果需要为该新建表配置分区规范或表属性,可在CTAS语句中通过PARTITIONED BY子句明确声明分区规范,通过TBLPROPERTIES子句自定义表属性。命令为:

CREATE TABLE prod.db.sample USING iceberg PARTITIONED BY (part) TBLPROPERTIES ('key'='value') AS SELECT ...;

RTAS语法

当使用SparkCatalog时,Iceberg支持将RTAS(REPLACE TABLE ... AS SELECT)作为原子操作执行。在SparkSessionCatalog中同样支持RTAS操作,但该场景下的RTAS不具备原子性。

原子性表替换会基于SELECT查询的结果创建新快照,同时保留表的历史记录。使用方法为:

  • 方式一:
    REPLACE TABLE prod.db.sample USING iceberg AS SELECT ...;
  • 方式二:
    REPLACE TABLE prod.db.sample USING iceberg PARTITIONED BY (part) TBLPROPERTIES ('key'='value') AS SELECT ...;
  • 方式三:
    CREATE OR REPLACE TABLE prod.db.sample USING iceberg AS SELECT ...;

DROP TABLE

Iceberg的DROP TABLE操作行为在0.14版本发生变更,具体差异如下:

  • 0.14之前版本:执行DROP TABLE会从目录(Catalog)中移除表的元数据,并且删除表的实际数据内容。
  • 0.14及之后版本:DROP TABLE仅会从目录中移除表的元数据;如果需要同时删除表的实际数据内容,需使用DROP TABLE PURGE命令。
    • DROP TABLE(仅移除元数据)
      DROP TABLE prod.db.sample;
    • DROP TABLE PURGE(移除元数据并删除数据)
      DROP TABLE prod.db.sample PURGE; 

ALTER TABLE

在Spark 3中,Iceberg完全支持ALTER TABLE操作,具体包括:

  • 重命名表(ALTER TABLE ... RENAME TO):
    ALTER TABLE prod.db.sample RENAME TO prod.db.new_name;

    需注意,Iceberg的Hadoop类型Catalog不支持表重命名操作。

  • 设置与移除表属性(ALTER TABLE ... SET/UNSET TBLPROPERTIES):
    • 设置表属性:
      ALTER TABLE prod.db.sample SET TBLPROPERTIES ('read.split.target-size'='268435456');

      SET TBLPROPERTIES命令也可用于设置表的注释(描述信息):

      ALTER TABLE prod.db.sample SET TBLPROPERTIES ( 'comment' = 'A table comment.');
    • 移除表属性,可使用UNSET命令:
      ALTER TABLE prod.db.sample UNSET TBLPROPERTIES ('read.split.target-size');
  • 新增、删除和重命名列
    • 新增列
      • 可使用ALTER TABLE语句的ADD COLUMNS子句向Iceberg表添加列,且不允许通过添加列的方式修改映射(map)的 “键(key)” 列,仅能更新映射的值(value)。示例如下:
        • 向“prod.db.sample”表中添加struct类型的“point”列:
          ALTER TABLE prod.db.sample ADD COLUMN point struct<x: double, y: double>;
        • 向“prod.db.sample”表中添加double类型的“point.z”列:
          ALTER TABLE prod.db.sample ADD COLUMN point.z double;
        • 向“prod.db.sample”表中添加array类型的“points”列:
          ALTER TABLE prod.db.sample ADD COLUMN points array<struct<x: double, y: double>>;
        • 向“prod.db.sample”表中添加double类型的“points.element.z”列:
          ALTER TABLE prod.db.sample ADD COLUMN points.element.z double;
        • 向“prod.db.sample”表中添加map类型的“points”列:
          ALTER TABLE prod.db.sample ADD COLUMN points map<struct<x: int>, struct<a: int>>;
      • 也可通过添加FIRST或AFTER子句,在表中的任意位置添加列,示例如下:
        • 示例一:
          ALTER TABLE prod.db.sample ADD COLUMN new_column bigint AFTER {表中已有字段名(例如id,data等)};
        • 示例二:
          ALTER TABLE prod.db.sample ADD COLUMN new_column bigint FIRST;
    • 重命名列(ALTER TABLE ... RENAME COLUMN)

      Iceberg允许对任意字段进行重命名。如果需要重命名字段,可使用RENAME COLUMN子句:

      ALTER TABLE prod.db.sample RENAME COLUMN data TO payload;
    • 可使用ALTER TABLE ... DROP COLUMN语句删除列:
      ALTER TABLE prod.db.sample DROP COLUMN id;
  • 新增、删除和重命名嵌套字段

    可使用ADD COLUMNDROP COLUMN为结构体(struct)添加或删除列。

  • 调整顶层列与嵌套结构体字段的顺序

    Iceberg允许使用FIRST和AFTER子句调整顶层列或结构体中的列顺序:

    ALTER TABLE prod.db.sample ALTER COLUMN id FIRST;
  • 拓宽int、float、decimal字段的类型(ALTER TABLE ... ALTER COLUMN)

    ALTER COLUMN用于拓宽字段类型,将字段设为可选项、设置注释以及调整字段顺序。

    Iceberg允许在类型更新安全的前提下修改列类型,安全的列类型更新包括:

    • int到bigint
    • float到double
    • decimal (P,S)到decimal (P2,S),其中P2 > P,即精度可提高,小数位数不可变更。

    例如,执行以下命令将“measurement”列的类型变更为“double”:

    ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double;
    也可使用ALTER COLUMN更新列注释:
    ALTER TABLE prod.db.sample ALTER COLUMN {double型字段} COMMENT 'unit is kilobytes per second';
  • 非空列(required column)改为可空列(optional column)

    可通过DROP NOT NULL更改非空列的可空性:

    ALTER TABLE prod.db.sample ALTER COLUMN id DROP NOT NULL;
  • 通过启用SQL扩展,还可支持分区演进与表写入顺序设置功能。

Iceberg SQL扩展DDL语法

在Spark 3中使用Iceberg SQL扩展时,可使用以下命令:

ALTER TABLE ... ADD PARTITION FIELD;
  • ADD PARTITION FIELD

    Iceberg支持通过ADD PARTITION FIELD向分区规范中添加新的分区字段:

    ALTER TABLE prod.db.sample ADD PARTITION FIELD id;

    同时支持分区转换操作:

    • 示例一:
      ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id);
    • 示例二:
      ALTER TABLE prod.db.sample ADD PARTITION FIELD truncate(4, data);
    • 示例三:
      ALTER TABLE prod.db.sample ADD PARTITION FIELD year(ts);
    • 示例四:
      ALTER TABLE prod.db.sample ADD PARTITION FIELD day(ts);

    添加分区字段属于元数据操作,不会更改任何现有表数据。新数据将按照新的分区方式写入,但现有数据仍保持原有的分区布局。在元数据表中,旧数据文件的新分区字段值将为null。

    当表的分区方式发生变化时,动态分区覆盖行为也会随之改变,因为动态覆盖会隐式替换分区。如果需要显式执行覆盖操作,请使用新的DataFrameWriterV2 API。

  • DROP PARTITION FIELD

    可使用DROP PARTITION FIELD移除分区字段。即使分区被移除,对应的列仍会保留在表的Schema 中。

    删除分区字段属于元数据操作,不会更改任何现有表数据。新数据将按照新的分区方式写入,但现有数据仍保持原有的分区布局。

    • 创建Iceberg表,例如:
      CREATE TABLE prod.db.sample (id bigint, data string, category string, ts timestamp, shard int) USING iceberg PARTITIONED BY (category);
    • 移除分区字段:
      • 示例一:
        ALTER TABLE prod.db.sample DROP PARTITION FIELD catalog; 
      • 示例二:
        ALTER TABLE prod.db.sample DROP PARTITION FIELD bucket(16, id); 
      • 示例三:
        ALTER TABLE prod.db.sample DROP PARTITION FIELD truncate(4, data); 
      • 示例四:
        ALTER TABLE prod.db.sample DROP PARTITION FIELD year(ts); 
      • 示例五:
        ALTER TABLE prod.db.sample DROP PARTITION FIELD shard;
  • REPLACE PARTITION FIELD

    通过REPLACE PARTITION FIELD可在单次元数据更新中,用新的分区字段替换原有分区字段:

    • 示例一:
      ALTER TABLE prod.db.sample REPLACE PARTITION FIELD ts_day WITH day(ts);
    • 示例二:
      ALTER TABLE prod.db.sample REPLACE PARTITION FIELD ts_day WITH day(ts) AS day_of_ts;
  • WRITE ORDERED BY

    Iceberg表可配置排序规则,部分引擎会依据该规则,在向表写入数据时自动对数据进行排序。例如,Spark中的MERGE INTO操作会使用表的排序规则。

    可使用WRITE ORDERED BY子句为表设置写入排序规则,示例如下:

    • 示例一:
      ALTER TABLE prod.db.sample WRITE ORDERED BY category, id;
    • 示例二:
      ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC, id DESC;
    • 示例三:
      ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST;
    WRITE ORDERED BY用于设置全局排序规则,会对跨任务的行进行排序,类似于在INSERT命令中使用ORDER BY,例如:
    INSERT INTO prod.db.sample SELECT id, data, category, ts FROM {其他表} ORDER BY ts, category;

    若只需在每个任务内排序(而非跨任务排序),可使用LOCALLY ORDERED BY

    ALTER TABLE prod.db.sample WRITE LOCALLY ORDERED BY category, id;

    可使用UNORDERED取消表的排序顺序:

    ALTER TABLE prod.db.sample WRITE UNORDERED;
  • WRITE DISTRIBUTED BY PARTITION

    WRITE DISTRIBUTED BY PARTITION用于指定每个分区由单个写入器(writer)处理,其默认实现方式为哈希分布。

    ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION;

    DISTRIBUTED BY PARTITION可与LOCALLY ORDERED BY结合使用,实现按分区分布数据,同时在每个任务内对行进行本地排序:

    ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id;
  • SET IDENTIFIER FIELDS

    Iceberg支持通过SET IDENTIFIER FIELDS为表规范设置标识字段。如果Spark表配置了标识字段,该表可支持Flink SQL的更新插入(upsert)操作。

    • 示例一,需确保字段无空值:
      ALTER TABLE prod.db.sample SET IDENTIFIER FIELDS id;
    • 示例二:
      ALTER TABLE prod.db.sample SET IDENTIFIER FIELDS id, data;

    标识字段在创建或添加时,必须是非空列。后续执行的ALTER TABLE ... SET IDENTIFIER FIELDS语句会覆盖之前的标识字段设置。

  • DROP IDENTIFIER FIELDS

    可使用DROP IDENTIFIER FIELDS移除标识字段,且即使标识字段被移除,对应的列仍会保留在表的Schema中。

    • 示例一,需确保字段无空值:
      ALTER TABLE prod.db.sample DROP IDENTIFIER FIELDS id;
    • 示例二:
      ALTER TABLE prod.db.sample DROP IDENTIFIER FIELDS id, data;

Branch和Tag DDL语法

  • CREATE BRANCH

    可通过CREATE BRANCH语句创建分支,该语句支持以下选项:

    • IF NOT EXISTS:如果分支已存在,操作不会失败。
    • CREATE OR REPLACE:如果分支已存在,则更新该分支。
    • 在特定快照处创建分支。
    • 创建具有指定保留期的分支:
      • 示例一:
        ALTER TABLE prod.db.sample CREATE BRANCH `audit-branch`;
      • 示例二:
        ALTER TABLE prod.db.sample CREATE BRANCH IF NOT EXISTS `audit-branch`;
      • 示例三:
        ALTER TABLE prod.db.sample CREATE OR REPLACE BRANCH `audit-branch`;
  • CREATE TAG

    可通过CREATE TAG语句创建标签,该语句支持以下选项:

    • IF NOT EXISTS:如果标签已存在,操作不会失败。
    • CREATE OR REPLACE:如果标签已存在,则更新该标签。
    • 在特定快照(snapshot)处创建标签。
    • 创建具有指定保留期的标签:
      • 示例一:
        ALTER TABLE prod.db.sample CREATE TAG `historical-tag`;
      • 示例二:
        ALTER TABLE prod.db.sample CREATE TAG IF NOT EXISTS `historical-tag`;
      • 示例三:
        ALTER TABLE prod.db.sample CREATE OR REPLACE TAG `historical-tag`;
  • REPLACE BRANCH

    可通过REPLACE BRANCH语句更新分支所引用的快照,同时也可更新其保留期:

    ALTER TABLE prod.db.sample REPLACE BRANCH `audit-branch` AS OF VERSION {current-snapshot-id} RETAIN 60 DAYS;
  • REPLACE TAG

    可通过REPLACE TAG语句更新标签所引用的快照,同时也可更新其保留期:

    ALTER TABLE prod.db.sample REPLACE TAG `historical-tag` AS OF {current-snapshot-id} RETAIN 60 DAYS;
  • DROP BRANCH

    可通过DROP BRANCH语句删除分支:

    ALTER TABLE prod.db.sample DROP BRANCH `audit-branch`;
  • DROP TAG

    可通过DROP TAG语句删除标签:

    ALTER TABLE prod.db.sample DROP TAG `historical-tag`;

Iceberg视图DDL语法

Iceberg视图的设计目标是提供引擎无关的视图定义,确保同一视图可以在不同的查询引擎中保持一致的行为和结果,增强了数据查询的灵活性和兼容性。

Iceberg视图是SQL视图的一种通用表示形式,旨在跨多个查询引擎进行解析。以下内容介绍如何在Spark 3.4及之后版本中创建和管理视图。

  • 创建视图

    创建一个不包含任何注释或属性的简单视图:

    CREATE VIEW <viewName> AS SELECT * FROM <tableName>;

    使用IF NOT EXISTS可避免因视图已存在而导致语句执行失败:

    CREATE VIEW IF NOT EXISTS <viewName> AS SELECT * FROM <tableName>;

    创建包含注释的视图,其中可包含与源表不同的列别名和列注释:

    CREATE VIEW <viewName> (ID COMMENT 'Unique ID', ZIP COMMENT 'Zipcode')  COMMENT 'View Comment' AS SELECT id, zip FROM <tableName>;
  • 创建包含属性的视图:

    使用TBLPROPERTIES创建包含属性的视图:

    CREATE VIEW <viewName> TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2') AS SELECT * FROM <tableName>;

    查看视图的属性:

    SHOW TBLPROPERTIES <viewName>;
  • 删除视图
    • 删除已存在的视图:
      DROP VIEW <viewName>;
    • 如果视图不存在时不希望语句执行失败,可使用IF EXISTS关键字:
      DROP VIEW IF EXISTS <viewName>;
  • 更新视图

    如果需要更新视图的Schema、属性或底层SQL语句,可使用CREATE OR REPLACE语句覆盖原有视图:

    CREATE OR REPLACE <viewName> (updated_id COMMENT 'updated ID') TBLPROPERTIES ('key1' = 'new_val1') AS SELECT id FROM <tableName>;
  • 设置或移除视图属性
    • 使用ALTER VIEW ... SET TBLPROPERTIES为已存在的视图设置属性:
      ALTER VIEW <viewName> SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2');
    • 使用ALTER VIEW ... UNSET TBLPROPERTIES从已存在的视图中移除属性:
      ALTER VIEW <viewName> UNSET TBLPROPERTIES ('key1', 'key2');
  • 查看视图

    查看当前已设置的命名空间(通过USE <namespace>设置)中的所有视图:

    SHOW VIEWS;

    查看指定目录或命名空间中的所有可用视图:

    • 查看指定目录(catalog)中的所有视图:
      SHOW VIEWS IN <catalog>;
    • 查看指定命名空间(namespace)中的所有视图:
      SHOW VIEWS IN <namespace>;
    • 查看指定目录和命名空间组合中的所有视图:
      SHOW VIEWS IN <catalog>.<namespace>;
  • 查看创建视图的语句:
    SHOW CREATE TABLE <viewName>;
  • 使用DESCRIBE查看视图详情:
    DESCRIBE [EXTENDED] <viewName>;

相关文档