Help Center/
MapReduce Service/
Developer Guide (Normal_3.x)/
Spark2x Development Guide (Normal Mode)/
Developing Spark Applications/
Using Spark to Execute the Hudi Sample Project/
Using Spark to Execute the Hudi Sample Project (Scala)
Updated on 2024-10-23 GMT+08:00
Using Spark to Execute the Hudi Sample Project (Scala)
The following code snippets are examples. For the complete code, see com.huawei.bigdata.hudi.examples.HoodieDataSourceExample.
Insert data:
/**
 * Generates 20 sample insert records and writes them to the Hudi table at `tablePath`,
 * replacing any existing data (SaveMode `Overwrite`).
 *
 * @param spark     active Spark session
 * @param tablePath base path of the Hudi table
 * @param tableName Hudi table name, passed as the `TABLE_NAME` write option
 * @param dataGen   sample-data generator used to produce the insert records
 */
def insertData(spark: SparkSession, tablePath: String, tableName: String,
    dataGen: HoodieExampleDataGenerator[HoodieAvroPayload]): Unit = {
  val commitTime: String = System.currentTimeMillis().toString
  val inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20))
  // The generated records are string records parsed by read.json into a DataFrame.
  val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
  df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Overwrite).
    save(tablePath)
}
Query data:
/**
 * Reads the Hudi table at `tablePath`, exposes it as the temporary view `hudi_ro_table`,
 * and prints the results of two sample queries against that view.
 *
 * `tableName` and `dataGen` are accepted for signature symmetry but are not used here.
 *
 * @param spark     active Spark session
 * @param tablePath base path of the Hudi table
 * @param tableName unused
 * @param dataGen   unused
 */
def queryData(spark: SparkSession, tablePath: String, tableName: String,
    dataGen: HoodieExampleDataGenerator[HoodieAvroPayload]): Unit = {
  // Assumes three partition directory levels (e.g. 2020/01/02 in the sample output below)
  // plus the data files themselves, hence four glob segments.
  val snapshotGlob = s"$tablePath/*/*/*/*"
  val snapshotDf = spark.read.format("org.apache.hudi").load(snapshotGlob)
  snapshotDf.createOrReplaceTempView("hudi_ro_table")

  val expensiveTrips = "select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0"
  spark.sql(expensiveTrips).show()
  // +-----------------+-------------------+-------------------+---+
  // |             fare|          begin_lon|          begin_lat| ts|
  // +-----------------+-------------------+-------------------+---+
  // |98.88075495133515|0.39556048623031603|0.17851135255091155|0.0|
  // ...

  val hoodieMetaColumns =
    "select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_ro_table"
  spark.sql(hoodieMetaColumns).show()
  // +-------------------+--------------------+----------------------+-------------------+--------------------+------------------+
  // |_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|              rider|              driver|              fare|
  // +-------------------+--------------------+----------------------+-------------------+--------------------+------------------+
  // |     20191231181501|31cafb9f-0196-4b1...|            2020/01/02|rider-1577787297889|driver-1577787297889| 98.88075495133515|
  // ...
}
Update data:
/**
 * Produces 10 sample update records with `dataGen.generateUpdates` and writes them to the
 * Hudi table at `tablePath` using SaveMode `Append`, so existing data is kept.
 *
 * @param spark     active Spark session
 * @param tablePath base path of the Hudi table
 * @param tableName Hudi table name, passed as the `TABLE_NAME` write option
 * @param dataGen   sample-data generator used to produce the update records
 */
def updateData(spark: SparkSession, tablePath: String, tableName: String,
    dataGen: HoodieExampleDataGenerator[HoodieAvroPayload]): Unit = {
  val instant = System.currentTimeMillis().toString
  val updateRecords = dataGen.generateUpdates(instant, 10)
  val jsonRecords = dataGen.convertToStringList(updateRecords)
  val updatesDf = spark.read.json(spark.sparkContext.parallelize(jsonRecords, 1))

  updatesDf.write
    .format("org.apache.hudi")
    .options(getQuickstartWriteConfigs)
    .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
    .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
    .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
    .option(TABLE_NAME, tableName)
    .mode(Append)
    .save(tablePath)
}
Incremental query:
/**
 * Runs an incremental query on the Hudi table at `tablePath`, starting from the
 * second-most-recent commit, registers the result as the temporary view `hudi_incr_table`,
 * and prints the rows with `fare > 20.0`.
 *
 * Precondition: the temporary view `hudi_ro_table` must already exist (it is registered by
 * `queryData`) and must contain at least two distinct commit times.
 *
 * @param spark     active Spark session
 * @param tablePath base path of the Hudi table
 * @param tableName unused
 * @throws IllegalArgumentException if fewer than two commits are visible in `hudi_ro_table`
 */
def incrementalQuery(spark: SparkSession, tablePath: String, tableName: String): Unit = {
  import spark.implicits._
  // Sort newest-first and keep only two rows. Sorting ascending under a row cap (e.g. take(50))
  // selects the wrong commit once the table holds more commits than the cap.
  val latestCommits = spark.sql(
      "select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime desc")
    .map(k => k.getString(0))
    .take(2)
  require(latestCommits.length >= 2,
    s"Incremental query needs at least two commits in hudi_ro_table, found ${latestCommits.length}")
  // Second-most-recent commit; the incremental read then covers commits after it.
  val beginTime = latestCommits(1)
  val incViewDF = spark.
    read.
    format("org.apache.hudi").
    option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
    option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
    load(tablePath)
  incViewDF.createOrReplaceTempView("hudi_incr_table")
  spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
}
Point-in-time query (query data as of a specific commit):
/**
 * Runs a point-in-time query on the Hudi table at `tablePath`. It reads incrementally from the
 * beginning of the timeline ("000") up to the second-most-recent commit, registers the result
 * as the temporary view `hudi_incr_table`, and prints the rows with `fare > 20.0`.
 *
 * Precondition: the temporary view `hudi_ro_table` must already exist (it is registered by
 * `queryData`) and must contain at least two distinct commit times.
 *
 * @param spark     active Spark session
 * @param tablePath base path of the Hudi table
 * @param tableName unused
 * @throws IllegalArgumentException if fewer than two commits are visible in `hudi_ro_table`
 */
def pointInTimeQuery(spark: SparkSession, tablePath: String, tableName: String): Unit = {
  import spark.implicits._
  // Sort newest-first and keep only two rows. Sorting ascending under a row cap (e.g. take(50))
  // selects the wrong commit once the table holds more commits than the cap.
  val latestCommits = spark.sql(
      "select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime desc")
    .map(k => k.getString(0))
    .take(2)
  require(latestCommits.length >= 2,
    s"Point-in-time query needs at least two commits in hudi_ro_table, found ${latestCommits.length}")
  val beginTime = "000" // Represents all commits > this time.
  val endTime = latestCommits(1) // commit time we are interested in
  // incrementally query data
  val incViewDF = spark.read.format("org.apache.hudi").
    option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
    option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
    option(END_INSTANTTIME_OPT_KEY, endTime).
    load(tablePath)
  incViewDF.createOrReplaceTempView("hudi_incr_table")
  spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
}
Parent topic: Using Spark to Execute the Hudi Sample Project
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
The system is busy. Please try again later.
For any further questions, feel free to contact us through the chatbot.
Chatbot