
Creating a Hudi Table Using Spark Shell

Scenario

This topic describes how to use Hudi with the Spark Shell.

The example in this topic uses the Spark data source and provides code snippets to show how to insert data into and update a Copy-on-Write (COW) table, the default Hudi table type, and how to read snapshot and incremental data after each write operation.

Prerequisites

  • The Hudi client has been downloaded and installed. Currently, the Hudi client is integrated into the Spark/Spark2x service of the MRS cluster. You can download the client that contains the Spark/Spark2x service from FusionInsight Manager. In this example, the client installation directory is /opt/hadoopclient.
  • If Kerberos authentication has been enabled for the cluster, ensure that a machine-machine user has been created on FusionInsight Manager and associated with the hadoop (primary group) and hive user groups.

Procedure

  1. Download and install the Hudi client. For details, see Installing a Client.
  2. Log in to the client node as the client installation user and run the following command to go to the client directory:

    cd /opt/hadoopclient

  3. Load environment variables and authenticate the user.

    source bigdata_env

    source Hudi/component_env

    kinit Created service user

    • Change the password of the new user upon the first authentication.
    • For clusters in normal mode (Kerberos authentication disabled), you do not need to run the kinit command.

  4. Run the following command to enter the Spark Shell, then import the Hudi packages and generate test data:

    spark-shell --master yarn-client

    • Import required packages.

      import org.apache.hudi.QuickstartUtils._

      import scala.collection.JavaConversions._

      import org.apache.spark.sql.SaveMode._

      import org.apache.hudi.DataSourceReadOptions._

      import org.apache.hudi.DataSourceWriteOptions._

      import org.apache.hudi.config.HoodieWriteConfig._

    • Define the table name and storage path to generate test data.

      val tableName = "hudi_cow_table"

      val basePath = "hdfs://hacluster/tmp/hudi_cow_table"

      val dataGen = new DataGenerator

      val inserts = convertToStringList(dataGen.generateInserts(10))

      val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
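
    • Optionally, preview the generated test data before writing it. This check is not part of the original procedure; it only uses standard Spark DataFrame calls.

      df.printSchema()

      df.show(false)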

  5. Run the following command to write data to the Hudi table in OVERWRITE mode:

    df.write.format("org.apache.hudi").

    options(getQuickstartWriteConfigs).

    option(PRECOMBINE_FIELD_OPT_KEY, "ts").

    option(RECORDKEY_FIELD_OPT_KEY, "uuid").

    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

    option(TABLE_NAME, tableName).

    mode(Overwrite).

    save(basePath)

  6. Register and query the temporary table.

    val roViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")

    roViewDF.createOrReplaceTempView("hudi_ro_table")

    spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show()
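
    To also view the metadata columns that Hudi adds to each record, you can run a query such as the following. The rider, driver, and fare fields are produced by the quickstart data generator used in this example.

    spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_ro_table").show()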

  7. Generate new data and update the Hudi table in APPEND mode.

    val updates = convertToStringList(dataGen.generateUpdates(10))

    val df = spark.read.json(spark.sparkContext.parallelize(updates, 1))

    df.write.format("org.apache.hudi").

    options(getQuickstartWriteConfigs).

    option(PRECOMBINE_FIELD_OPT_KEY, "ts").

    option(RECORDKEY_FIELD_OPT_KEY, "uuid").

    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

    option(TABLE_NAME, tableName).

    mode(Append).

    save(basePath)

  8. Query incremental data in the Hudi table.

    • Reload the data.

      spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*").createOrReplaceTempView("hudi_ro_table")

    • Perform an incremental query.

      val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)

      val beginTime = commits(commits.length - 2)

      val incViewDF = spark.read.format("org.apache.hudi").

      option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).

      option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).

      load(basePath);

      incViewDF.createOrReplaceTempView("hudi_incr_table")

      spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
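
    • Optionally, print the collected commit times and the selected begin time to see which commit the incremental query starts from.

      commits.foreach(println)

      println(beginTime)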

  9. Perform a point-in-time query, which reads the data committed between a specified begin time and end time.

    val beginTime = "000"

    val endTime = commits(commits.length - 2)

    val incViewDF = spark.read.format("org.apache.hudi").

    option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).

    option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).

    option(END_INSTANTTIME_OPT_KEY, endTime).

    load(basePath);

    incViewDF.createOrReplaceTempView("hudi_incr_table")

    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
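
    To see how many records the selected time range covers, you can count the rows of the incremental view. This is an optional check, not part of the original procedure.

    incViewDF.count()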

  10. Delete the test data.

    • Prepare the data to delete.

      val df = spark.sql("select uuid, partitionpath from hudi_ro_table limit 2")

      val deletes = dataGen.generateDeletes(df.collectAsList())

    • Delete the data.

      val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));

      df.write.format("org.apache.hudi").

      options(getQuickstartWriteConfigs).

      option(OPERATION_OPT_KEY,"delete").

      option(PRECOMBINE_FIELD_OPT_KEY, "ts").

      option(RECORDKEY_FIELD_OPT_KEY, "uuid").

      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

      option(TABLE_NAME, tableName).

      mode(Append).

      save(basePath);

    • Query the data again to verify that the records have been deleted.

      val roViewDFAfterDelete = spark.read.format("org.apache.hudi").

      load(basePath + "/*/*/*/*")

      roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")

      spark.sql("select uuid, partitionpath from hudi_ro_table").show()
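
    • Optionally, confirm the deletion by counting the remaining records. The result should be two less than before the delete, because two records were prepared for deletion above.

      spark.sql("select uuid, partitionpath from hudi_ro_table").count()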