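Updated: 2024-08-05 GMT+08:00
Custom Sorter
Write a custom sort class that implements BulkInsertPartitioner, then add the following option when writing to Hudi:
.option(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS, <fully qualified name (package plus class) of the custom sort class>)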
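As a rough illustration of the option above, the sketch below collects the write options into a plain map. It assumes that the constant resolves to the key hoodie.bulkinsert.user.defined.partitioner.class and that the write uses the bulk_insert operation; confirm both against the Hudi version you run. The class name com.example.HoodieSortExample and the helper BulkInsertOptions are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class BulkInsertOptions {
    // Builds the Hudi write options that select a user-defined bulk insert partitioner.
    // The two keys are assumptions about the Hudi release in use; verify them for your version.
    static Map<String, String> build(String partitionerClass) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("hoodie.datasource.write.operation", "bulk_insert");
        options.put("hoodie.bulkinsert.user.defined.partitioner.class", partitionerClass);
        return options;
    }

    public static void main(String[] args) {
        // "com.example.HoodieSortExample" is a placeholder for your own class.
        build("com.example.HoodieSortExample")
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting map can be passed to a Spark writer, for example df.write().format("hudi").options(options), alongside the table name, record key, and other settings your job already uses.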
Example of a custom partition sorter:
import java.util.ArrayList;
import java.util.List;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public class HoodieSortExample<T extends HoodieRecordPayload>
    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

  @Override
  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
    // Key each record by "<partitionPath>+<recordKey>".
    JavaPairRDD<String, HoodieRecord<T>> keyedRecords = records.coalesce(outputSparkPartitions)
        .mapToPair(record -> new Tuple2<>(record.getPartitionPath() + "+" + record.getRecordKey(), record));

    // Sort inside each Spark partition: by partition path, then by record key.
    // The record key is assumed to be an integer string; each partition is held in memory while sorting.
    return keyedRecords.mapPartitions(partition -> {
      List<Tuple2<String, HoodieRecord<T>>> recordList = new ArrayList<>();
      while (partition.hasNext()) {
        recordList.add(partition.next());
      }
      recordList.sort((o1, o2) -> {
        String[] k1 = o1._1().split("[+]");
        String[] k2 = o2._1().split("[+]");
        // compareTo compares string contents; Integer.compare avoids subtraction overflow.
        int byPath = k1[0].compareTo(k2[0]);
        return byPath != 0 ? byPath : Integer.compare(Integer.parseInt(k1[1]), Integer.parseInt(k2[1]));
      });
      return recordList.stream().map(e -> e._2).iterator();
    });
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    // Tells Hudi that records within each output partition are already sorted.
    return true;
  }
}
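The ordering used by the sample can be tried without Spark or Hudi. The following is a minimal sketch of that comparison on composite keys of the form "<partitionPath>+<recordKey>", assuming the record key is an integer string and the partition path contains no "+" character. The class CompositeKeyOrder and the sample keys are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CompositeKeyOrder {
    // Orders "<partitionPath>+<recordKey>" strings by partition path (lexicographic),
    // then by record key (numeric). Assumes exactly one '+' separator per key.
    static int compareKeys(String a, String b) {
        String[] left = a.split("[+]");
        String[] right = b.split("[+]");
        int byPartition = left[0].compareTo(right[0]);
        if (byPartition != 0) {
            return byPartition;
        }
        return Integer.compare(Integer.parseInt(left[1]), Integer.parseInt(right[1]));
    }

    // Returns a sorted copy so the caller's list is left untouched.
    static List<String> sortKeys(List<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        sorted.sort(CompositeKeyOrder::compareKeys);
        return sorted;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("2024/02+7", "2024/01+10", "2024/01+9");
        // prints [2024/01+9, 2024/01+10, 2024/02+7]
        System.out.println(sortKeys(keys));
    }
}
```

Within a partition path the keys are compared as numbers, so 9 sorts before 10; a plain string comparison would place "10" first. If your record keys are not integers, Integer.parseInt throws NumberFormatException and a string comparison is the safer choice.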