Configuring DOP
Scenario
The degree of parallelism (DOP) is the number of tasks that execute concurrently. It also determines how many data blocks an operation produces. Tuning the DOP balances the number of tasks, the data volume handled by each task, and the processing capability of the hosts.
Check the CPU and memory usage. If data and tasks are not evenly distributed among nodes, increase the DOP so that the load spreads more evenly.
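As a rough illustration of how the DOP relates to the data volume of each task, the following minimal sketch computes an upper bound on the records handled per task. It assumes the input is spread as evenly as possible, which real jobs with skewed keys will not achieve. The class DopEstimate, the method recordsPerTask, and the input size are hypothetical names and values chosen for this example; they are not part of the Flink API.

```java
// Hypothetical helper for illustration only; not part of the Flink API.
final class DopEstimate {

    // Upper bound on records per parallel task when totalRecords are spread
    // as evenly as possible over dop tasks (ceiling division).
    static long recordsPerTask(long totalRecords, int dop) {
        if (dop <= 0) {
            throw new IllegalArgumentException("dop must be positive");
        }
        return (totalRecords + dop - 1) / dop;
    }

    public static void main(String[] args) {
        long total = 1_000_000L; // assumed input size, for illustration
        for (int dop : new int[] {1, 4, 8}) {
            System.out.println("DOP " + dop + " -> up to "
                    + recordsPerTask(total, dop) + " records per task");
        }
    }
}
```

Raising the DOP lowers the per-task volume only while the hosts have spare CPU and memory to run the additional tasks.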
Procedure
Configure the DOP at one of the following layers, listed in descending order of priority, based on the actual memory, CPU, data, and application logic conditions:
- Operator
Call the setParallelism() method to specify the DOP of an operator, data source, or sink. For example:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
- Execution environment
A Flink program runs in an execution environment, which defines a default DOP for all operators, data sources, and data sinks.
Call the setParallelism() method of the execution environment to specify this default DOP. For example:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");
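- Client
Specify the DOP when submitting the job from the client. With the CLI client, use the -p option of the flink run command.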
- System
On the Flink client, set the parallelism.default parameter in the flink-conf.yaml file under the conf directory to specify the default DOP for all execution environments.
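The precedence among the four layers can be sketched as follows. This is a minimal model of the ordering only (operator, then execution environment, then client, then system), not how Flink implements it internally. The class DopResolver and its resolve method are hypothetical names introduced for this example, and the systemDefault argument stands in for the parallelism.default value.

```java
import java.util.OptionalInt;

// Hypothetical helper, not part of the Flink API: it only models the
// precedence order operator > execution environment > client > system.
final class DopResolver {

    // Returns the first DOP that is set, checking the layers from highest
    // to lowest priority. systemDefault stands in for parallelism.default.
    static int resolve(OptionalInt operator,
                       OptionalInt environment,
                       OptionalInt client,
                       int systemDefault) {
        if (operator.isPresent()) {
            return operator.getAsInt();
        }
        if (environment.isPresent()) {
            return environment.getAsInt();
        }
        if (client.isPresent()) {
            return client.getAsInt();
        }
        return systemDefault;
    }

    public static void main(String[] args) {
        // An operator-level setting wins over every other layer.
        System.out.println(resolve(
                OptionalInt.of(5), OptionalInt.of(3), OptionalInt.of(10), 1));
        // No operator-level setting: the environment default applies.
        System.out.println(resolve(
                OptionalInt.empty(), OptionalInt.of(3), OptionalInt.of(10), 1));
        // Nothing set in code or on the client: the system default applies.
        System.out.println(resolve(
                OptionalInt.empty(), OptionalInt.empty(), OptionalInt.empty(), 1));
    }
}
```

In this sketch the three calls in main resolve to 5, 3, and 1 respectively, mirroring the two code examples above: setParallelism(5) on an operator overrides env.setParallelism(3), which in turn overrides a client or system setting.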