Java Example Code
The following code snippets are examples only. For the complete code, see com.huawei.bigdata.hudi.examples.HoodieWriteClientExample.
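The snippets below assume imports along the following lines. This is a sketch based on the Hudi 0.x package layout; exact package paths can vary between Hudi versions, and the package of the example helper classes is an assumption.

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
// Helper classes bundled with the example project; this package path is an assumption.
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
import org.apache.hudi.examples.common.HoodieExampleSparkUtils;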
Create a client object for operating on the Hudi table:
String tablePath = args[0];
String tableName = args[1];
SparkConf sparkConf = HoodieExampleSparkUtils.defaultSparkConf("hoodie-client-example");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// Generator of some records to be loaded in.
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();

// Initialize the table, if not done already.
Path path = new Path(tablePath);
FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration());
if (!fs.exists(path)) {
  HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath,
      HoodieTableType.valueOf(tableType), tableName, HoodieAvroPayload.class.getName());
}

// Create the write client to write some records in.
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
    .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
    .withDeleteParallelism(2).forTable(tableName)
    .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
    .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
SparkRDDWriteClient<HoodieAvroPayload> client =
    new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);
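The snippet references a tableType variable that is defined elsewhere in the complete example. A minimal sketch of providing it follows; COPY_ON_WRITE here is an assumption, and MERGE_ON_READ would be chosen to exercise the compaction step at the end of this page.

// In the complete example, tableType is a class field; shown as a local value here for clarity.
// Valid values are the HoodieTableType enum names: COPY_ON_WRITE and MERGE_ON_READ.
String tableType = HoodieTableType.COPY_ON_WRITE.name();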
Insert data:
String newCommitTime = client.startCommit();
LOG.info("Starting commit " + newCommitTime);
List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = jsc.parallelize(records, 1);
client.upsert(writeRecords, newCommitTime);
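upsert returns a JavaRDD<WriteStatus> that can be inspected instead of being discarded. A minimal sketch; the fail-fast policy here is illustrative, not part of the original example:

// Keep the per-partition write results and fail fast if any record could not be written.
JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
long failedWrites = statuses.filter(WriteStatus::hasErrors).count();
if (failedWrites > 0) {
  throw new IllegalStateException(failedWrites + " write statuses reported errors in commit " + newCommitTime);
}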
Update data:
newCommitTime = client.startCommit();
LOG.info("Starting commit " + newCommitTime);
List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
records.addAll(toBeUpdated);
recordsSoFar.addAll(toBeUpdated);
writeRecords = jsc.parallelize(records, 1);
client.upsert(writeRecords, newCommitTime);
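To verify the upsert, the table can be read back through the Spark DataSource. A minimal sketch, assuming a Hudi version whose DataSource registers under the short name "hudi" and accepts the table base path directly; it also requires org.apache.spark.sql.SparkSession, Dataset, and Row imports:

// Build a SparkSession on top of the existing context and run a snapshot query.
SparkSession spark = SparkSession.builder().config(jsc.getConf()).getOrCreate();
Dataset<Row> snapshot = spark.read().format("hudi").load(tablePath);
snapshot.select("_hoodie_commit_time", "_hoodie_record_key").show(false);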
Delete data:
newCommitTime = client.startCommit();
LOG.info("Starting commit " + newCommitTime);
// Just delete half of the records.
int numToDelete = recordsSoFar.size() / 2;
List<HoodieKey> toBeDeleted =
    recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
client.delete(deleteRecords, newCommitTime);
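A snapshot query can likewise confirm that the delete took effect. A brief sketch under the same assumptions as the read-back example above:

// After the delete commit, a snapshot count should drop by numToDelete.
long remaining = spark.read().format("hudi").load(tablePath).count();
LOG.info("Records remaining after delete: " + remaining);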
Compact data (MERGE_ON_READ tables only):
if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) {
  Option<String> instant = client.scheduleCompaction(Option.empty());
  JavaRDD<WriteStatus> writeStatuses = client.compact(instant.get());
  client.commitCompaction(instant.get(), writeStatuses, Option.empty());
}
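When all operations are complete, release the resources. A minimal cleanup sketch; the complete HoodieWriteClientExample closes its resources in a similar way:

// Release the write client and stop the Spark context once all commits are complete.
client.close();
jsc.stop();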