Updated on: 2024-12-11 GMT+08:00

FlinkSQL Operator Parallelism

This section applies to MRS 3.5.0 and later versions.

Scenario

For a job submitted through a CompiledPlan, the parallelism and TTL of each operator are taken from the values in the CompiledPlan rather than from "flink-conf.yaml". FlinkSQL therefore supports setting operator parallelism by modifying the job's CompiledPlan.

When modifying the CompiledPlan, do not break the structure of the JSON file; otherwise, the job will fail to be submitted. The CompiledPlan can be saved to either an HDFS path or an OBS path; this example uses an HDFS path.

Usage

To change the parallelism of an operator, modify the value of "table.exec.resource.default-parallelism" under the corresponding node in the CompiledPlan.
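Editing the plan with a JSON-aware tool guards against the structural breakage mentioned above, because the file is parsed on load and re-serialized as valid JSON on save. The following is a minimal sketch, assuming the plan has already been downloaded to a local file; the file name, node id, and new value are illustrative:

    import json

    PLAN_FILE = "plan.json"  # local copy of the CompiledPlan (illustrative name)
    NODE_ID = 1              # "id" of the operator to change (illustrative)
    NEW_VALUE = "1"          # new parallelism; the option value is a string in the plan

    with open(PLAN_FILE) as f:
        plan = json.load(f)  # fails fast if the file is not valid JSON

    # Update the parallelism option of the matching node only.
    for node in plan["nodes"]:
        if node["id"] == NODE_ID:
            node["configuration"]["table.exec.resource.default-parallelism"] = NEW_VALUE

    with open(PLAN_FILE, "w") as f:
        json.dump(plan, f, indent=2)  # always writes structurally valid JSON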

Example

  1. Develop a FlinkServer SQL job.

    Referring to How to Create a FlinkServer Job, develop the following SQL statements on the SQL development page of FlinkServer and click "Semantic Verification":
    set parallelism.default = 2;
    
    CREATE TABLE print(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT
    ) WITH (
      'connector' = 'print'
    );
    CREATE TABLE datagen(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts INT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1'
    );
    COMPILE PLAN 'hdfs://hacluster/tmp/plan.json' FOR
    INSERT INTO print
    SELECT * FROM datagen;

  2. View and modify the operator parallelism in the CompiledPlan file.

    1. Log in to FusionInsight Manager, choose "Cluster > Services > HDFS", click the link next to "NameNode Web UI", and choose "Utilities > Browse the file system" to locate and download the "hdfs://hacluster/tmp/plan.json" file (the system generates the CompiledPlan file in HDFS after you click "Semantic Verification"). The file content is shown below: the datagen operator, whose "id" is "1", has a parallelism of "2".
    2. For example, to change the parallelism of the datagen operator to "1", set "table.exec.resource.default-parallelism" : "1" in that node's configuration, save the file, and upload it back to "hdfs://hacluster/tmp/plan.json". (A scripted alternative to this manual download and upload is sketched after the JSON below.)
    {
      "flinkVersion" : "1.17",
      "nodes" : [ {
        "id" : 1,
        "type" : "stream-exec-table-source-scan_1",
        "configuration" : {
          "table.exec.resource.default-parallelism" : "2"
        },
        "scanTableSource" : {
          "table" : {
            "identifier" : "`default_catalog`.`default_database`.`datagen`",
            "resolvedTable" : {
              "schema" : {
                "columns" : [ {
                  "name" : "uuid",
                  "dataType" : "VARCHAR(20)"
                }, {
                  "name" : "name",
                  "dataType" : "VARCHAR(10)"
                }, {
                  "name" : "age",
                  "dataType" : "INT"
                }, {
                  "name" : "ts",
                  "dataType" : "INT"
                } ],
                "watermarkSpecs" : [ ]
              },
              "partitionKeys" : [ ],
              "options" : {
                "connector" : "datagen",
                "rows-per-second" : "1"
              }
            }
          }
        },
        "outputType" : "ROW<`uuid` VARCHAR(20), `name` VARCHAR(10), `age` INT, `ts` INT>",
        "description" : "TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[uuid, name, age, ts])",
        "inputProperties" : [ ]
      }, {
        "id" : 2,
        "type" : "stream-exec-sink_1",
        "configuration" : {
          "table.exec.resource.default-parallelism" : "2",
          "table.exec.sink.keyed-shuffle" : "AUTO",
          "table.exec.sink.not-null-enforcer" : "ERROR",
          "table.exec.sink.rowtime-inserter" : "ENABLED",
          "table.exec.sink.type-length-enforcer" : "IGNORE",
          "table.exec.sink.upsert-materialize" : "AUTO"
        },
        "dynamicTableSink" : {
          "table" : {
            "identifier" : "`default_catalog`.`default_database`.`print`",
            "resolvedTable" : {
              "schema" : {
                "columns" : [ {
                  "name" : "uuid",
                  "dataType" : "VARCHAR(20)"
                }, {
                  "name" : "name",
                  "dataType" : "VARCHAR(10)"
                }, {
                  "name" : "age",
                  "dataType" : "INT"
                }, {
                  "name" : "ts",
                  "dataType" : "INT"
                } ],
                "watermarkSpecs" : [ ]
              },
              "partitionKeys" : [ ],
              "options" : {
                "connector" : "print"
              }
            }
          }
        },
        "inputChangelogMode" : [ "INSERT" ],
        "inputProperties" : [ {
          "requiredDistribution" : {
            "type" : "UNKNOWN"
          },
          "damBehavior" : "PIPELINED",
          "priority" : 0
        } ],
        "outputType" : "ROW<`uuid` VARCHAR(20), `name` VARCHAR(10), `age` INT, `ts` INT>",
        "description" : "Sink(table=[default_catalog.default_database.print], fields=[uuid, name, age, ts])"
      } ],
      "edges" : [ {
        "source" : 1,
        "target" : 2,
        "shuffle" : {
          "type" : "FORWARD"
        },
        "shuffleMode" : "PIPELINED"
      } ]
    }
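    As an alternative to downloading and re-uploading the file through the NameNode web UI, the round trip can be scripted. This is a minimal sketch, assuming it runs on a cluster node where the HDFS client is installed and authenticated (for example, after kinit on a security-mode cluster); the local path is illustrative:

    import subprocess

    PLAN_PATH = "hdfs://hacluster/tmp/plan.json"  # plan path from this example
    LOCAL_COPY = "/tmp/plan.json"                 # illustrative local path

    # Download the CompiledPlan from HDFS (-f overwrites an existing local copy).
    subprocess.run(["hdfs", "dfs", "-get", "-f", PLAN_PATH, LOCAL_COPY], check=True)

    # ... edit LOCAL_COPY here, e.g. with the JSON snippet shown in "Usage" ...

    # Upload the modified plan back, replacing the original file in HDFS.
    subprocess.run(["hdfs", "dfs", "-put", "-f", LOCAL_COPY, PLAN_PATH], check=True)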

  3. Resubmit the FlinkServer SQL job.

    Return to the SQL development page of FlinkServer, develop the following SQL statement, and submit it:
    EXECUTE PLAN 'hdfs://hacluster/tmp/plan.json';
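
    After the job is submitted, you can check the parallelism that actually took effect for each operator on the job's Flink web UI; in this example, the datagen source should now run with the modified parallelism.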