文档首页/ 数据湖探索 DLI/ API参考/ Flink作业相关API/ 生成Flink SQL作业的静态流图
更新时间:2024-05-13 GMT+08:00
分享

生成Flink SQL作业的静态流图

功能介绍

该API用于生成Flink SQL作业的静态流图。

Flink 1.15版本不支持生成静态流图。

调试

您可以在API Explorer中调试该接口。

URI

  • URI格式

    POST /v3/{project_id}/streaming/jobs/{job_id}/gen-graph

  • 参数说明
    表1 URI参数说明

    参数名称

    是否必选

    参数类型

    说明

    project_id

    String

    项目编号,用于资源隔离。获取方式请参考获取项目ID

请求消息

表2 请求参数说明

参数名称

是否必选

参数类型

说明

sql_body

String

SQL。

cu_number

Integer

是在作业编辑页面配置的作业占用资源总CU数,需配置与实际占用资源一致,作业实际占用资源根据算子并行数按需申请。

cu_number = 管理单元 + (算子总并行数 / 单TM Slot数) * 单TM所占CU数

manager_cu_number

Integer

管理单元CU数。

parallel_number

Integer

最大并行度。

并行数为作业每个算子的并行数,适度增加并行数会提高作业整体算力,但也须考虑线程增多带来的切换开销,上限是计算单元CU数的4倍,最佳实践为计算单元CU数的1-2倍。

tm_cus

Integer

单个taskManagerCU数量。

tm_slot_num

Integer

单个taskManager Slot数量。

operator_config

String

算子的配置。

可先行调用该接口获取算子ID,即响应消息中stream_graph包含的operator_list中的id即为算子ID。

static_estimator

Boolean

是否静态资源预估。

配置为true时,即根据算子ID和流量预估作业消耗资源。

static_estimator_config

String

每个算子的流量/命中率配置,json格式的字符串。

当static_estimator为true时需要配置该参数,配置时传入算子ID和算子流量配置。

  • 可先行调用该接口获取算子ID,即响应消息中stream_graph包含的operator_list中的id即为算子ID。
  • 算子流量根据用户业务实际情况预估。

job_type

String

作业类型。

只支持flink_opensource_sql_job类型作业。

graph_type

String

流图类型。当前支持以下两种流图类型。

  • 简化流图:simple_graph
  • 静态流图:job_graph

flink_version

String

Flink版本。当前只支持1.10和1.12。

响应消息

表3 响应参数说明

参数名称

是否必选

参数类型

说明

is_success

Boolean

执行请求是否成功。“true”表示请求执行成功。

message

String

系统提示信息,执行成功时,信息可能为空。

error_code

String

错误码。

stream_graph

String

静态流图的描述信息。

请求示例

生成Flink SQL作业的静态流图,流图的类型为静态流图。

{
  "job_type": "flink_opensource_sql_job",
  "graph_type": "job_graph",
  "sql_body": "create table orders(\r\n  name string,\r\n  num int\r\n) with (\r\n  'connector' = 'datagen',\r\n  'rows-per-second' = '1', --每秒生成一条数据\r\n  'fields.name.kind' = 'random', --为字段user_id指定random生成器\r\n  'fields.name.length' = '5' --限制user_id长度为3\r\n);\r\n \r\nCREATE TABLE sink_table (\r\n  name string,\r\n  num int\r\n) WITH (\r\n   'connector' = 'print'\r\n);\r\nINSERT into sink_table SELECT * FROM orders;",
  "cu_number": 2,
  "manager_cu_number": 1,
  "parallel_number": 2,
  "tm_cus": 1,
  "tm_slot_num": 0,
  "operator_config": "",
  "static_estimator": true,
  "flink_version": "1.12",
  "static_estimator_config": "{\"operator_list\":[{\"id\":\"0a448493b4782967b150582570326227\",\"output_rate\":1000},{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"output_rate\":1000}]}"
}

响应示例

