更新时间:2024-07-24 GMT+08:00

使用Spark Shell创建Hudi表

操作场景

本指南通过使用spark-shell简要介绍了Hudi功能。使用Spark数据源,将通过代码段展示如何插入和更新Hudi的默认存储类型数据集: COW表。每次写操作之后,还将展示如何读取快照和增量数据。

前提条件

  • 在Manager界面创建用户并添加hadoop和hive用户组,主组加入hadoop。

操作步骤

  1. 下载并安装Hudi客户端,具体请参考安装客户端(3.x及之后版本)章节。

    目前Hudi集成在Spark2x中,用户从Manager页面下载Spark2x客户端即可,例如客户端安装目录为:“/opt/client”。

  2. 使用root登录客户端安装节点,执行如下命令:

    cd /opt/client

  3. 执行source命令加载环境变量:

    source bigdata_env

    source Hudi/component_env

    kinit 创建的用户

    • 新创建的用户需要修改密码,更改密码后重新kinit登录。
    • 普通模式(未开启kerberos认证)无需执行kinit命令。
    • 多服务场景下,在source bigdata_env之后,请先source Spark服务的component_env,再去source Hudi的component_env。

  4. 使用spark-shell --master yarn-client,引入Hudi包生成测试数据:

    • 引入需要的包

      import org.apache.hudi.QuickstartUtils._

      import scala.collection.JavaConversions._

      import org.apache.spark.sql.SaveMode._

      import org.apache.hudi.DataSourceReadOptions._

      import org.apache.hudi.DataSourceWriteOptions._

      import org.apache.hudi.config.HoodieWriteConfig._

    • 定义表名,存储路径,生成测试数据

      val tableName = "hudi_cow_table"

      val basePath = "hdfs://hacluster/tmp/hudi_cow_table"

      val dataGen = new DataGenerator

      val inserts = convertToStringList(dataGen.generateInserts(10))

      val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

  5. 写入Hudi表,模式为OVERWRITE。

    df.write.format("org.apache.hudi").

    options(getQuickstartWriteConfigs).

    option(PRECOMBINE_FIELD_OPT_KEY, "ts").

    option(RECORDKEY_FIELD_OPT_KEY, "uuid").

    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

    option(TABLE_NAME, tableName).

    mode(Overwrite).

    save(basePath)

  6. 查询Hudi表。

    注册临时表并查询:

    val roViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")

    roViewDF.createOrReplaceTempView("hudi_ro_table")

    spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show()

  7. 生成更新数据并更新Hudi表,模式为APPEND。

    val updates = convertToStringList(dataGen.generateUpdates(10))

    val df = spark.read.json(spark.sparkContext.parallelize(updates, 1))

    df.write.format("org.apache.hudi").

    options(getQuickstartWriteConfigs).

    option(PRECOMBINE_FIELD_OPT_KEY, "ts").

    option(RECORDKEY_FIELD_OPT_KEY, "uuid").

    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

    option(TABLE_NAME, tableName).

    mode(Append).

    save(basePath)

  8. 查询Hudi表增量数据。

    • 重新加载:

      spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*").createOrReplaceTempView("hudi_ro_table")

    • 进行增量查询:

      val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)

      val beginTime = commits(commits.length - 2)

      val incViewDF = spark.

      read.

      format("org.apache.hudi").

      option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).

      option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).

      load(basePath);

      incViewDF.registerTempTable("hudi_incr_table")

      spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()

  9. 进行指定时间点提交的查询。

    val beginTime = "000"

    val endTime = commits(commits.length - 2)

    val incViewDF = spark.read.format("org.apache.hudi").

    option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).

    option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).

    option(END_INSTANTTIME_OPT_KEY, endTime).

    load(basePath);

    incViewDF.registerTempTable("hudi_incr_table")

    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()

  10. 删除数据。

    • 准备删除的数据

      val df = spark.sql("select uuid, partitionpath from hudi_ro_table limit 2")

      val deletes = dataGen.generateDeletes(df.collectAsList())

    • 执行删除操作

      val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));

      df.write.format("org.apache.hudi").

      options(getQuickstartWriteConfigs).

      option(OPERATION_OPT_KEY,"delete").

      option(PRECOMBINE_FIELD_OPT_KEY, "ts").

      option(RECORDKEY_FIELD_OPT_KEY, "uuid").

      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

      option(TABLE_NAME, tableName).

      mode(Append).

      save(basePath);

    • 重新查询数据

      val roViewDFAfterDelete = spark.

      read.

      format("org.apache.hudi").

      load(basePath + "/*/*/*/*")

      roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")

      spark.sql("select uuid, partitionPath from hudi_ro_table").show()