从零开始使用Iceberg
ApacheIceberg是一种面向海量分析数据集的开放表格式,通过高性能表格式为Spark、HetuEngine和Flink等计算引擎添加表功能,使用方式与SQL表完全一致。
Iceberg专为超大规模表而构建,已在实际业务环境中得到应用,单个表可容纳数十PB数据,且即使如此庞大的表也无需分布式SQL引擎即可读取。支持以下功能:
- 快速扫描规划:读取表或查找文件操作无需分布式SQL引擎即可完成。
- 高级过滤:通过分区和列级统计信息,利用表元数据对数据文件进行裁剪。
- Iceberg专为解决最终一致性云对象存储中的正确性问题而设计。兼容所有云存储,在HDFS中通过避免列表操作和重命名来减少NameNode拥塞。
- 可序列化隔离:表变更具有原子性,部分或未提交的变更对读取者不可见。
- 多个并发写入器使用乐观并发控制,即使发生写入冲突也会重试以确保兼容的更新操作成功完成。
Iceberg当前处于公测阶段,如需使用请提交工单申请开通。
前提条件
已安装Spark组件。
操作步骤
- 安装客户端,具体请参考安装客户端章节。
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录(例如“/opt/client”)。
cd /opt/client - 执行如下命令初始化环境变量
source bigdata_env
- 集群已启用Kerberos认证(安全模式)需执行以下命令完成Kerberos认证,集群未启用Kerberos认证(普通模式)请跳过此操作。
kinit MRS集群用户 - 启动Iceberg。
- 需要在Hive MetaStore注册表,使用Spark默认的“spark_catalog”,创建表和写入数据时不需要指定Catalog:
spark-sql \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ --conf spark.sql.catalog.spark_catalog.type=hive \ --conf spark.sql.storeAssignmentPolicy=ANSI
- 不需要在Hive MetaStore注册表,使用自定义的Catalog,创建表和写入数据时需要指定Catalog:
spark-sql \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.{自定义catalog名称}=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.{自定义catalog名称}.type=hadoop \ --conf spark.sql.catalog.{自定义catalog名称}.warehouse={自定义HDFS或者OBS路径} \ --conf spark.sql.storeAssignmentPolicy=ANSI
其中:
- “org.apache.iceberg.spark.SparkSessionCatalog”支持Iceberg表和Spark原生支持的表格式。
- “org.apache.iceberg.spark.SparkCatalog”仅支持Iceberg表。
- 需要在Hive MetaStore注册表,使用Spark默认的“spark_catalog”,创建表和写入数据时不需要指定Catalog:
- 创建Iceberg表。
- 设置“spark.sql.catalog.local”为“org.apache.iceberg.spark.SparkCatalog”:
set spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog;
- 设置“spark.sql.catalog.local.type”为“hadoop”:
set spark.sql.catalog.local.type=hadoop;
- 设置“spark.sql.catalog.local.warehouse”为“./warehouse”:
set spark.sql.catalog.local.warehouse=./warehouse;
- 创建Iceberg表:
CREATE TABLE local.db.table (id bigint, data string) USING iceberg;
- 设置“spark.sql.catalog.local”为“org.apache.iceberg.spark.SparkCatalog”:
- 向Iceberg表写入数据:
- 插入数据:
- 方式一:
INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
- 方式二:
- 创建表“source”:
CREATE TABLE IF NOT EXISTS source (id bigint, data string) USING iceberg;
- 向“source”中插入数据:
INSERT INTO source (id, data) VALUES(1, 'a'), (2, 'bc'), (3, 'd');
- 向“local.db.table”中插入数据:
INSERT INTO local.db.table SELECT id, data FROM source WHERE length(data) = 1;
- 创建表“source”:
- 方式一:
- 行级别SQL数据更新:
- 创建表“local.db.target”:
CREATE TABLE IF NOT EXISTS local.db.target (id bigint, count INT) USING iceberg;
- 创建表“updates”:
CREATE TABLE IF NOT EXISTS updates (id bigint, count INT) USING parquet;
- 向“updates”中插入数据:
INSERT INTO updates VALUES (1, 2), (3, 4);
- 执行行级别SQL数据更新:
MERGE INTO local.db.target t USING (SELECT * FROM updates) u ON t.id = u.id WHEN MATCHED THEN UPDATE SET t.count = t.count + u.count WHEN NOT MATCHED THEN INSERT *;
- 创建表“local.db.target”:
- 使用写入API编写DataFrame:
INSERT INTO local.db.table SELECT id, data FROM source;
- 插入数据:
- 读取Iceberg表数据。
- 查询数据:
SELECT count(1) as count, data FROM local.db.table GROUP BY data;
- 查看表中所有快照:
DESCRIBE EXTENDED local.db.table;
- 读取DataFrame:
SELECT COUNT(*) FROM local.db.table;
- 查询数据: