Help Center/
MapReduce Service/
Developer Guide (Normal_3.x)/
Spark2x Development Guide (Normal Mode)/
Developing Spark Applications/
Using Spark to Execute the Hudi Sample Project/
Using Spark to Execute the Hudi Sample Project (Python)
Updated on 2024-10-23 GMT+08:00
Using Spark to Execute the Hudi Sample Project (Python)
Using Python to Write Data to a Hudi Table
The following code snippets are used as an example. For complete code, see sparknormal-examples.SparkOnHudiPythonExample.hudi_python_write_example.
Insert data:
#insert inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10)) df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) hudi_options = { 'hoodie.table.name': tableName, 'hoodie.datasource.write.recordkey.field': 'uuid', 'hoodie.datasource.write.partitionpath.field': 'partitionpath', 'hoodie.datasource.write.table.name': tableName, 'hoodie.datasource.write.operation': 'insert', 'hoodie.datasource.write.precombine.field': 'ts', 'hoodie.upsert.shuffle.parallelism': 2,' hoodie.insert.shuffle.parallelism': 2 } df.write.format("hudi"). \ options(**hudi_options). \ mode("overwrite"). \ save(basePath)
Query data.
tripsSnapshotDF = spark. \ read. \ format("hudi"). \ load(basePath + "/*/*/*/*") tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
Update data:
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10)) df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). \ options(**hudi_options). \ mode("append"). \ save(basePath)
Query incremental data:
spark. \ read. \ format("hudi"). \ load(basePath + "/*/*/*/*"). \ createOrReplaceTempView("hudi_trips_snapshot") incremental_read_options = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': beginTime, } tripsIncrementalDF = spark.read.format("hudi"). \ options(**incremental_read_options). \ load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
Query data at a specific time point:
# Represents all commits > this time. beginTime = "000" endTime = commits[len(commits) - 2] point_in_time_read_options = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.end.instanttime': endTime, 'hoodie.datasource.read.begin.instanttime': beginTime } tripsPointInTimeDF = spark.read.format("hudi"). \ options(**point_in_time_read_options). \ load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
Delete data:
# Obtain the total number of records. spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() # Obtain two records to be deleted. ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) # Delete the records. hudi_delete_options = { 'hoodie.table.name': tableName, 'hoodie.datasource.write.recordkey.field': 'uuid', 'hoodie.datasource.write.partitionpath.field': 'partitionpath', 'hoodie.datasource.write.table.name': tableName, 'hoodie.datasource.write.operation': 'delete', 'hoodie.datasource.write.precombine.field': 'ts', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2 } from pyspark.sql.functions import lit deletes = list(map(lambda row: (row[0], row[1]), ds.collect())) df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0)) df.write.format("hudi"). \ options(**hudi_delete_options). \ mode("append"). \ save(basePath) # Perform the query in the same way. roAfterDeleteViewDF = spark. \ read. \ format("hudi"). \ load(basePath + "/*/*/*/*") roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") # Return (total - 2) records. spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot").show()
Parent topic: Using Spark to Execute the Hudi Sample Project
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
The system is busy. Please try again later.
For any further questions, feel free to contact us through the chatbot.
Chatbot