{
    "message": "",
    "is_success": true,
    "error_code": "",
    "stream_graph": "{\n  \"jid\" : \"44334c4259f6714bddef1ac525364052\",\n  \"name\" : \"InternalJob_1715392878428\",\n  \"nodes\" : [ {\n    \"id\" : \"0a448493b4782967b150582570326227\",\n    \"parallelism\" : 1,\n    \"operator\" : \"\",\n    \"operator_strategy\" : \"\",\n    \"description\" : \"Sink: Sink(table=[default_catalog.default_database.sink_table], fields=[name, num])\",\n    \"chain_operators_id\" : [ \"0a448493b4782967b150582570326227\" ],\n    \"inputs\" : [ {\n      \"num\" : 0,\n      \"id\" : \"bc764cd8ddf7a0cff126f51c16239658\",\n      \"ship_strategy\" : \"FORWARD\",\n      \"exchange\" : \"pipelined_bounded\"\n    } ],\n    \"optimizer_properties\" : {}\n  }, {\n    \"id\" : \"bc764cd8ddf7a0cff126f51c16239658\",\n    \"parallelism\" : 2,\n    \"operator\" : \"\",\n    \"operator_strategy\" : \"\",\n    \"description\" : \"Source: TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[name, num])\",\n    \"chain_operators_id\" : [ \"bc764cd8ddf7a0cff126f51c16239658\" ],\n    \"optimizer_properties\" : {}\n  } ],\n  \"operator_list\" : [ {\n    \"id\" : \"0a448493b4782967b150582570326227\",\n    \"name\" : \"Sink: Sink(table=[default_catalog.default_database.sink_table], fields=[name, num])\",\n    \"type\" : \"Sink\",\n    \"contents\" : \"Sink(table=[default_catalog.default_database.sink_table], fields=[name, num])\",\n    \"parallelism\" : 1,\n    \"tags\" : \"[SINK]\",\n    \"input_operators_id\" : [ \"bc764cd8ddf7a0cff126f51c16239658\" ]\n  }, {\n    \"id\" : \"bc764cd8ddf7a0cff126f51c16239658\",\n    \"name\" : \"Source: TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[name, num])\",\n    \"type\" : \"Source\",\n    \"contents\" : \"TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[name, num])\",\n    \"parallelism\" : 2,\n    \"tags\" : \"[PROCESS, UDF]\",\n    \"input_operators_id\" : [ ]\n  } ]\n}"
}

为了便于查看返回体信息,我们将stream_graph格式化后如下所示:

    "jid": "65b6a7b0c1ad95b1722a92b49d2f6eba",
    "name": "InternalJob_1715392245413",
    "nodes": [
        {
            "id": "0a448493b4782967b150582570326227",
            "parallelism": 1,
            "operator": "",
            "operator_strategy": "",
            "description": "Sink: Sink(table=[default_catalog.default_database.sink_table], fields=[name, num])",
            "chain_operators_id": [
                "0a448493b4782967b150582570326227"
            ],
            "inputs": [
                {
                    "num": 0,
                    "id": "bc764cd8ddf7a0cff126f51c16239658",
                    "ship_strategy": "FORWARD",
                    "exchange": "pipelined_bounded"
                }
            ],
            "optimizer_properties": {

            }
        },
        {
            "id": "bc764cd8ddf7a0cff126f51c16239658",
            "parallelism": 2,
            "operator": "",
            "operator_strategy": "",
            "description": "Source: TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[name, num])",
            "chain_operators_id": [
                "bc764cd8ddf7a0cff126f51c16239658"
            ],
            "optimizer_properties": {

            }
        }
    ],
    "operator_list": [
        {
            "id": "0a448493b4782967b150582570326227",
            "name": "Sink: Sink(table=[default_catalog.default_database.sink_table], fields=[name, num])",
            "type": "Sink",
            "contents": "Sink(table=[default_catalog.default_database.sink_table], fields=[name, num])",
            "parallelism": 1,
            "tags": "[SINK]",
            "input_operators_id": [
                "bc764cd8ddf7a0cff126f51c16239658"
            ]
        },
        {
            "id": "bc764cd8ddf7a0cff126f51c16239658",
            "name": "Source: TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[name, num])",
            "type": "Source",
            "contents": "TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[name, num])",
            "parallelism": 2,
            "tags": "[PROCESS, UDF]",
            "input_operators_id": [

            ]
        }
    ]
}

状态码

状态码如表4所示。

表4 状态码

状态码

描述

200

操作成功。

400

输入参数无效。

错误码

调用接口出错后,将不会返回上述结果,而是返回错误码和错误信息,更多介绍请参见错误码

相关文档