更新时间:2022-12-14 GMT+08:00

设计分区方法

操作场景

合理的设计分区依据,可以优化task的切分。在程序编写过程中要尽量分区均匀,这样可以实现每个task数据不倾斜,防止由于某个task的执行时间过长导致整个任务执行缓慢。

操作步骤

以下是几种分区方法。

  • 随机分区:将元素随机地进行分区。
    dataStream.shuffle();
  • Rebalancing (Round-robin partitioning):基于round-robin对元素进行分区,使得每个分区负责均衡。对于存在数据倾斜的性能优化是很有用的。
    dataStream.rebalance();
  • Rescaling:以round-robin的形式将元素分区到下游操作的子集中。如果你想要将数据从一个源的每个并行实例中散发到一些mappers的子集中,用来分散负载,但是又不想要完全的rebalance 介入(引入`rebalance()`),这会非常有用。
    dataStream.rescale();
  • 广播:广播每个元素到所有分区。
    dataStream.broadcast();
  • 自定义分区:使用一个用户自定义的Partitioner对每一个元素选择目标task,由于用户对自己的数据更加熟悉,可以按照某个特征进行分区,从而优化任务执行。
    简单示例如下所示:
    // fromElements构造简单的Tuple2流
    DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("hello",1), Tuple2.of("test",2), Tuple2.of("world",100));
    
    // 定义用于分区的key值,返回即属于哪个partition的,该值加1就是对应的子任务的id号
    Partitioner<Tuple2<String, Integer>> strPartitioner = new Partitioner<Tuple2<String, Integer>>() {
        @Override
        public int partition(Tuple2<String, Integer> key, int numPartitions) {
            return (key.f0.length() + key.f1) % numPartitions;
        }
    };
    
    // 使用Tuple2进行分区的key值
    dataStream.partitionCustom(strPartitioner, new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
            return value;
        }
    }).print();