更新时间:2025-11-26 GMT+08:00
分享

使用Spark Shell创建Hudi表

操作场景

本章节主要介绍了如何通过spark-shell使用Hudi功能。

使用Spark数据源,通过代码段展示如何插入和更新Hudi的默认存储类型数据集COW表,以及每次写操作之后如何读取快照和增量数据。

前提条件

如果集群已开启Kerberos认证,需在Manager界面创建1个人机用户并关联到hadoop和hive用户组,主组为hadoop。

约束与限制

本章节仅适用于MRS 3.3.1-LTS及之前版本。

操作步骤

  1. 下载并安装Hudi客户端,具体请参考安装MRS客户端章节。

    目前Hudi集成在Spark/Spark2x服务中,用户从Manager页面下载Spark/Spark2x客户端即可,例如客户端安装目录为:“/opt/client”。

  2. 使用客户端安装用户登录客户端节点,执行如下命令:

    进入客户端目录:
    cd /opt/hadoopclient

    执行以下命令加载环境变量:

    source bigdata_env
    source Hudi/component_env

    安全认证:

    kinit 创建的业务用户
    • 新创建的用户首次认证需要修改密码。
    • 普通模式(未开启kerberos认证)集群无需执行kinit命令。

  3. 执行spark-shell --master yarn-client命令进入spark-shell,然后引入Hudi相关软件包并生成测试数据。

    • 引入需要的包。
      import org.apache.hudi.QuickstartUtils._
      import scala.collection.JavaConversions._
      import org.apache.spark.sql.SaveMode._
      import org.apache.hudi.DataSourceReadOptions._
      import org.apache.hudi.DataSourceWriteOptions._
      import org.apache.hudi.config.HoodieWriteConfig._
    • 定义表名,存储路径,生成测试数据。
      val tableName = "hudi_cow_table"
      val basePath = "hdfs://hacluster/tmp/hudi_cow_table"
      val dataGen = new DataGenerator
      val inserts = convertToStringList(dataGen.generateInserts(10))
      val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

  4. 执行以下命令写入Hudi表,模式为OVERWRITE。

    df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Overwrite).
    save(basePath)

  5. 执行以下命令注册临时表并查询。

    val roViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
    roViewDF.createOrReplaceTempView("hudi_ro_table")
    spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_ro_table where fare > 20.0").show()

  6. 执行以下命令生成更新数据并更新Hudi表,模式为APPEND。

    val updates = convertToStringList(dataGen.generateUpdates(10))
    val df = spark.read.json(spark.sparkContext.parallelize(updates, 1))
    df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Append).
    save(basePath)

  7. 查询Hudi表增量数据。

    • 重新加载:
      spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*").createOrReplaceTempView("hudi_ro_table")
    • 进行增量查询:
      val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)
      val beginTime = commits(commits.length - 2)
      val incViewDF = spark.read.format("org.apache.hudi").
      option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
      option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
      load(basePath);
      incViewDF.registerTempTable("hudi_incr_table")
      spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_incr_table where fare > 20.0").show()

  8. 进行指定时间点提交的查询。

    val beginTime = "000"
    val endTime = commits(commits.length - 2)
    val incViewDF = spark.read.format("org.apache.hudi").
    option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
    option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
    option(END_INSTANTTIME_OPT_KEY, endTime).
    load(basePath);
    incViewDF.registerTempTable("hudi_incr_table")
    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_incr_table where fare > 20.0").show()

  9. 删除测试数据。

    • 准备删除的数据。
      val df = spark.sql("select uuid, partitionpath from hudi_ro_table limit 2")
      val deletes = dataGen.generateDeletes(df.collectAsList())
    • 执行删除操作。

      val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));

      df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs).
      option(OPERATION_OPT_KEY,"delete").
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(basePath);
    • 重新查询数据。
      val roViewDFAfterDelete = spark.read.format("org.apache.hudi").
      load(basePath + "/*/*/*/*")
      roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")
      spark.sql("select uuid, partitionPath from hudi_ro_table").show()

相关文档