Updated: 2024-08-05 GMT+08:00
Running the Hudi Sample Program with Spark (Python)
The code snippets below are for demonstration only. For the complete code, see HudiPythonExample.py.
Insert data:
# Generate 10 sample records and load them into a DataFrame
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}
# mode("overwrite") replaces any table that already exists at basePath
df.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(basePath)
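The write options used on this page differ mainly in `hoodie.datasource.write.operation`. The following is a minimal sketch in plain Python, not taken from HudiPythonExample.py, that builds the dictionary in one place and checks keys for stray whitespace, since a key such as `' hoodie.insert.shuffle.parallelism'` would most likely not be matched to the intended setting. The helper names `build_hudi_write_options` and `check_option_keys`, the table name `hudi_trips`, and the restricted set of operations are assumptions made for illustration.

```python
def check_option_keys(options):
    """Raise ValueError if any option key has leading or trailing whitespace."""
    for key in options:
        if key != key.strip():
            raise ValueError(f"option key has stray whitespace: {key!r}")
    return options


def build_hudi_write_options(table_name, operation, parallelism=2):
    """Return write options for one operation (hypothetical helper)."""
    # Only the operations discussed on this page; Hudi supports more.
    allowed = {"insert", "upsert", "delete"}
    if operation not in allowed:
        raise ValueError(f"unsupported operation: {operation!r}")
    return check_option_keys({
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.recordkey.field": "uuid",
        "hoodie.datasource.write.partitionpath.field": "partitionpath",
        "hoodie.datasource.write.table.name": table_name,
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.write.precombine.field": "ts",
        "hoodie.upsert.shuffle.parallelism": parallelism,
        "hoodie.insert.shuffle.parallelism": parallelism,
    })


# "hudi_trips" is a made-up table name for this sketch.
insert_options = build_hudi_write_options("hudi_trips", "insert")
delete_options = build_hudi_write_options("hudi_trips", "delete")
```

The resulting dictionaries can be passed to `options(**...)` in the same way as `hudi_options`.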
Query data:
# Load the table as a snapshot; the glob covers the partition directories plus the data files
tripsSnapshotDF = spark. \
    read. \
    format("hudi"). \
    load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
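Update data:
# Generate updates for existing records and append them to the table
# Note: hudi_options sets the write operation to 'insert'; updating records by key normally uses 'upsert'
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(basePath)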
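Incremental query:
# Reload the snapshot view so that it reflects the latest commits
spark. \
    read. \
    format("hudi"). \
    load(basePath + "/*/*/*/*"). \
    createOrReplaceTempView("hudi_trips_snapshot")
# Assumed step (as in the Apache Hudi quickstart): collect the commit times and start from the second most recent one
commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").limit(50).collect()))
beginTime = commits[len(commits) - 2]
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': beginTime,
}
tripsIncrementalDF = spark.read.format("hudi"). \
    options(**incremental_read_options). \
    load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()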
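The incremental query depends on the value chosen for `beginTime`. The sketch below, in plain Python with made-up commit times, shows one way to pick the second most recent commit so that only the latest commit is read. It assumes commit times are fixed-width timestamp strings that sort chronologically as strings; `pick_begin_time` is a hypothetical helper, not part of Hudi.

```python
def pick_begin_time(commit_times):
    """Return the second most recent commit time, or "000" if fewer than two exist."""
    ordered = sorted(set(commit_times))
    if len(ordered) < 2:
        # "000" sorts before any timestamp, so every commit qualifies.
        return "000"
    return ordered[-2]


# Made-up commit times, deliberately out of order.
commits = ["20240805101500", "20240805101000", "20240805102000"]
begin_time = pick_begin_time(commits)

incremental_read_options = {
    "hoodie.datasource.query.type": "incremental",
    "hoodie.datasource.read.begin.instanttime": begin_time,
}
```

Sorting inside the helper avoids relying on the order in which the commit times were collected.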
Point-in-time query:
# beginTime "000" precedes every commit, so all commits later than it are considered
beginTime = "000"
# commits is the list of the table's commit times in ascending order; stop at the second most recent commit
endTime = commits[len(commits) - 2]
point_in_time_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.end.instanttime': endTime,
    'hoodie.datasource.read.begin.instanttime': beginTime
}
tripsPointInTimeDF = spark.read.format("hudi"). \
    options(**point_in_time_read_options). \
    load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
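To make the time window concrete, the sketch below mimics the filter in plain Python on made-up records. It treats the begin instant as exclusive, as the comment in the snippet states, and assumes the end instant is inclusive; that second assumption should be verified against the Hudi version in use. `in_window` is a hypothetical helper.

```python
def in_window(commit_time, begin_time, end_time=None):
    """True if begin_time < commit_time <= end_time (no upper bound when end_time is None)."""
    if commit_time <= begin_time:
        return False
    return end_time is None or commit_time <= end_time


# Made-up records carrying only the fields needed for the illustration.
records = [
    {"uuid": "a", "_hoodie_commit_time": "20240805101000"},
    {"uuid": "b", "_hoodie_commit_time": "20240805101500"},
    {"uuid": "c", "_hoodie_commit_time": "20240805102000"},
]

commits = sorted({r["_hoodie_commit_time"] for r in records})
begin_time = "000"
end_time = commits[len(commits) - 2]

selected = [
    r["uuid"]
    for r in records
    if in_window(r["_hoodie_commit_time"], begin_time, end_time)
]
```

With these sample values the record from the most recent commit falls outside the window, which is the effect the point-in-time options aim for.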
Delete data:
from pyspark.sql.functions import lit

# Get the total number of records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
# Pick two records to delete
ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
# Run the delete
hudi_delete_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'delete',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
df.write.format("hudi"). \
    options(**hudi_delete_options). \
    mode("append"). \
    save(basePath)
# Run the same query as before
roAfterDeleteViewDF = spark. \
    read. \
    format("hudi"). \
    load(basePath + "/*/*/*/*")
# createOrReplaceTempView replaces the deprecated registerTempTable
roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
# Should return (total - 2) records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").show()
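The delete write needs only the record key, the partition path, and a precombine value for each record. The plain-Python sketch below uses made-up rows to show the shape of that payload and the count to expect afterwards. `build_delete_rows` is a hypothetical helper and the sample keys and partition paths are invented for illustration.

```python
def build_delete_rows(rows, ts=0.0):
    """Map (uuid, partitionpath) pairs to (uuid, partitionpath, ts) tuples."""
    return [(uuid, partitionpath, ts) for uuid, partitionpath in rows]


# Made-up snapshot of (uuid, partitionpath) pairs.
snapshot = [
    ("id-1", "americas/brazil/sao_paulo"),
    ("id-2", "americas/united_states/san_francisco"),
    ("id-3", "asia/india/chennai"),
]

to_delete = snapshot[:2]                    # analogous to limit(2)
delete_rows = build_delete_rows(to_delete)  # analogous to withColumn('ts', lit(0.0))

# Simulate the table contents after the delete has been applied.
deleted_keys = {(uuid, path) for uuid, path, _ in delete_rows}
remaining = [row for row in snapshot if row not in deleted_keys]
```

Comparing `len(remaining)` with `len(snapshot) - 2` corresponds to the count check at the end of the delete snippet.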