Help Center/
MapReduce Service/
Developer Guide (LTS)/
Spark2x Development Guide (Security Mode)/
Developing Spark Applications/
Using Spark to Execute the Hudi Sample Project/
Using Spark to Execute the Hudi Sample Project (Java)
Updated on 2024-08-10 GMT+08:00
Using Spark to Execute the Hudi Sample Project (Java)
The following code snippets are used as an example. For complete code, see com.huawei.bigdata.hudi.examples.HoodieWriteClientExample.
Create a client object to operate Hudi:
String tablePath = args[0]; String tableName = args[1]; SparkConf sparkConf = HoodieExampleSparkUtils.defaultSparkConf("hoodie-client-example"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); // Generator of some records to be loaded in. HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>(); // initialize the table, if not done already Path path = new Path(tablePath); FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration()); if (!fs.exists(path)) { HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType), tableName, HoodieAvroPayload.class.getName()); } // Create the write client to write some records in HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withDeleteParallelism(2).forTable(tableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);
Insert data:
String newCommitTime = client.startCommit(); LOG.info("Starting commit " + newCommitTime); List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10); List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records); JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = jsc.parallelize(records, 1); client.upsert(writeRecords, newCommitTime);
Update data:
newCommitTime = client.startCommit(); LOG.info("Starting commit " + newCommitTime); List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2); records.addAll(toBeUpdated); recordsSoFar.addAll(toBeUpdated); writeRecords = jsc.parallelize(records, 1); client.upsert(writeRecords, newCommitTime);
Delete data:
newCommitTime = client.startCommit(); LOG.info("Starting commit " + newCommitTime); // just delete half of the records int numToDelete = recordsSoFar.size() / 2; List<HoodieKey> toBeDeleted = recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList()); JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1); client.delete(deleteRecords, newCommitTime);
Compress data.
if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) { Option<String> instant = client.scheduleCompaction(Option.empty()); JavaRDD<WriteStatus> writeStatues = client.compact(instant.get()); client.commitCompaction(instant.get(), writeStatues, Option.empty()); }
Parent topic: Using Spark to Execute the Hudi Sample Project
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
The system is busy. Please try again later.
For any further questions, feel free to contact us through the chatbot.
Chatbot