更新时间:2024-12-11 GMT+08:00

使用Spark Shell创建Hudi表

本章节仅适用于MRS 3.3.1-LTS及之前版本。

操作场景

本章节主要介绍了如何通过spark-shell使用Hudi功能。

使用Spark数据源,通过代码段展示如何插入和更新Hudi的默认存储类型数据集COW表,以及每次写操作之后如何读取快照和增量数据。

前提条件

  • 已下载并安装Hudi客户端,目前Hudi集成在MRS集群的Spark/Spark2x服务中,用户从Manager页面下载包含Spark/Spark2x服务的客户端即可,例如客户端安装目录为“/opt/hadoopclient”。
  • 如果集群已开启Kerberos认证,已在Manager界面创建1个人机用户并关联到hadoop和hive用户组,主组为hadoop。

操作步骤

  1. 下载并安装Hudi客户端,具体请参考安装MRS客户端章节。
  2. 使用客户端安装用户登录客户端节点,执行如下命令进入客户端目录。

    cd /opt/hadoopclient

  3. 执行以下命令加载环境变量。

    source bigdata_env

    source Hudi/component_env

    kinit 创建的业务用户

    • 新创建的用户首次认证需要修改密码。
    • 普通模式(未开启kerberos认证)集群无需执行kinit命令。

  4. 执行spark-shell --master yarn-client命令进入spark-shell,然后引入Hudi相关软件包并生成测试数据。

    • 引入需要的包。

      import org.apache.hudi.QuickstartUtils._

      import scala.collection.JavaConversions._

      import org.apache.spark.sql.SaveMode._

      import org.apache.hudi.DataSourceReadOptions._

      import org.apache.hudi.DataSourceWriteOptions._

      import org.apache.hudi.config.HoodieWriteConfig._

    • 定义表名,存储路径,生成测试数据。

      val tableName = "hudi_cow_table"

      val basePath = "hdfs://hacluster/tmp/hudi_cow_table"

      val dataGen = new DataGenerator

      val inserts = convertToStringList(dataGen.generateInserts(10))

      val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

  5. 执行以下命令写入Hudi表,模式为OVERWRITE。

    df.write.format("org.apache.hudi").

    options(getQuickstartWriteConfigs).

    option(PRECOMBINE_FIELD_OPT_KEY, "ts").

    option(RECORDKEY_FIELD_OPT_KEY, "uuid").

    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

    option(TABLE_NAME, tableName).

    mode(Overwrite).

    save(basePath)

  6. 执行以下命令注册临时表并查询。

    val roViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")

    roViewDF.createOrReplaceTempView("hudi_ro_table")

    spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show()

  7. 执行以下命令生成更新数据并更新Hudi表,模式为APPEND。

    val updates = convertToStringList(dataGen.generateUpdates(10))

    val df = spark.read.json(spark.sparkContext.parallelize(updates, 1))

    df.write.format("org.apache.hudi").

    options(getQuickstartWriteConfigs).

    option(PRECOMBINE_FIELD_OPT_KEY, "ts").

    option(RECORDKEY_FIELD_OPT_KEY, "uuid").

    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

    option(TABLE_NAME, tableName).

    mode(Append).

    save(basePath)

  8. 查询Hudi表增量数据。

    • 重新加载:

      spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*").createOrReplaceTempView("hudi_ro_table")

    • 进行增量查询:

      val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)

      val beginTime = commits(commits.length - 2)

      val incViewDF = spark.read.format("org.apache.hudi").

      option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).

      option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).

      load(basePath);

      incViewDF.registerTempTable("hudi_incr_table")

      spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()

  9. 进行指定时间点提交的查询。

    val beginTime = "000"

    val endTime = commits(commits.length - 2)

    val incViewDF = spark.read.format("org.apache.hudi").

    option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).

    option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).

    option(END_INSTANTTIME_OPT_KEY, endTime).

    load(basePath);

    incViewDF.registerTempTable("hudi_incr_table")

    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()

  10. 删除测试数据。

    • 准备删除的数据。

      val df = spark.sql("select uuid, partitionpath from hudi_ro_table limit 2")

      val deletes = dataGen.generateDeletes(df.collectAsList())

    • 执行删除操作。

      val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));

      df.write.format("org.apache.hudi").

      options(getQuickstartWriteConfigs).

      option(OPERATION_OPT_KEY,"delete").

      option(PRECOMBINE_FIELD_OPT_KEY, "ts").

      option(RECORDKEY_FIELD_OPT_KEY, "uuid").

      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

      option(TABLE_NAME, tableName).

      mode(Append).

      save(basePath);

    • 重新查询数据。

      val roViewDFAfterDelete = spark.read.format("org.apache.hudi").

      load(basePath + "/*/*/*/*")

      roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")

      spark.sql("select uuid, partitionPath from hudi_ro_table").show()