文档首页/
MapReduce服务 MRS/
开发指南(LTS版)/
Spark2x开发指南(安全模式)/
开发Spark应用/
使用Spark执行Hudi样例程序/
使用Spark执行Hudi样例程序(Java)
更新时间:2024-08-03 GMT+08:00
使用Spark执行Hudi样例程序(Java)
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.hudi.examples.HoodieWriteClientExample
创建客户端对象来操作Hudi:
String tablePath = args[0]; String tableName = args[1]; SparkConf sparkConf = HoodieExampleSparkUtils.defaultSparkConf("hoodie-client-example"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); // Generator of some records to be loaded in. HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>(); // initialize the table, if not done already Path path = new Path(tablePath); FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration()); if (!fs.exists(path)) { HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType), tableName, HoodieAvroPayload.class.getName()); } // Create the write client to write some records in HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withDeleteParallelism(2).forTable(tableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);
插入数据:
String newCommitTime = client.startCommit(); LOG.info("Starting commit " + newCommitTime); List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10); List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records); JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = jsc.parallelize(records, 1); client.upsert(writeRecords, newCommitTime);
更新数据:
newCommitTime = client.startCommit(); LOG.info("Starting commit " + newCommitTime); List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2); records.addAll(toBeUpdated); recordsSoFar.addAll(toBeUpdated); writeRecords = jsc.parallelize(records, 1); client.upsert(writeRecords, newCommitTime);
删除数据:
newCommitTime = client.startCommit(); LOG.info("Starting commit " + newCommitTime); // just delete half of the records int numToDelete = recordsSoFar.size() / 2; List<HoodieKey> toBeDeleted = recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList()); JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1); client.delete(deleteRecords, newCommitTime);
压缩数据:
if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) { Option<String> instant = client.scheduleCompaction(Option.empty()); JavaRDD<WriteStatus> writeStatues = client.compact(instant.get()); client.commitCompaction(instant.get(), writeStatues, Option.empty()); }
父主题: 使用Spark执行Hudi样例程序