更新时间:2025-12-10 GMT+08:00
分享

使用Spark查询Iceberg表数据

在Spark中使用Iceberg前,需先配置Spark目录,spark_catalog和prod目录配置请参见配置Iceberg Catalog。Iceberg基于Apache Spark的DataSourceV2 API实现数据源和目录功能。

查询表数据

在Spark 3中,表的标识符需包含目录名称。查询表数据操作为:

创建表,例如:

CREATE TABLE prod.db.table (id bigint, data string, category string, ts timestamp, shard int) USING iceberg PARTITIONED BY (category);
  • 查询表数据:
    SELECT * FROM prod.db.table; 
  • 对于历史记录、snapshots快照等元数据表,可将Iceberg表名作为命名空间来访问,例如:
    SELECT * FROM prod.db.table.files;

将表加载为DataFrame

如果需要将表加载为DataFrame,可使用spark.table方法:

//spark-shell
val df = spark.table("prod.db.table")

结合DataFrameReader使用目录

可通过Spark的DataFrameReader接口加载路径和表名,表的加载方式取决于标识符的指定形式。当使用spark.read.format("iceberg").load(table)spark.table(table)时,table支持以下多种格式,且优先级按以下顺序排列(例如,匹配到目录时,优先级高于任何命名空间解析):

  • file:///path/to/table:从指定路径加载HadoopTable(基于Hadoop存储的Iceberg表)。
  • tablename:从当前目录的当前命名空间中加载表,即currentCatalog.currentNamespace.tablename。
  • catalog.tablename:从指定目录中加载表。
  • namespace.tablename:从当前目录中加载指定命名空间下的表。
  • catalog.namespace.tablename:从指定目录中加载指定命名空间下的表。
  • namespace1.namespace2.tablename:从当前目录中加载多级命名空间下的表。

上述列表按优先级从高到低排序,例如,如果标识符可匹配到目录,则优先按目录规则加载表,而非进行命名空间解析。

