更新时间:2024-06-27 GMT+08:00
自定义排序器
编写自定义排序类实现BulkInsertPartitioner接口，并在写入Hudi时加入如下配置：
.option(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS, <自定义排序类的包名加类名>)
自定义分区排序器样例：
/**
 * Sample bulk-insert partitioner that sorts the records inside each Spark partition.
 *
 * <p>Within a partition, records are ordered first by partition path (lexicographically) and
 * then by record key, compared numerically. This sample therefore assumes numeric record keys;
 * a non-numeric key makes the sort throw {@link NumberFormatException}.
 *
 * @param <T> the Hudi payload type carried by the records
 */
public class HoodieSortExample<T extends HoodieRecordPayload>
    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

  /**
   * Coalesces the input to {@code outputSparkPartitions} partitions and sorts the records of each
   * resulting partition by partition path, then by numeric record key.
   *
   * @param records the records to be bulk-inserted
   * @param outputSparkPartitions target partition count passed to {@code coalesce}
   * @return the same records, sorted within each partition
   */
  @Override
  public JavaRDD<HoodieRecord<T>> repartitionRecords(
      JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
    return records
        .coalesce(outputSparkPartitions)
        .mapPartitions(
            partition -> {
              // Sorting needs the whole partition, so buffer it in memory first.
              List<HoodieRecord<T>> buffer = new ArrayList<>();
              partition.forEachRemaining(buffer::add);
              // Delegate to a static helper so this closure does not capture `this`.
              buffer.sort((left, right) -> compareRecords(left, right));
              return buffer.iterator();
            });
  }

  /**
   * Orders two records by partition path, then by record key interpreted as a number.
   *
   * @param left first record
   * @param right second record
   * @return negative, zero, or positive as {@code left} sorts before, with, or after {@code right}
   * @throws NumberFormatException if the paths are equal and either record key is not numeric
   */
  private static int compareRecords(HoodieRecord<?> left, HoodieRecord<?> right) {
    // compareTo compares string contents; == would only compare object identity.
    int byPath = left.getPartitionPath().compareTo(right.getPartitionPath());
    if (byPath != 0) {
      return byPath;
    }
    // Numeric (not lexicographic) so that "9" sorts before "10"; Long.compare cannot overflow
    // the way subtracting the parsed keys can.
    return Long.compare(
        Long.parseLong(left.getRecordKey()), Long.parseLong(right.getRecordKey()));
  }

  /**
   * Reports that {@link #repartitionRecords} leaves the records of each partition sorted.
   *
   * @return always {@code true}
   */
  @Override
  public boolean arePartitionRecordsSorted() {
    return true;
  }
}
父主题: Hudi的自定义配置项样例程序