使用Spark查询Iceberg表数据
在Spark中使用Iceberg前,需先配置Spark目录,spark_catalog和prod目录配置请参见配置Iceberg Catalog。Iceberg基于Apache Spark的DataSourceV2 API实现数据源和目录功能。
查询表数据
在Spark 3中,表的标识符需包含目录名称。查询表数据操作为:
创建表,例如:
CREATE TABLE prod.db.table (id bigint, data string, category string, ts timestamp, shard int) USING iceberg PARTITIONED BY (category);
- 查询表数据:
SELECT * FROM prod.db.table;
- 对于历史记录、snapshots快照等元数据表,可将Iceberg表名作为命名空间来访问,例如:
SELECT * FROM prod.db.table.files;
将表加载为DataFrame
如果需要将表加载为DataFrame,可使用spark.table方法:
//spark-shell
val df = spark.table("prod.db.table")
结合DataFrameReader使用目录
可通过Spark的DataFrameReader接口加载路径和表名,表的加载方式取决于标识符的指定形式。当使用spark.read.format("iceberg").load(table)或spark.table(table)时,table支持以下多种格式,且优先级按以下顺序排列(例如,匹配到目录时,优先级高于任何命名空间解析):
- file:///path/to/table:从指定路径加载HadoopTable(基于Hadoop存储的Iceberg表)。
- tablename:从当前目录的当前命名空间中加载表,即currentCatalog.currentNamespace.tablename。
- catalog.tablename:从指定目录中加载表。
- namespace.tablename:从当前目录中加载指定命名空间下的表。
- catalog.namespace.tablename:从指定目录中加载指定命名空间下的表。
- namespace1.namespace2.tablename:从当前目录中加载多级命名空间下的表。
上述列表按优先级从高到低排序,例如,如果标识符可匹配到目录,则优先按目录规则加载表,而非进行命名空间解析。
时间旅行
- SQL命令
Spark 3.3及之后版本支持在SQL查询中使用TIMESTAMP AS OF或VERSION AS OF子句实现时间旅行。其中,VERSION AS OF子句可接受长整数类型的快照 ID,或字符串类型的分支/标签名称。
- 时间旅行至1986年10月26日01:21:00:
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';
- 时间旅行至快照ID 10963874102873:
SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
查询current-snapshot-id快照ID的命令为:
DESCRIBE EXTENDED prod.db.table;
- 时间旅行至“audit-branch”的头部快照:
SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
- 时间旅行至标签“historical-snapshot”所引用的快照:
SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
还支持FOR SYSTEM_TIME AS OF和FOR SYSTEM_VERSION AS OF语法,功能与TIMESTAMP AS OF和VERSION AS OF一致,例如:
- 示例一:
SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF '1986-10-26 01:21:00';
- 示例二:
SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 10963874102873;
- 示例三:
SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 'audit-branch';
- 示例四:
SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 'historical-snapshot';
时间戳也可使用Unix时间戳(秒级) 传入,例如:
SELECT * FROM prod.db.table TIMESTAMP AS OF 499162860;
或可使用:
SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF 499162860;
分支或标签也可通过类似元数据表的语法指定,格式为“branch_<分支名>”或“tag_<标签名>”:
SELECT * FROM prod.db.table.`branch_audit-branch`;
或可使用:
SELECT * FROM prod.db.table.`tag_historical-snapshot`;
含-等特殊字符的标识符不合法,必须使用反引号(`)转义。 通过“branch_<分支名>”或“tag_<标签名>”定位的分支或标签,不可与VERSION AS OF子句同时使用 。
- 时间旅行至1986年10月26日01:21:00:
- 时间旅行查询中的Schema选择
各类时间旅行查询可根据定位目标的不同,使用快照自身的Schema或当前表的Schema,具体规则如下:
- 使用快照自身的Schema:
- 示例一:
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';
- 示例二:
SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
- 示例一:
- 使用当前表的Schema:
- 示例一:
SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
- 示例二:
SELECT * FROM prod.db.table.`branch_audit-branch`;
- 示例一:
- 使用快照自身的Schema:
- 示例一:
SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
- 示例二:
SELECT * FROM prod.db.table.`tag_historical-snapshot`;
- 示例一:
- 使用快照自身的Schema:
- DataFrame
在DataFrame API中,如果需要选择表的特定快照或某个时间点的快照,Iceberg支持以下四种Spark读取选项,用于精准定位目标快照:
- snapshot-id:选择特定快照,需传入快照的ID。
- as-of-timestamp:选择指定时间点的当前快照,需传入毫秒级Unix时间戳。
- branch:选择指定分支的头部快照。
- tag:选择指定标签关联的快照。
例如:
- 时间旅行至1986年10月26日01:21:00:
spark.read.option("as-of-timestamp", "499162860000").format("iceberg").load("path/to/table") - 时间旅行至快照ID 10963874102873:
spark.read.option("snapshot-id", 10963874102873L).format("iceberg").load("path/to/table") - 时间旅行至标签“historical-snapshot”所引用的快照:
spark.read.option(SparkReadOptions.TAG, "historical-snapshot").format("iceberg").load("path/to/table") - 时间旅行至“audit-branch”的头部快照:
spark.read.option(SparkReadOptions.BRANCH, "audit-branch").format("iceberg").load("path/to/table")
- 增量读取
如果需要增量读取表中追加的数据,可通过配置以下两个关键读取选项实现:
- start-snapshot-id:增量扫描的起始快照ID(排他性,即不包含此快照的数据)。
- end-snapshot-id:增量扫描的结束快照ID(包含性,即包含此快照的数据),此选项为可选。如果不指定,默认使用表的当前快照。
spark.read.format("iceberg").option("start-snapshot-id", "10963874102873").option("end-snapshot-id", "63874143573109").load("path/to/table")
表检查
如果需要查看表的历史记录、快照及其他元数据,Iceberg支持通过元数据表实现。
元数据表的标识方式为在原表名后拼接元数据表名称。例如,如果需要读取db.table表的历史记录,可通过db.table.history访问。
- 历史记录
SELECT * FROM prod.db.table.history;
- 元数据日志条目
SELECT * from prod.db.table.metadata_log_entries;
- 快照
SELECT * FROM prod.db.table.snapshots;
- 条目
如果需要查看表当前所有清单文件中数据文件和删除文件的条目,可通过以下语句查询:
SELECT * FROM prod.db.table.entries;
- 文件
SELECT * FROM prod.db.table.files;
- 清单文件
SELECT * FROM prod.db.table.manifests;
查询结果中的“partition_summaries”列中的字段对应清单列表中的field_summary结构体,按以下顺序排列:
- contains_null
- contains_nan
- lower_bound
- upper_bound
“contains_nan”可能返回“null”,表示该信息在文件元数据中不可用,这通常发生在读取V1版本的表(V1版本不填充contains_nan信息)。
- 分区
SELECT * FROM prod.db.table.partitions;
对于未分区表,partitions表将不包含partition和spec_id字段。
partitions元数据表展示的是当前快照中包含数据文件或删除文件的分区。但需注意,删除文件并未实际应用,因此在某些情况下,即使分区的所有数据行都被删除文件标记为删除,该分区仍可能会被显示。
- 位置删除文件
如果需要查看表当前快照中所有的位置删除文件,可通过以下语句查询:
SELECT * from prod.db.table.position_deletes;
- 所有数据文件
如果需要查看表的所有数据文件及其各自的元数据,可通过以下语句查询:
SELECT * FROM prod.db.table.all_data_files;
- 所有删除文件
如果需要查看表在所有快照中的删除文件及其各自的元数据,可通过以下语句查询:
SELECT * FROM prod.db.table.all_delete_files;
- 所有条目
如果需要查看表在所有快照中数据文件和删除文件的清单条目,可通过以下语句查询:
SELECT * FROM prod.db.db.table.all_entries;
- 所有清单文件
SELECT * FROM prod.db.table.all_manifests;
查询结果中“partition_summaries”列中的字段对应清单列表中的field_summary结构体,按以下顺序排列:
- contains_null
- contains_nan
- lower_bound
- upper_bound
“contains_nan”可能返回“null”,表示该信息在文件元数据中不可用,通常发生在读取V1版本的表(V1版本不填充contains_nan信息)。
- 引用
SELECT * FROM prod.db.table.refs;
- 通过DataFrame检查元数据
- 示例一:
spark.read.format("iceberg").load("db.table.files"); - 示例二:
spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table#files");
- 示例一:
- 元数据表的时间旅行
- 示例一:
SELECT * FROM prod.db.table.manifests TIMESTAMP AS OF '2021-09-20 08:00:00';
- 示例二:
SELECT * FROM prod.db.table.partitions VERSION AS OF 10963874102873;
通过DataFrameReader API对元数据表使用时间旅行:spark.read.format("iceberg").option("snapshot-id", 10963874102873L).load("db.table.files") - 示例一: