Python Sample Code
Using Python to Write Data to a Hudi Table
The following code snippets are provided as examples. For the complete code, see sparknormal-examples.SparkOnHudiPythonExample.hudi_python_write_example.
# Insert data.
# Generate 10 sample records with Hudi's QuickstartUtils data generator
# (reached through the Py4J JVM gateway) and load them as a DataFrame.
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

# Write options for the initial load. Keys must match Hudi config names
# exactly: a key with a stray leading space is a different key and is not
# applied as the intended setting.
hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
}

# "overwrite" mode replaces any existing table at basePath.
(
    df.write.format("hudi")
    .options(**hudi_options)
    .mode("overwrite")
    .save(basePath)
)
Query data:
# Snapshot-read the whole table and expose it to Spark SQL.
# NOTE(review): the "/*/*/*/*" glob presumably matches the partition depth of
# the generated sample data; confirm before reusing with another layout.
tripsSnapshotDF = (
    spark.read
    .format("hudi")
    .load(basePath + "/*/*/*/*")
)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

# Trips with a fare above 20.0.
fare_query = "select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0"
spark.sql(fare_query).show()

# Hudi metadata columns alongside a few business columns.
meta_query = "select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot"
spark.sql(meta_query).show()
Update data:
# Update data.
# Generate 10 updates for records that the generator has already produced.
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

# hudi_options was set up with operation 'insert' for the initial load.
# Hudi's insert operation skips the index lookup, so re-writing existing keys
# with it can add duplicates instead of replacing records. Updates therefore
# use an 'upsert' copy of the same options.
hudi_upsert_options = {**hudi_options, 'hoodie.datasource.write.operation': 'upsert'}

(
    df.write.format("hudi")
    .options(**hudi_upsert_options)
    .mode("append")
    .save(basePath)
)
Query incremental data:
# Query incremental data.
# Refresh the snapshot view, then collect the table's distinct commit times
# (ascending) so the incremental query can start from a chosen commit.
(
    spark.read
    .format("hudi")
    .load(basePath + "/*/*/*/*")
    .createOrReplaceTempView("hudi_trips_snapshot")
)
commits = [
    row[0]
    for row in spark.sql(
        "select distinct(_hoodie_commit_time) as commitTime "
        "from hudi_trips_snapshot order by commitTime"
    ).collect()
]
# Begin at the second-to-last commit so the incremental read returns the
# records written by commits after it.
beginTime = commits[len(commits) - 2]

incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': beginTime,
}
tripsIncrementalDF = (
    spark.read.format("hudi")
    .options(**incremental_read_options)
    .load(basePath)
)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
Query data at a specific point in time:
# Point-in-time read: an incremental query bounded on both ends, starting
# after beginTime and stopping at endTime (the second-to-last entry of the
# `commits` list of commit times).
beginTime = "000"  # Represents all commits > this time.
endTime = commits[len(commits) - 2]

point_in_time_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': beginTime,
    'hoodie.datasource.read.end.instanttime': endTime,
}

tripsPointInTimeDF = (
    spark.read
    .format("hudi")
    .options(**point_in_time_read_options)
    .load(basePath)
)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")

point_in_time_query = "select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0"
spark.sql(point_in_time_query).show()
Delete data:
from pyspark.sql.functions import lit

# Obtain the total number of records before the delete.
total_before = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
print("Records before delete:", total_before)

# Obtain two records to be deleted (only key and partition path are needed).
ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

# Same table/key settings as the write path, but with operation 'delete'.
hudi_delete_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'delete',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
}

# Build a DataFrame holding the keys to delete. A constant 'ts' column is
# added because 'ts' is the configured precombine field.
deletes = [(row[0], row[1]) for row in ds.collect()]
df = (
    spark.sparkContext.parallelize(deletes)
    .toDF(['uuid', 'partitionpath'])
    .withColumn('ts', lit(0.0))
)

# Delete the records.
(
    df.write.format("hudi")
    .options(**hudi_delete_options)
    .mode("append")
    .save(basePath)
)

# Perform the query in the same way. registerTempTable() has been deprecated
# since Spark 2.0; createOrReplaceTempView() is its replacement and is what
# the other queries in this example use.
roAfterDeleteViewDF = (
    spark.read
    .format("hudi")
    .load(basePath + "/*/*/*/*")
)
roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")

# Expect (total - 2) records.
total_after = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
print("Records after delete:", total_after)
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").show()
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation.