更新时间:2024-03-12 GMT+08:00

设置并行度

操作场景

并行度控制任务的数量,影响操作后数据被切分成的块数。调整并行度让任务的数量和每个任务处理的数据与机器的处理能力达到更优。

查看CPU使用情况和内存占用情况,当任务和数据不是平均分布在各节点,而是集中在个别节点时,可以增大并行度使任务和数据更均匀的分布在各个节点。增加任务的并行度,充分利用集群机器的计算能力。

操作步骤

任务的并行度可以通过以下四种层次(按优先级从高到低排列)指定,用户可以根据实际的内存、CPU、数据以及应用程序逻辑的情况调整并行度参数。

  • 算子层次
    一个算子、数据源和sink的并行度可以通过调用setParallelism()方法来指定,例如
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    DataStream<String> text = [...]
    DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new LineSplitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1).setParallelism(5);
    
    wordCounts.print();
    
    env.execute("Word Count Example");
  • 执行环境层次

    Flink程序运行在执行环境中。执行环境为所有执行的算子、数据源、data sink定义了一个默认的并行度。

    执行环境的默认并行度可以通过调用setParallelism()方法指定。例如:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);
    DataStream<String> text = [...]
    DataStream<Tuple2<String, Integer>> wordCounts = [...]
    wordCounts.print();
    env.execute("Word Count Example");
  • 客户端层次
    并行度可以在客户端将job提交到Flink时设定。对于CLI客户端,可以通过“-p”参数指定并行度。例如:
    ./bin/flink run -p 10 ../examples/*WordCount-java*.jar
  • 系统层次

    在系统级可以通过修改Flink客户端conf目录下的“flink-conf.yaml”文件中的“parallelism.default”配置选项来指定所有执行环境的默认并行度。