更新时间:2026-06-11 GMT+08:00
从零开始使用Iceberg
Apache Iceberg是一种面向海量分析数据集的开放表格式,通过高性能表格式为Spark、HetuEngine和Flink等计算引擎添加表功能,使用方式与SQL表完全一致。
Iceberg专为超大规模表而构建,已在实际业务环境中得到应用,单个表可容纳数十PB数据,且即使如此庞大的表也无需分布式SQL引擎即可读取。支持以下功能:
- 快速扫描规划:读取表或查找文件操作无需分布式SQL引擎即可完成。
- 高级过滤:通过分区和列级统计信息,利用表元数据对数据文件进行裁剪。
- Iceberg专为解决最终一致性云对象存储中的正确性问题而设计。兼容所有云存储,在HDFS中通过避免列表操作和重命名来减少NameNode拥塞。
- 可序列化隔离:表变更具有原子性,部分或未提交的变更对读取者不可见。
- 多个并发写入器使用乐观并发控制,即使发生写入冲突也会重试以确保兼容的更新操作成功完成。
前提条件
- 已安装Spark组件。
- 创建Catalog时,如果需要在Hive MetaStore注册表,则需集群已安装Hive组件。
操作步骤
- 安装客户端,具体请参考安装客户端章节。
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录。
cd 客户端安装目录
- 执行如下命令初始化环境变量
source bigdata_env source Spark/component_env
- 集群已启用Kerberos认证(安全模式)需执行以下命令完成Kerberos认证,集群未启用Kerberos认证(普通模式)请跳过此操作。
kinit 创建的组件业务用户 - 启动Iceberg。
- 场景一 需要在Hive MetaStore注册表,使用Parquet、Orc等Spark原生支持的表格式和Iceberg表,则可使用SparkSessionCatalog,Catalog名称仅支持指定为“spark_catalog”:
spark-sql \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ --conf spark.sql.catalog.spark_catalog.type=hive \ --conf spark.sql.storeAssignmentPolicy=ANSI
- 场景二 需要在Hive MetaStore注册表,只使用Iceberg表,则可使用SparkCatalog,Catalog名称可任意指定,例如“prod”:
spark-sql \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.prod=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.prod.type=hive \ --conf spark.sql.storeAssignmentPolicy=ANSI
其中:
- “org.apache.iceberg.spark.SparkSessionCatalog”支持Iceberg表和Spark原生支持的表格式。使用SparkSessionCatalog时,Catalog名称仅支持指定为“spark_catalog”。
- “org.apache.iceberg.spark.SparkCatalog”仅支持Iceberg表。
- 场景一
- 创建Iceberg表。
- 使用场景二的方式启动spark-sql时指定的Catalog,例如“prod”;创建数据库,例如“db”:
create database prod.db; use prod.db;
- 创建Iceberg表:
CREATE TABLE prod.db.table (id bigint, data string) USING iceberg;
- 使用场景二的方式启动spark-sql时指定的Catalog,例如“prod”;创建数据库,例如“db”:
- 向Iceberg表写入数据:
- 插入数据:
- 方式一:
INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
- 方式二:
- 创建表“source”:
CREATE TABLE IF NOT EXISTS source (id bigint, data string) USING iceberg;
- 向“source”中插入数据:
INSERT INTO source (id, data) VALUES(1, 'a'), (2, 'bc'), (3, 'd');
- 向“prod.db.table”中插入数据:
INSERT INTO prod.db.table SELECT id, data FROM source WHERE length(data) = 1;
- 创建表“source”:
- 方式一:
- 行级别SQL数据更新:
- 创建表“prod.db.target”:
CREATE TABLE IF NOT EXISTS prod.db.target (id bigint, count INT) USING iceberg;
- 创建表“updates”:
CREATE TABLE IF NOT EXISTS updates (id bigint, count INT) USING parquet;
- 向“updates”中插入数据:
INSERT INTO updates VALUES (1, 2), (3, 4);
- 执行行级别SQL数据更新:
MERGE INTO prod.db.target t USING (SELECT * FROM updates) u ON t.id = u.id WHEN MATCHED THEN UPDATE SET t.count = t.count + u.count WHEN NOT MATCHED THEN INSERT *;
- 创建表“prod.db.target”:
- 选择列插入数据:
INSERT INTO prod.db.table SELECT id, data FROM source;
- 插入数据:
- 读取Iceberg表数据。
- 查询表中数据:
SELECT count(1) as count, data FROM prod.db.table GROUP BY data;
- 查看表中所有元数据信息:
DESCRIBE EXTENDED prod.db.table;
- 读取表的总行数:
SELECT COUNT(*) FROM prod.db.table;
- 查询表中数据: