User-defined Partitioner
Write and compile a user-defined partitioner class that implements the BulkInsertPartitioner interface, then add the following configuration when writing data to Hudi:
.option(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS, <fully qualified class name of the user-defined partitioner>)
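As a rough illustration of the configuration above, the following sketch collects the writer options in a plain map. The helper name bulkInsertOptions, the class name com.example.HoodieSortExample, and the two literal key strings are assumptions made for this example, not taken from this page. In particular, "hoodie.bulkinsert.user.defined.partitioner.class" is assumed to be the key behind BULKINSERT_USER_DEFINED_PARTITIONER_CLASS, so verify it against the Hudi version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BulkInsertOptionsSketch {

    // Hypothetical helper: collects the writer options for a bulk insert
    // that uses a user-defined partitioner.
    public static Map<String, String> bulkInsertOptions(String partitionerClassName) {
        Map<String, String> options = new LinkedHashMap<>();
        // Assumed key that selects the bulk insert write operation.
        options.put("hoodie.datasource.write.operation", "bulk_insert");
        // Assumed key behind BULKINSERT_USER_DEFINED_PARTITIONER_CLASS.
        options.put("hoodie.bulkinsert.user.defined.partitioner.class", partitionerClassName);
        return options;
    }

    public static void main(String[] args) {
        // com.example.HoodieSortExample is a placeholder for the fully qualified class name.
        Map<String, String> options = bulkInsertOptions("com.example.HoodieSortExample");
        options.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A map built this way could be passed to the Spark writer in one call with .options(map) instead of chaining individual .option(...) calls.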
Example of the user-defined partitioner:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

import scala.Tuple2;

public class HoodieSortExample<T extends HoodieRecordPayload>
    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

    @Override
    public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
        // Key each record by "<partition path>+<record key>".
        JavaPairRDD<String, HoodieRecord<T>> keyedRecords = records.coalesce(outputSparkPartitions)
            .mapToPair(record -> new Tuple2<>(record.getPartitionPath() + "+" + record.getRecordKey(), record));
        // Sort the records of each Spark partition in memory.
        return keyedRecords.mapPartitions(partition -> {
            List<Tuple2<String, HoodieRecord<T>>> recordList = new ArrayList<>();
            while (partition.hasNext()) {
                recordList.add(partition.next());
            }
            Collections.sort(recordList, (o1, o2) -> {
                // Assumes the partition path contains no "+" and the record key is an integer.
                String[] left = o1._1().split("[+]");
                String[] right = o2._1().split("[+]");
                if (left[0].equals(right[0])) {
                    // Same partition path: order by numeric record key.
                    return Integer.compare(Integer.parseInt(left[1]), Integer.parseInt(right[1]));
                }
                // Different partition paths: order by partition path.
                return left[0].compareTo(right[0]);
            });
            return recordList.stream().map(e -> e._2).iterator();
        });
    }

    @Override
    public boolean arePartitionRecordsSorted() {
        // Records within each output partition are sorted by repartitionRecords.
        return true;
    }
}
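The ordering that the example applies within each Spark partition (partition path first, then record key compared as an integer) can be tried without Spark or Hudi. The following is a minimal sketch using only the JDK. RecordId and sortedIds are hypothetical names introduced for illustration, and the record keys are assumed to be integer strings, as in the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortOrderSketch {

    // Hypothetical stand-in for the (partition path, record key) pair of a HoodieRecord.
    public static final class RecordId {
        public final String partitionPath;
        public final String recordKey;

        public RecordId(String partitionPath, String recordKey) {
            this.partitionPath = partitionPath;
            this.recordKey = recordKey;
        }

        @Override
        public String toString() {
            return partitionPath + "+" + recordKey;
        }
    }

    // Order by partition path first, then by record key parsed as an integer.
    public static final Comparator<RecordId> BY_PATH_THEN_KEY =
        Comparator.comparing((RecordId r) -> r.partitionPath)
                  .thenComparingInt(r -> Integer.parseInt(r.recordKey));

    // Returns the "<partition path>+<record key>" strings in sorted order.
    public static List<String> sortedIds(List<RecordId> ids) {
        List<RecordId> copy = new ArrayList<>(ids);
        copy.sort(BY_PATH_THEN_KEY);
        List<String> result = new ArrayList<>();
        for (RecordId id : copy) {
            result.add(id.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        List<RecordId> ids = Arrays.asList(
            new RecordId("2021/02", "10"),
            new RecordId("2021/01", "9"),
            new RecordId("2021/02", "2"),
            new RecordId("2021/01", "10"));
        // prints [2021/01+9, 2021/01+10, 2021/02+2, 2021/02+10]
        System.out.println(sortedIds(ids));
    }
}
```

Comparing the keys numerically places key 9 before key 10 within a partition path. A plain string comparison would place "10" before "9".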