FlinkSQL Operator Parallelism
This section applies to MRS 3.5.0 and later versions.
Scenario
For a job submitted through a CompiledPlan, the operator parallelism and operator TTL are taken from the values in the CompiledPlan, not from those in "flink-conf.yaml". FlinkSQL therefore lets you set operator parallelism by modifying the job's CompiledPlan.
When modifying the CompiledPlan, do not break the structure of the JSON file; otherwise the job will fail to submit. The CompiledPlan can be saved to either an HDFS path or an OBS path. This example uses an HDFS path.
Usage
To change an operator's parallelism, modify the value of "table.exec.resource.default-parallelism" in the corresponding operator's configuration in the CompiledPlan.
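The edit above can be scripted instead of done by hand, which avoids accidentally breaking the JSON structure. Below is a minimal sketch in Python, assuming the plan file has already been downloaded locally; the helper name `set_node_parallelism` is illustrative, not part of any Flink API.

```python
import json

def set_node_parallelism(plan_path, node_id, parallelism):
    """Set table.exec.resource.default-parallelism for one node of a CompiledPlan.

    node_id is the operator's "id" field in the plan; the parallelism value is
    stored as a string, matching the format of the generated plan. The file is
    rewritten in place as valid JSON.
    """
    with open(plan_path, "r", encoding="utf-8") as f:
        plan = json.load(f)
    for node in plan["nodes"]:
        if node["id"] == node_id:
            node.setdefault("configuration", {})[
                "table.exec.resource.default-parallelism"] = str(parallelism)
    with open(plan_path, "w", encoding="utf-8") as f:
        json.dump(plan, f, indent=2)
```

Because the file is re-serialized with `json.dump`, the result is always structurally valid JSON, which is the failure mode the note above warns about.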
Example
- Develop a FlinkServer SQL job.
On the SQL development page of FlinkServer (see How to Create a FlinkServer Job), develop the following SQL and click "Semantic Verification":
set parallelism.default = 2;

CREATE TABLE print (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT
) WITH (
  'connector' = 'print'
);

CREATE TABLE datagen (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

COMPILE PLAN 'hdfs://hacluster/tmp/plan.json' FOR
insert into print select * from datagen;
- View and modify the operator parallelism in the CompiledPlan file.
- Log in to FusionInsight Manager, choose "Cluster > Services > HDFS", click the link next to "NameNode Web UI", and choose "Utilities > Browse the file system" to view and obtain the "hdfs://hacluster/tmp/plan.json" file. (The system generates the CompiledPlan file in HDFS after you click "Semantic Verification".) The file content is shown below; it indicates that the parallelism of the datagen operator with "id" 1 is "2".
- For example, to change the parallelism of the datagen operator to "1", set its "table.exec.resource.default-parallelism" value to "1", save the file, and upload it again to "hdfs://hacluster/tmp/plan.json".
{
  "flinkVersion" : "1.17",
  "nodes" : [
    {
      "id" : 1,
      "type" : "stream-exec-table-source-scan_1",
      "configuration" : {
        "table.exec.resource.default-parallelism" : "2"
      },
      "scanTableSource" : {
        "table" : {
          "identifier" : "`default_catalog`.`default_database`.`datagen`",
          "resolvedTable" : {
            "schema" : {
              "columns" : [
                { "name" : "uuid", "dataType" : "VARCHAR(20)" },
                { "name" : "name", "dataType" : "VARCHAR(10)" },
                { "name" : "age", "dataType" : "INT" },
                { "name" : "ts", "dataType" : "INT" }
              ],
              "watermarkSpecs" : [ ]
            },
            "partitionKeys" : [ ],
            "options" : {
              "connector" : "datagen",
              "rows-per-second" : "1"
            }
          }
        }
      },
      "outputType" : "ROW<`uuid` VARCHAR(20), `name` VARCHAR(10), `age` INT, `ts` INT>",
      "description" : "TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[uuid, name, age, ts])",
      "inputProperties" : [ ]
    },
    {
      "id" : 2,
      "type" : "stream-exec-sink_1",
      "configuration" : {
        "table.exec.resource.default-parallelism" : "2",
        "table.exec.sink.keyed-shuffle" : "AUTO",
        "table.exec.sink.not-null-enforcer" : "ERROR",
        "table.exec.sink.rowtime-inserter" : "ENABLED",
        "table.exec.sink.type-length-enforcer" : "IGNORE",
        "table.exec.sink.upsert-materialize" : "AUTO"
      },
      "dynamicTableSink" : {
        "table" : {
          "identifier" : "`default_catalog`.`default_database`.`print`",
          "resolvedTable" : {
            "schema" : {
              "columns" : [
                { "name" : "uuid", "dataType" : "VARCHAR(20)" },
                { "name" : "name", "dataType" : "VARCHAR(10)" },
                { "name" : "age", "dataType" : "INT" },
                { "name" : "ts", "dataType" : "INT" }
              ],
              "watermarkSpecs" : [ ]
            },
            "partitionKeys" : [ ],
            "options" : {
              "connector" : "print"
            }
          }
        }
      },
      "inputChangelogMode" : [ "INSERT" ],
      "inputProperties" : [
        {
          "requiredDistribution" : { "type" : "UNKNOWN" },
          "damBehavior" : "PIPELINED",
          "priority" : 0
        }
      ],
      "outputType" : "ROW<`uuid` VARCHAR(20), `name` VARCHAR(10), `age` INT, `ts` INT>",
      "description" : "Sink(table=[default_catalog.default_database.print], fields=[uuid, name, age, ts])"
    }
  ],
  "edges" : [
    {
      "source" : 1,
      "target" : 2,
      "shuffle" : { "type" : "FORWARD" },
      "shuffleMode" : "PIPELINED"
    }
  ]
}
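Since a structurally broken plan file causes the job submission to fail, it is worth sanity-checking the edited file before uploading it back to HDFS. Below is a minimal sketch; the helper name `validate_plan` and the choice of top-level keys to check are assumptions based on the sample plan above, not a documented Flink contract.

```python
import json
import sys

def validate_plan(plan_path):
    """Return True if the file parses as JSON and has the top-level keys
    seen in the sample CompiledPlan ("nodes" and "edges").

    A parse error means the manual edit broke the file structure, in which
    case the plan must not be uploaded.
    """
    try:
        with open(plan_path, "r", encoding="utf-8") as f:
            plan = json.load(f)
    except json.JSONDecodeError as err:
        print(f"Broken JSON, do not upload: {err}", file=sys.stderr)
        return False
    return all(key in plan for key in ("nodes", "edges"))
```

Note this only checks well-formedness, not that the plan is semantically valid for Flink; the authoritative check remains the job submission itself.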
- Resubmit the FlinkServer SQL job.
Return to the SQL development page of FlinkServer, develop the following SQL, and submit it:
EXECUTE PLAN 'hdfs://hacluster/tmp/plan.json';