FlinkSQL Operator Parallelism
This section applies to MRS 3.5.0 or later.
Scenarios
For jobs submitted through a CompiledPlan, operator parallelism and state TTL are taken from the values in the CompiledPlan file rather than from flink-conf.yaml. FlinkSQL therefore allows you to set the parallelism of individual operators by modifying the CompiledPlan of a job.
When you modify the CompiledPlan, do not change the JSON file structure; otherwise, the job will fail to be submitted. The CompiledPlan can be saved to an HDFS path or an OBS path. This example uses an HDFS path.
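Because the job fails to submit if the JSON structure changes, a quick sanity check before re-uploading is to compare the set of key paths in the original and edited files. The following is a minimal sketch, not part of the product; the helper names are illustrative, and it assumes both files are available locally:

```python
import json

def key_paths(obj, prefix=""):
    """Collect every key path in a JSON document (values are ignored)."""
    paths = set()
    if isinstance(obj, dict):
        for key, value in obj.items():
            path = f"{prefix}/{key}"
            paths.add(path)
            paths |= key_paths(value, path)
    elif isinstance(obj, list):
        for item in obj:
            paths |= key_paths(item, prefix)
    return paths

def same_structure(original_file: str, edited_file: str) -> bool:
    """True if the edit changed only values, not the JSON structure."""
    with open(original_file) as f1, open(edited_file) as f2:
        return key_paths(json.load(f1)) == key_paths(json.load(f2))
```

If `same_structure` returns False, re-check the edit before uploading the file back to HDFS.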
How to Use
Change the value of table.exec.resource.default-parallelism for an operator in the CompiledPlan.
Example
- Develop a FlinkServer SQL job.
On the SQL development page of FlinkServer, enter the following SQL statements by referring to Creating a FlinkServer Job and click Check Semantic:
set parallelism.default = 2;

CREATE TABLE print (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT
) WITH (
  'connector' = 'print'
);

CREATE TABLE datagen (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

COMPILE PLAN 'hdfs://hacluster/tmp/plan.json' FOR
insert into print select * from datagen;
- View and modify the operator parallelism in the CompiledPlan file.
- Log in to FusionInsight Manager, choose Cluster > Services > HDFS, click the link next to NameNode Web UI, and choose Utilities > Browse the file system to locate and download the hdfs://hacluster/tmp/plan.json file generated by the COMPILE PLAN statement. The file content is similar to the following example, in which the parallelism of the datagen operator (ID: 1) is 2.
- For example, to change the parallelism of the datagen operator to 1, set "table.exec.resource.default-parallelism" to "1" in the configuration of the node with ID 1, save the file, and upload it back to hdfs://hacluster/tmp/plan.json.
{
  "flinkVersion" : "1.17",
  "nodes" : [ {
    "id" : 1,
    "type" : "stream-exec-table-source-scan_1",
    "configuration" : {
      "table.exec.resource.default-parallelism" : "2"
    },
    "scanTableSource" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`datagen`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "uuid",
              "dataType" : "VARCHAR(20)"
            }, {
              "name" : "name",
              "dataType" : "VARCHAR(10)"
            }, {
              "name" : "age",
              "dataType" : "INT"
            }, {
              "name" : "ts",
              "dataType" : "INT"
            } ],
            "watermarkSpecs" : [ ]
          },
          "partitionKeys" : [ ],
          "options" : {
            "connector" : "datagen",
            "rows-per-second" : "1"
          }
        }
      }
    },
    "outputType" : "ROW<`uuid` VARCHAR(20), `name` VARCHAR(10), `age` INT, `ts` INT>",
    "description" : "TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[uuid, name, age, ts])",
    "inputProperties" : [ ]
  }, {
    "id" : 2,
    "type" : "stream-exec-sink_1",
    "configuration" : {
      "table.exec.resource.default-parallelism" : "2",
      "table.exec.sink.keyed-shuffle" : "AUTO",
      "table.exec.sink.not-null-enforcer" : "ERROR",
      "table.exec.sink.rowtime-inserter" : "ENABLED",
      "table.exec.sink.type-length-enforcer" : "IGNORE",
      "table.exec.sink.upsert-materialize" : "AUTO"
    },
    "dynamicTableSink" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`print`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "uuid",
              "dataType" : "VARCHAR(20)"
            }, {
              "name" : "name",
              "dataType" : "VARCHAR(10)"
            }, {
              "name" : "age",
              "dataType" : "INT"
            }, {
              "name" : "ts",
              "dataType" : "INT"
            } ],
            "watermarkSpecs" : [ ]
          },
          "partitionKeys" : [ ],
          "options" : {
            "connector" : "print"
          }
        }
      }
    },
    "inputChangelogMode" : [ "INSERT" ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`uuid` VARCHAR(20), `name` VARCHAR(10), `age` INT, `ts` INT>",
    "description" : "Sink(table=[default_catalog.default_database.print], fields=[uuid, name, age, ts])"
  } ],
  "edges" : [ {
    "source" : 1,
    "target" : 2,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  } ]
}
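The manual edit above can also be scripted, which avoids accidental structural changes. The following is a minimal sketch, assuming the plan has been copied to a local file (for example with hdfs dfs -get hdfs://hacluster/tmp/plan.json); the function name is illustrative, and the node ID and value are the ones from this example:

```python
import json

def set_operator_parallelism(plan_file: str, node_id: int, parallelism: str) -> None:
    """Change table.exec.resource.default-parallelism for one node in a CompiledPlan.

    Only the value is changed; the surrounding JSON structure stays intact.
    Note that the CompiledPlan stores the parallelism as a string.
    """
    with open(plan_file) as f:
        plan = json.load(f)
    for node in plan["nodes"]:
        if node["id"] == node_id:
            node["configuration"]["table.exec.resource.default-parallelism"] = parallelism
    with open(plan_file, "w") as f:
        json.dump(plan, f, indent=2)

# Example: set the datagen source operator (ID 1) to parallelism 1.
# set_operator_parallelism("plan.json", 1, "1")
```

After editing, upload the file back with hdfs dfs -put -f plan.json hdfs://hacluster/tmp/plan.json before running EXECUTE PLAN.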
- Submit the FlinkServer SQL job again.
Return to the FlinkServer SQL development page, enter the following SQL statement, and submit it:
EXECUTE PLAN 'hdfs://hacluster/tmp/plan.json';