更新时间:2026-06-11 GMT+08:00
分享

配置Spark客户端同时对接Hudi、Iceberg、Paimon

操作场景

在Spark客户端同时对接Hudi、Iceberg、Paimon组件,使用spark-sql把Hudi表数据迁移到Iceberg表。

前提条件

  • 下载并安装Hudi客户端,目前Hudi、Iceberg、Paimon均集成在Spark中,用户从Manager页面下载Spark客户端即可,例如客户端安装目录为:“/opt/client”。
  • 在Manager界面创建用户并添加hadoop和hive用户组,主组加入hadoop。

约束与限制

  • 本章节仅适用于MRS 3.6.0-LTS.1及之后版本。
  • 由于多个数据湖表格式组件有一定的语法冲突,在使用时对加载数据湖表格式组件的顺序有一定要求,具体体现在spark.sql.extensions配置中,顺序如下。

    同时加载Hudi、Iceberg、Paimon数据湖表格式组件:

    spark.sql.extensions=org.apache.spark.sql.hive.SparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
  • 在客户端中对Iceberg、Paimon表进行操作时,需要明确使用三段式来进行唯一确认表,即:catalog.database.table。

使用spark-sql同时对接Hudi、Iceberg、Paimon

  1. 使用客户端安装用户登录Spark客户端节点,进入客户端安装目录。

    cd 客户端安装目录

    加载环境变量:

    source bigdata_env

    加载组件环境变量:

    source Spark/component_env
    source Hudi/component_env

    安全模式执行以下命令,普通模式无需执行:

    kinit 组件业务用户

    输入密码完成认证(首次登录需要修改密码)

  2. 执行如下命令,使用spark-sql同时对接Hudi、Iceberg、Paimon数据湖表格式组件:

    spark-sql \
    --master yarn \
    --jars {客户端安装路径}/Spark/spark/jars/paimon/{paimon-spark完整包名},{客户端安装路径}/Spark/spark/jars/paimon/{paimon-hive-connector-common完整包名} \
    --driver-class-path {客户端安装路径}/Spark/spark/jars/paimon/{paimon-spark完整包名}:{客户端安装路径}/Spark/spark/jars/paimon/{paimon-hive-connector-common完整包名} \
    --conf spark.sql.catalog.{paimon_catalog_name}=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.{paimon_catalog_name}.warehouse=${HIVE_METASTORE_WAREHOUSE_DIR} \
    --conf spark.sql.catalog.{paimon_catalog_name}.metastore=hive \
    --conf spark.sql.catalog.{iceberg_catalog_name}=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.{iceberg_catalog_name}.warehouse=${HIVE_METASTORE_WAREHOUSE_DIR} \
    --conf spark.sql.catalog.{iceberg_catalog_name}.type=hive \
    --conf spark.sql.storeAssignmentPolicy=ANSI \
    --conf spark.sql.extensions=org.apache.spark.sql.hive.SparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    • 如果涉及Spark写Paimon表,Hive读取,涉及Timestamp类型读写时,只支持TIMESTAMP_NTZ类型的Timestamp跨引擎读写,Spark创建表前需将spark.sql.timestampType参数设置为TIMESTAMP_NTZ。
    • 如果涉及Spark读取其他引擎写的Iceberg、Paimon表的Timestamp类型字段,需要将spark.sql.session.timeZone设置为与写Iceberg、Paimon表的引擎的时区配置保持一致,否则可能出现结果不一致的情况,如果写入引擎使用的TIMESTAMP_NTZ类型的Timestamp,需将spark.sql.timestampType参数设置为TIMESTAMP_NTZ。
    • 当设置spark.sql.session.timeZone时,Iceberg表数据写入时需要显式指定数据类型。
    • 注意--jars和--driver-class-path后路径的分隔符不同,分别是“,”和“:”。
    • “${HIVE_METASTORE_WAREHOUSE_DIR}”获取方式:登录Manager界面,选择“集群 > 服务 > Hive > 配置 > 全部配置 > MetaStore(角色)”,搜索参数hive.metastore.warehouse.dir,即可获取参数值。
    • “${HIVE_METASTORE_URI_DEFAULT}”获取方式:登录Manager界面,选择“集群 > 服务 > Hive > 配置 > 全部配置 > MetaStore(角色)”,搜索参数HIVE_METASTORE_URI_DEFAULT,即可获取参数值。

  1. 创建与Hudi表相同的数据结构Iceberg表,例如:

    spark-sql> CREATE TABLE iceberg.default.iceberg_table (id bigint, data string, category string, ts timestamp) USING iceberg PARTITIONED BY (data);

  2. 迁移Hudi表数据到新建的Iceberg表,例如:

    spark-sql> insert into iceberg.default.iceberg_table select * from hudi_table;

使用spark-beeline同时对接Hudi、Iceberg、Paimon

  1. 将客户端下的“{客户端安装路径}/Spark/spark/jars/paimon/paimon-spark-3.5*.jar”和“{客户端安装路径}/Spark/spark/jars/paimon/paimon-hive-connector-common*.jar”上传到HDFS或OBS统一归档路径
  2. 登录Manager界面,选择“集群 > 服务 > Spark > 配置 > 全部配置 > JDBCServer(角色) ”,在“custom”中添加表1配置,其次按表2修改配置项,然后重启JDBCserver。

    表1 添加到custom中的配置项

    名称

    spark.jars

    {hdfs或obs统一归档路径}/paimon-spark-3.5*.jar,{hdfs或obs统一归档路径}/paimon-hive-connector-common*.jar

    spark.sql.catalog.{paimon_catalog_name}

    org.apache.paimon.spark.SparkCatalog

    spark.sql.catalog.{paimon_catalog_name}.warehouse

    ${HIVE_METASTORE_WAREHOUSE_DIR}

    spark.sql.catalog.{paimon_catalog_name}.metastore

    hive

    spark.sql.catalog.{paimon_catalog_name}.uri

    ${HIVE_METASTORE_URI_DEFAULT}

    spark.sql.catalog.{iceberg_catalog_name}

    org.apache.iceberg.spark.SparkCatalog

    spark.sql.catalog.{iceberg_catalog_name}.warehouse

    ${HIVE_METASTORE_WAREHOUSE_DIR}

    spark.sql.catalog.{iceberg_catalog_name}.type

    hive

    spark.sql.catalog.{iceberg_catalog_name}.uri

    ${HIVE_METASTORE_URI_DEFAULT}

    表2 修改的配置项

    名称

    默认值值

    修改后的值

    spark.sql.extensions

    org.apache.spark.sql.hive.SparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension

    org.apache.spark.sql.hive.SparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

    spark.sql.storeAssignmentPolicy

    LEGACY

    ANSI

  3. 使用客户端安装用户登录Spark客户端节点,进入客户端安装目录。

    cd 客户端安装目录

    加载环境变量:

    source bigdata_env

    加载组件环境变量:

    source Spark/component_env
    source Hudi/component_env

    安全模式执行以下命令,普通模式无需执行:

    kinit 组件业务用户

    输入密码完成认证(首次登录需要修改密码)

  4. 执行如下命令,连接Spark JDBCServer。

    spark-beeline

  5. 创建表,数据迁移步骤与spark-sql方式中的34相同。

相关文档