时间旅行

  • SQL命令

    Spark 3.3及之后版本支持在SQL查询中使用TIMESTAMP AS OFVERSION AS OF子句实现时间旅行。其中,VERSION AS OF子句可接受长整数类型的快照 ID,或字符串类型的分支/标签名称。

    • 时间旅行至1986年10月26日01:21:00:
      SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00'; 
    • 时间旅行至快照ID 10963874102873:
      SELECT * FROM prod.db.table VERSION AS OF 10963874102873;

      查询current-snapshot-id快照ID的命令为:

      DESCRIBE EXTENDED prod.db.table;
    • 时间旅行至“audit-branch”的头部快照:
      SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
    • 时间旅行至标签“historical-snapshot”所引用的快照:
      SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';

    还支持FOR SYSTEM_TIME AS OFFOR SYSTEM_VERSION AS OF语法,功能与TIMESTAMP AS OFVERSION AS OF一致,例如:

    • 示例一:
      SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF '1986-10-26 01:21:00'; 
    • 示例二:
      SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 10963874102873; 
    • 示例三:
      SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 'audit-branch'; 
    • 示例四:
      SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 'historical-snapshot';

    时间戳也可使用Unix时间戳(秒级) 传入,例如:

    SELECT * FROM prod.db.table TIMESTAMP AS OF 499162860;

    或可使用:

    SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF 499162860;

    分支或标签也可通过类似元数据表的语法指定,格式为“branch_<分支名>”或“tag_<标签名>”:

    SELECT * FROM prod.db.table.`branch_audit-branch`;

    或可使用:

    SELECT * FROM prod.db.table.`tag_historical-snapshot`;

    -等特殊字符的标识符不合法,必须使用反引号(`)转义。 通过“branch_<分支名>”或“tag_<标签名>”定位的分支或标签,不可与VERSION AS OF子句同时使用 。

  • 时间旅行查询中的Schema选择

    各类时间旅行查询可根据定位目标的不同,使用快照自身的Schema或当前表的Schema,具体规则如下:

    • 使用快照自身的Schema:
      • 示例一:
        SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';
      • 示例二:
        SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
    • 使用当前表的Schema:
      • 示例一:
        SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
      • 示例二:
        SELECT * FROM prod.db.table.`branch_audit-branch`;
    • 使用快照自身的Schema:
      • 示例一:
        SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot'; 
      • 示例二:
        SELECT * FROM prod.db.table.`tag_historical-snapshot`;
  • DataFrame

    在DataFrame API中,如果需要选择表的特定快照或某个时间点的快照,Iceberg支持以下四种Spark读取选项,用于精准定位目标快照:

    • snapshot-id:选择特定快照,需传入快照的ID。
    • as-of-timestamp:选择指定时间点的当前快照,需传入毫秒级Unix时间戳。
    • branch:选择指定分支的头部快照。
    • tag:选择指定标签关联的快照。

    例如:

    • 时间旅行至1986年10月26日01:21:00:
      spark.read.option("as-of-timestamp", "499162860000").format("iceberg").load("path/to/table")
    • 时间旅行至快照ID 10963874102873:
      spark.read.option("snapshot-id", 10963874102873L).format("iceberg").load("path/to/table")
    • 时间旅行至标签“historical-snapshot”所引用的快照:
      spark.read.option(SparkReadOptions.TAG, "historical-snapshot").format("iceberg").load("path/to/table")
    • 时间旅行至“audit-branch”的头部快照:
      spark.read.option(SparkReadOptions.BRANCH, "audit-branch").format("iceberg").load("path/to/table")
  • 增量读取

    如果需要增量读取表中追加的数据,可通过配置以下两个关键读取选项实现:

    • start-snapshot-id:增量扫描的起始快照ID(排他性,即不包含此快照的数据)。
    • end-snapshot-id:增量扫描的结束快照ID(包含性,即包含此快照的数据),此选项为可选。如果不指定,默认使用表的当前快照。
    spark.read.format("iceberg").option("start-snapshot-id", "10963874102873").option("end-snapshot-id", "63874143573109").load("path/to/table")

表检查

如果需要查看表的历史记录、快照及其他元数据,Iceberg支持通过元数据表实现。

元数据表的标识方式为在原表名后拼接元数据表名称。例如,如果需要读取db.table表的历史记录,可通过db.table.history访问。

  • 历史记录

    可通过以下语句查看表的历史记录:

    SELECT * FROM prod.db.table.history;
  • 元数据日志条目

    如果需要查看表的元数据日志条目,可通过以下语句查询:

    SELECT * from prod.db.table.metadata_log_entries;
  • 快照

    如果需要查看表的所有有效快照,可通过以下语句查询:

    SELECT * FROM prod.db.table.snapshots;
  • 条目

    如果需要查看表当前所有清单文件中数据文件和删除文件的条目,可通过以下语句查询:

    SELECT * FROM prod.db.table.entries;
  • 文件

    如果需要查看表当前的所有数据文件,可通过以下语句查询:

    SELECT * FROM prod.db.table.files;
  • 清单文件

    如果需要查看表当前的文件清单,可通过以下语句查询:

    SELECT * FROM prod.db.table.manifests;

    查询结果中的“partition_summaries”列中的字段对应清单列表中的field_summary结构体,按以下顺序排列:

    • contains_null
    • contains_nan
    • lower_bound
    • upper_bound

    “contains_nan”可能返回“null”,表示该信息在文件元数据中不可用,这通常发生在读取V1版本的表(V1版本不填充contains_nan信息)。

  • 分区

    如果需要查看表当前的分区信息,可通过以下语句查询:

    SELECT * FROM prod.db.table.partitions;

    对于未分区表,partitions表将不包含partitionspec_id字段。

    partitions元数据表展示的是当前快照中包含数据文件或删除文件的分区。但需注意,删除文件并未实际应用,因此在某些情况下,即使分区的所有数据行都被删除文件标记为删除,该分区仍可能会被显示。

  • 位置删除文件

    如果需要查看表当前快照中所有的位置删除文件,可通过以下语句查询:

    SELECT * from prod.db.table.position_deletes;
  • 所有数据文件

    如果需要查看表的所有数据文件及其各自的元数据,可通过以下语句查询:

    SELECT * FROM prod.db.table.all_data_files;
  • 所有删除文件

    如果需要查看表在所有快照中的删除文件及其各自的元数据,可通过以下语句查询:

    SELECT * FROM prod.db.table.all_delete_files;
  • 所有条目

    如果需要查看表在所有快照中数据文件和删除文件的清单条目,可通过以下语句查询:

    SELECT * FROM prod.db.db.table.all_entries;
  • 所有清单文件

    如果需要查看表的所有清单文件,可通过以下语句查询:

    SELECT * FROM prod.db.table.all_manifests;

    查询结果中“partition_summaries”列中的字段对应清单列表中的field_summary结构体,按以下顺序排列:

    • contains_null
    • contains_nan
    • lower_bound
    • upper_bound

    “contains_nan”可能返回“null”,表示该信息在文件元数据中不可用,通常发生在读取V1版本的表(V1版本不填充contains_nan信息)。

  • 引用

    如果需要查看表的已知快照引用,可通过以下语句查询:

    SELECT * FROM prod.db.table.refs;
  • 通过DataFrame检查元数据

    可通过DataFrameReader API加载元数据表:

    • 示例一:
      spark.read.format("iceberg").load("db.table.files");
    • 示例二:
      spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table#files");
  • 元数据表的时间旅行

    可结合时间旅行功能检查表在特定时间点的元数据:

    • 示例一:
      SELECT * FROM prod.db.table.manifests TIMESTAMP AS OF '2021-09-20 08:00:00';
    • 示例二:
      SELECT * FROM prod.db.table.partitions VERSION AS OF 10963874102873;
    通过DataFrameReader API对元数据表使用时间旅行:
    spark.read.format("iceberg").option("snapshot-id", 10963874102873L).load("db.table.files")

相关文档