
Creating a Hudi Table Using Spark Shell

Scenario

This section describes how to use spark-shell to work with Hudi. Based on code snippets, it shows how to use the Spark data source to insert and update data in a Hudi table of the default storage type, Copy-on-Write (COW). After each write operation, it also shows how to read snapshot and incremental data.

Prerequisites

  • You have created a user and added the user to user groups hadoop (primary group) and hive on Manager.

Procedure

  1. Download and install the Hudi client. For details, see Installing a Client (MRS 3.x or Later).

    Currently, Hudi is integrated into Spark2x. You only need to download the Spark2x client on Manager. In this example, the client installation directory is /opt/client.

  2. Log in to the node where the client is installed as user root and run the following command:

    cd /opt/client

  3. Run the following commands to load environment variables:

    source bigdata_env
    source Hudi/component_env
    kinit Created user

    • Change the password of the created user as prompted, and then run the kinit command to log in again.
    • In normal mode (Kerberos authentication disabled), you do not need to run the kinit command.
    • If multiple services are installed, run source bigdata_env first, then source the Spark component_env, and then source the Hudi component_env.

  4. Run spark-shell --master yarn-client to start spark-shell, then import the Hudi packages and generate test data:

    • Import required packages.

      import org.apache.hudi.QuickstartUtils._
      import scala.collection.JavaConversions._
      import org.apache.spark.sql.SaveMode._
      import org.apache.hudi.DataSourceReadOptions._
      import org.apache.hudi.DataSourceWriteOptions._
      import org.apache.hudi.config.HoodieWriteConfig._

    • Define the table name and storage path, and generate test data.

      val tableName = "hudi_cow_table"
      val basePath = "hdfs://hacluster/tmp/hudi_cow_table"
      val dataGen = new DataGenerator
      val inserts = convertToStringList(dataGen.generateInserts(10))
      val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
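
    • (Optional) Preview the generated test data before writing it. This is only a quick sanity check; the fields shown (for example, uuid, ts, fare, and partitionpath) are produced by the Hudi DataGenerator used above.

      df.printSchema()     // schema of the generated trip records
      df.show(3, false)    // preview a few records without truncating columns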

  5. Write data to the Hudi table in overwrite mode.

    df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Overwrite).
    save(basePath)

  6. Query the Hudi table.

    Register a temporary table and query the table.

    val roViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
    roViewDF.createOrReplaceTempView("hudi_ro_table")
    spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show()

  7. Generate new data and update the Hudi table in append mode.

    val updates = convertToStringList(dataGen.generateUpdates(10))
    val df = spark.read.json(spark.sparkContext.parallelize(updates, 1))
    df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Append).
    save(basePath)

  8. Query incremental data in the Hudi table.

    • Reload the data.

      spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*").createOrReplaceTempView("hudi_ro_table")

    • Perform an incremental query.

      val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)
      val beginTime = commits(commits.length - 2)
      val incViewDF = spark.
      read.
      format("org.apache.hudi").
      option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
      option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
      load(basePath)
      incViewDF.createOrReplaceTempView("hudi_incr_table")
      spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()

  9. Perform a point-in-time query.

    val beginTime = "000"
    val endTime = commits(commits.length - 2)
    val incViewDF = spark.read.format("org.apache.hudi").
    option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
    option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
    option(END_INSTANTTIME_OPT_KEY, endTime).
    load(basePath)
    incViewDF.createOrReplaceTempView("hudi_incr_table")
    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()

  10. Delete data.

    • Prepare the data to delete.

      val df = spark.sql("select uuid, partitionpath from hudi_ro_table limit 2")
      val deletes = dataGen.generateDeletes(df.collectAsList())

    • Delete the data.

      val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
      df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs).
      option(OPERATION_OPT_KEY, "delete").
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(basePath)

    • Query data again.

      val roViewDFAfterDelete = spark.
      read.
      format("org.apache.hudi").
      load(basePath + "/*/*/*/*")
      roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")
      spark.sql("select uuid, partitionpath from hudi_ro_table").show()