文档首页/
MapReduce服务 MRS/
开发指南(普通版_3.x)/
Spark2x开发指南(普通模式)/
开发Spark应用/
使用Spark执行Hudi样例程序/
使用Spark执行Hudi样例程序(Scala)
更新时间:2024-06-27 GMT+08:00
使用Spark执行Hudi样例程序(Scala)
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.hudi.examples.HoodieDataSourceExample。
插入数据:
def insertData(spark: SparkSession, tablePath: String, tableName: String, dataGen: HoodieExampleDataGenerator[HoodieAvroPayload]): Unit = { val commitTime: String = System.currentTimeMillis().toString val inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20)) spark.sparkContext.parallelize(inserts, 2) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))df.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(tablePath)}
查询数据:
def queryData(spark: SparkSession, tablePath: String, tableName: String, dataGen: HoodieExampleDataGenerator[HoodieAvroPayload]): Unit = { val roViewDF = spark. read. format("org.apache.hudi"). load(tablePath + "/*/*/*/*") roViewDF.createOrReplaceTempView("hudi_ro_table") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show() // +-----------------+-------------------+-------------------+---+ // | fare| begin_lon| begin_lat| ts| // +-----------------+-------------------+-------------------+---+ // |98.88075495133515|0.39556048623031603|0.17851135255091155|0.0| // ... spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_ro_table").show() // +-------------------+--------------------+----------------------+-------------------+--------------------+------------------+ // |_hoodie_commit_time| _hoodie_record_key|_hoodie_partition_path| rider| driver| fare| // +-------------------+--------------------+----------------------+-------------------+--------------------+------------------+ // | 20191231181501|31cafb9f-0196-4b1...| 2020/01/02|rider-1577787297889|driver-1577787297889| 98.88075495133515| // ... }
更新数据:
def updateData(spark: SparkSession, tablePath: String, tableName: String, dataGen: HoodieExampleDataGenerator[HoodieAvroPayload]): Unit = { val commitTime: String = System.currentTimeMillis().toString val updates = dataGen.convertToStringList(dataGen.generateUpdates(commitTime, 10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 1)) df.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(tablePath)}
增量查询:
def incrementalQuery(spark: SparkSession, tablePath: String, tableName: String) { import spark.implicits._ val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) val incViewDF = spark. read. format("org.apache.hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(tablePath) incViewDF.createOrReplaceTempView("hudi_incr_table") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()}
特定时间点查询:
def pointInTimeQuery(spark: SparkSession, tablePath: String, tableName: String) { import spark.implicits._ val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50) val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val incViewDF = spark.read.format("org.apache.hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(tablePath) incViewDF.createOrReplaceTempView("hudi_incr_table") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()}
父主题: 使用Spark执行Hudi样例程序