更新时间:2025-12-10 GMT+08:00
分享

从零开始使用Iceberg

ApacheIceberg是一种面向海量分析数据集的开放表格式,通过高性能表格式为Spark、HetuEngine和Flink等计算引擎添加表功能,使用方式与SQL表完全一致。

Iceberg专为超大规模表而构建,已在实际业务环境中得到应用,单个表可容纳数十PB数据,且即使如此庞大的表也无需分布式SQL引擎即可读取。支持以下功能:

  • 快速扫描规划:读取表或查找文件操作无需分布式SQL引擎即可完成。
  • 高级过滤:通过分区和列级统计信息,利用表元数据对数据文件进行裁剪。
  • Iceberg专为解决最终一致性云对象存储中的正确性问题而设计。兼容所有云存储,在HDFS中通过避免列表操作和重命名来减少NameNode拥塞。
  • 可序列化隔离:表变更具有原子性,部分或未提交的变更对读取者不可见。
  • 多个并发写入器使用乐观并发控制,即使发生写入冲突也会重试以确保兼容的更新操作成功完成。

Iceberg当前处于公测阶段,如需使用请提交工单申请开通。

前提条件

已安装Spark组件。

操作步骤

  1. 安装客户端,具体请参考安装客户端章节。
  2. 以客户端安装用户,登录安装客户端的节点。
  3. 执行以下命令,切换到客户端安装目录(例如“/opt/client”)。

    cd /opt/client

  4. 执行如下命令初始化环境变量

    source bigdata_env

  5. 集群已启用Kerberos认证(安全模式)需执行以下命令完成Kerberos认证,集群未启用Kerberos认证(普通模式)请跳过此操作。

    kinit MRS集群用户

  6. 启动Iceberg。

    • 需要在Hive MetaStore注册表,使用Spark默认的“spark_catalog”,创建表和写入数据时不需要指定Catalog:
      spark-sql \
      --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
      --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
      --conf spark.sql.catalog.spark_catalog.type=hive \
      --conf spark.sql.storeAssignmentPolicy=ANSI
    • 不需要在Hive MetaStore注册表,使用自定义的Catalog,创建表和写入数据时需要指定Catalog:
      spark-sql \
      --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
      --conf spark.sql.catalog.{自定义catalog名称}=org.apache.iceberg.spark.SparkCatalog \
      --conf spark.sql.catalog.{自定义catalog名称}.type=hadoop \
      --conf spark.sql.catalog.{自定义catalog名称}.warehouse={自定义HDFS或者OBS路径} \
      --conf spark.sql.storeAssignmentPolicy=ANSI

    其中:

    • “org.apache.iceberg.spark.SparkSessionCatalog”支持Iceberg表和Spark原生支持的表格式。
    • “org.apache.iceberg.spark.SparkCatalog”仅支持Iceberg表。

  7. 创建Iceberg表。

    1. 设置“spark.sql.catalog.local”为“org.apache.iceberg.spark.SparkCatalog”:
      set spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog;
    2. 设置“spark.sql.catalog.local.type”为“hadoop”:
      set spark.sql.catalog.local.type=hadoop;
    3. 设置“spark.sql.catalog.local.warehouse”为“./warehouse”:
      set spark.sql.catalog.local.warehouse=./warehouse;
    4. 创建Iceberg表:
      CREATE TABLE local.db.table (id bigint, data string) USING iceberg;

  8. 向Iceberg表写入数据:

    • 插入数据:
      • 方式一:
        INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
      • 方式二:
        1. 创建表“source”:
          CREATE TABLE IF NOT EXISTS source (id bigint, data string) USING iceberg;
        2. 向“source”中插入数据:
          INSERT INTO source (id, data) VALUES(1, 'a'), (2, 'bc'), (3, 'd');
        3. 向“local.db.table”中插入数据:
          INSERT INTO local.db.table SELECT id, data FROM source WHERE length(data) = 1;
    • 行级别SQL数据更新:
      1. 创建表“local.db.target”:
        CREATE TABLE IF NOT EXISTS local.db.target (id bigint,  count INT) USING iceberg;
      2. 创建表“updates”:
        CREATE TABLE IF NOT EXISTS updates (id bigint,  count INT)  USING parquet;
      3. 向“updates”中插入数据:
        INSERT INTO updates VALUES (1, 2), (3, 4);
      4. 执行行级别SQL数据更新:
        MERGE INTO local.db.target t USING (SELECT * FROM updates) u ON t.id = u.id
        WHEN MATCHED THEN UPDATE SET t.count = t.count + u.count
        WHEN NOT MATCHED THEN INSERT *;
    • 使用写入API编写DataFrame:
      INSERT INTO local.db.table  SELECT id, data FROM source;

  9. 读取Iceberg表数据。

    • 查询数据:
      SELECT count(1) as count, data FROM local.db.table GROUP BY data;
    • 查看表中所有快照:
      DESCRIBE EXTENDED local.db.table;
    • 读取DataFrame:
      SELECT COUNT(*) FROM local.db.table;

相关文档