User-defined Partitioner
Write and compile a user-defined partitioner class that implements the BulkInsertPartitioner interface, and add the following option when writing data to Hudi:
.option(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS, "<fully qualified name of the user-defined partitioner class>")
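As a minimal sketch, assuming the constant above resolves to the configuration key hoodie.bulkinsert.user.defined.partitioner.class (as it does in HoodieWriteConfig), the options can be collected in a plain Map and later passed to a Spark DataFrameWriter. The partitioner applies to the bulk_insert write operation, so the sketch sets that as well. The class HudiBulkInsertOptions, its method, and com.example.HoodieSortExample are hypothetical names, and the other options a Hudi write needs (table name, record key field, and so on) are omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: collects the writer options for a bulk insert
// that uses a user-defined partitioner. Other required Hudi options are omitted.
final class HudiBulkInsertOptions {
    // Assumed configuration key behind BULKINSERT_USER_DEFINED_PARTITIONER_CLASS.
    static final String PARTITIONER_CLASS_KEY = "hoodie.bulkinsert.user.defined.partitioner.class";
    // The user-defined partitioner is used by the bulk_insert write operation.
    static final String OPERATION_KEY = "hoodie.datasource.write.operation";

    static Map<String, String> bulkInsertOptions(String partitionerClassName) {
        Map<String, String> opts = new HashMap<>();
        opts.put(OPERATION_KEY, "bulk_insert");
        // Must be the fully qualified class name so Hudi can load it by reflection.
        opts.put(PARTITIONER_CLASS_KEY, partitionerClassName);
        return opts;
    }
}
```

With a Dataset<Row> named df, the map could then be supplied through df.write().format("hudi").options(opts) together with the table's other write options.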
Example of a user-defined partitioner that sorts the records in each Spark partition by partition path and then by record key:
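import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

import scala.Tuple2;

public class HoodieSortExample<T extends HoodieRecordPayload> implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

    @Override
    public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
        // Reduce the data to the requested number of Spark partitions and key each
        // record by "<partition path>+<record key>".
        JavaPairRDD<String, HoodieRecord<T>> stringHoodieRecordJavaPairRDD = records.coalesce(outputSparkPartitions)
                .mapToPair(record -> new Tuple2<>(new StringBuilder().append(record.getPartitionPath())
                        .append("+")
                        .append(record.getRecordKey())
                        .toString(), record));

        // Sort the records inside each Spark partition.
        JavaRDD<HoodieRecord<T>> hoodieRecordJavaRDD = stringHoodieRecordJavaPairRDD.mapPartitions(partition -> {
            List<Tuple2<String, HoodieRecord<T>>> recordList = new ArrayList<>();
            while (partition.hasNext()) {
                recordList.add(partition.next());
            }
            // Order by partition path first, then by record key compared as an integer.
            // Assumes record keys are integers and partition paths contain no "+".
            Collections.sort(recordList, (o1, o2) -> {
                String[] k1 = o1._1().split("[+]");
                String[] k2 = o2._1().split("[+]");
                if (k1[0].equals(k2[0])) {
                    // Integer.compare avoids the overflow that subtraction can cause.
                    return Integer.compare(Integer.parseInt(k1[1]), Integer.parseInt(k2[1]));
                } else {
                    return k1[0].compareTo(k2[0]);
                }
            });
            return recordList.stream().map(e -> e._2()).iterator();
        });
        return hoodieRecordJavaRDD;
    }

    @Override
    public boolean arePartitionRecordsSorted() {
        // repartitionRecords() sorts the records within each output partition.
        return true;
    }
}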
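To check the ordering without a Spark or Hudi setup, the following sketch reproduces the same sort in plain Java. SimpleRecord, LocalSortSketch, and repartitionAndSort are hypothetical names: SimpleRecord stands in for HoodieRecord, and splitting the input into contiguous chunks is only a rough stand-in for coalesce. As in the example, record keys are assumed to be integers.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for HoodieRecord: only the two fields the example sorts on.
final class SimpleRecord {
    final String partitionPath;
    final String recordKey;

    SimpleRecord(String partitionPath, String recordKey) {
        this.partitionPath = partitionPath;
        this.recordKey = recordKey;
    }

    @Override
    public String toString() {
        return partitionPath + "+" + recordKey;
    }
}

final class LocalSortSketch {
    // Same ordering as HoodieSortExample: partition path (lexicographic),
    // then record key parsed as an integer.
    static final Comparator<SimpleRecord> ORDER =
            Comparator.comparing((SimpleRecord r) -> r.partitionPath)
                      .thenComparingInt(r -> Integer.parseInt(r.recordKey));

    // Splits the input into numPartitions contiguous chunks (a rough stand-in
    // for coalesce) and sorts each chunk independently.
    static List<List<SimpleRecord>> repartitionAndSort(List<SimpleRecord> records, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        List<List<SimpleRecord>> out = new ArrayList<>();
        int n = records.size();
        for (int p = 0; p < numPartitions; p++) {
            int from = (int) ((long) n * p / numPartitions);
            int to = (int) ((long) n * (p + 1) / numPartitions);
            List<SimpleRecord> chunk = new ArrayList<>(records.subList(from, to));
            chunk.sort(ORDER);
            out.add(chunk);
        }
        return out;
    }
}
```

Sorting 2021/01/02+10, 2021/01/01+9, 2021/01/01+10, and 2021/01/02+2 into a single partition gives [2021/01/01+9, 2021/01/01+10, 2021/01/02+2, 2021/01/02+10]; a plain string comparison of the keys would instead put 10 before 9. With two partitions, each chunk is sorted on its own, which matches what arePartitionRecordsSorted() returning true indicates: records are sorted within a partition, not across partitions.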