更新时间:2025-09-01 GMT+08:00
分享

创建Flink OpenSource SQL作业

本章节介绍如何新建Flink OpenSource SQL作业。

DLI Flink OpenSource SQL类型作业完全兼容社区Flink版本,并在社区connector基础之上,新增了Redis、DWS(GaussDB)数据源类型。社区Flink SQL DDL/DML/函数等语法说明及限制可参考Table API & SQL

前提条件

  • 创建Flink OpenSource SQL作业时,需要事先准备数据源以及数据输出通道。
  • 创建Flink OpenSource SQL作业,访问其他外部数据源时,需要先创建跨源连接,打通作业运行队列到外部数据源之间的网络。

注意事项

创建作业提交任务前,建议先开通云审计服务,用于记录与DLI服务相关的操作事件,便于日后的查询、审计和回溯。云审计服务支持的DLI操作列表详见使用CTS审计DLI服务

关于如何开通云审计服务以及如何查看追踪事件,请参考《云审计服务快速入门》。

创建Flink OpenSource SQL作业

  1. 在DLI管理控制台的左侧导航栏中,单击作业管理>Flink作业,进入“Flink作业”页面。
  2. “Flink作业”页面右上角单击“创建作业”,弹出“创建作业”对话框。

    图1 创建Flink OpenSource SQL作业

  3. 配置作业信息。

    表1 作业配置信息

    参数

    参数说明

    类型

    选择“Flink OpenSource SQL”:用户通过编辑SQL语句来启动作业。

    名称

    作业名称,只能由字母、中文、数字、中划线和下划线组成,并且长度为1~57字节。

    作业名称必须是唯一的。

    描述

    作业的相关描述,长度为0~512字节。

    模板名称

    用户可以选择样例模板或自定义的作业模板。关于模板的详细信息,请参见管理Flink作业模板

    标签

    使用标签标识云资源。包括标签键和标签值。如果您需要使用同一标签标识多种云资源,即所有服务均可在标签输入框下拉选择同一标签,建议在标签管理服务(TMS)中创建预定义标签。

    如您的组织已经设定DLI的相关标签策略,则需按照标签策略规则为资源添加标签。标签如果不符合标签策略的规则,则可能会导致资源创建失败,请联系组织管理员了解标签策略详情。

    具体请参考《标签管理服务用户指南》。

    说明:
    • 最多支持20个标签。
    • 一个“键”只能添加一个“值”。
    • 每个资源中的键名不能重复。
    • 标签键:在输入框中输入标签键名称。
      说明:

      标签的键的最大长度为128个字符,标签的键可以包含任意语种字母、数字、空格和_ . : +-@ ,但首尾不能含有空格,不能以_sys_开头。

    • 标签值:在输入框中输入标签值。
      说明:

      标签值的最大长度为255个字符,标签的值可以包含任意语种字母、数字、空格和_ . : +-@ 。

  4. 单击“确定”,进入作业编辑页面。
  5. 编辑OpenSource SQL作业。

    在SQL语句编辑区域,输入详细的SQL语句。相关SQL语句请参考《数据湖探索Flink OpenSource SQL语法参考》。

  6. 单击“语义校验”,确保语义校验成功。

    • 只有语义校验成功后,才可以执行“启动”作业的操作。
    • 如果校验成功,提示“SQL语义校验成功”。
    • 如果校验失败,会在错误的SQL语句前面显示红色的“X”记号,鼠标移动到“X”号上可查看详细错误,请根据错误提示修改SQL语句。

    Flink 1.15不支持语法校验功能。

  7. 设置作业运行参数。

    表2 作业运行参数说明

    参数

    参数说明

    所属队列

    队列用于指定Flink作业执行的资源队列。

    队列决定了作业在弹性资源池中运行时能够使用的计算资源。每个队列都分配了指定的资源,即队列的CU,队列的CU配置直接影响作业的性能和执行效率。

    在提交作业前,评估作业的资源需求,选择一个能够满足需求的队列。

    Flink OpenSource SQL作业即支持选择通用类型的队列。

    Flink版本

    Flink版本是选择作业运行时所使用的Flink的版本。不同版本的Flink支持不同的特性。

    了解更多Flink版本的信息请参考DLI Flink版本说明

    选择使用Flink1.15版本时请在作业中配置允许DLI访问的云服务的委托信息。

    Flink 1.15版本语法请参考Flink OpenSource SQL1.15版本使用说明Flink OpenSource SQL1.15语法

    Flink 1.12版本语法请参考Flink OpenSource SQL1.12语法

    说明:

    不建议长期混用不同版本的Flink引擎。

    • 长期混用不同版本的Flink引擎会导致代码在新旧版本之间不兼容,影响作业的执行效率。
    • 当作业依赖于特定版本的库或组件,长期混用不同版本的Flink引擎可能会导致作业因依赖冲突而执行失败。

    UDF Jar

    用户自定义UDF文件,在后续作业中可以调用插入Jar包中的自定义函数。

    UDF Jar包的管理方式:

    • 上传OBS管理程序包:提前将对应的jar包上传至OBS桶中。并在此处选择对应的OBS路径。
    • 上传DLI管理程序包:提前将对应的jar包上传至OBS桶中,并在DLI管理控制台的数据管理>程序包管理中创建程序包,具体操作请参考创建DLI程序包

    Flink1.15及以上版本在创建作业时仅支持配置OBS中的程序包,不支持读取DLI程序包。

    委托

    使用Flink 1.15及以上版本执行作业时,按需配置自定义委托用于授予DLI访问其他服务的操作权限。

    自定义委托请参考创建DLI自定义委托权限

    资源配置版本

    根据不同的Flink引擎版本,DLI提供了不同资源配置模板。

    V2版本对比于V1模板不支持设置CU数量,支持直接设置Job Manager Memory和Task Manager Memory。

    V1:适用于Flink 1.12、Flink 1.13、Flink 1.15

    V2:适用于Flink 1.13、Flink 1.15、Flink 1.17

    优先推荐使用V2版本的参数设置。

    V1版本的具体参数说明请参考表3

    V2版本的具体参数说明请参考表4

    表3 资源规格参数-V1

    参数

    参数说明

    CU数量

    CU数量为DLI的计算单元数量和管理单元数量总和,CU也是DLI的计费单位,1CU=1核4G。

    当前配置的CU数量为运行作业时所需的CU数,不能超过其绑定队列的CU数量。

    说明:

    当开启TaskManager配置时,为了优化弹性资源池队列的管理,在您设置“单TM Slot”后,为您自动调整CU数量与实际CU数量一致。

    CU数量=实际CU数量=max[管理单元和TaskManager的CPU总和,(管理单元和TaskManager的内存总和/4)]

    • 管理单元和TaskManager的CPU总和=实际TM数 * 单TM所占CU数 + 管理单元。
    • 管理单元和TaskManager的内存总和= 实际TM数 * 设置的单个TM的内存 + 管理单元内存
    • 如果配置了单TM Slot数,实际TM数 = 并行数 / 单 TM Slot数。
    • 如果没配置了单TM Slot数 ,实际TM数 = (CU数量 - 管理单元)/单TM所占CU数。
    • 如果没在优化参数配置单个TM的内存和管理单元内存,默认单个TM的内存 = 单TM所占CU数 * 4。管理单元内存 = 管理单元 * 4。
    • Spark资源并行度由Executor数量和Executor CPU核数共同决定。

    管理单元

    管理单元CU数量。

    并行数

    作业的并行数是指作业中各个算子的并行执行的子任务的数量,即算子的子任务数就是其对应算子的并行度。

    说明:

    最大并行数不能大于计算单元(CU数量-管理单元)的4倍。

    TaskManager配置

    用于设置TaskManager资源参数。

    • 勾选后需配置下列参数:
      • “单TM所占CU数”:每个TaskManager占用的资源数量。
      • “单TM Slot”:每个TaskManager包含的Slot数量。
    • 不勾选该参数,,系统自动按照默认值为您配置。
      • “单TM所占CU数”:默认值为1。
      • “单TM Slot”:默认值为“(并行数 * 单TM所占CU数 )/(CU数量 - 管理单元)”。

    OBS桶

    选择OBS桶用于保存用户作业日志信息、checkpoint等信息。如果选择的OBS桶是未授权状态,需要单击“OBS授权”

    保存作业日志

    设置是否将作业运行时的日志信息保存到OBS。日志信息的保存路径为:“桶名/jobs/logs/作业id开头的目录”。

    注意:

    该参数建议勾选,否则作业运行完成后不会生成运行日志,后续如果作业运行异常则无法获取运行日志进行定位。

    勾选后需配置下列参数:

    “OBS桶”:选择OBS桶用于保存用户作业日志信息。如果选择的OBS桶是未授权状态,需要单击“OBS授权”
    说明:

    如果同时勾选了“开启Checkpoint”“保存作业日志”,OBS授权一次即可。

    作业异常告警

    设置是否将作业异常告警信息,如作业出现运行异常或者欠费情况,以SMN的方式通知用户。

    勾选后需配置下列参数:

    “SMN主题”

    选择一个自定义的SMN主题。如何自定义SMN主题,请参见《消息通知服务用户指南》“创建主题”章节。

    开启Checkpoint

    设置是否开启作业快照,开启后可基于Checkpoint(一致性检查点)恢复作业。

    勾选后需配置下列参数:
    • “Checkpoint间隔”:Checkpoint的时间间隔,单位为秒,输入范围 1~999999,默认值为30s。
    • “Checkpoint模式”:支持如下两种模式:
      • At least once:事件至少被处理一次。
      • Exactly once:事件仅被处理一次。
    • “OBS桶”:选择OBS桶用于保存用户Checkpoint。如果选择的OBS桶是未授权状态,需要单击“OBS授权”
      Checkpoint保存路径为:“桶名/jobs/checkpoint/作业id开头的目录”。
      说明:

      如果同时勾选了“开启Checkpoint”“保存作业日志”,OBS授权一次即可。

    异常自动重启

    设置是否启动异常自动重启功能,当作业异常时将自动重启并恢复作业。

    勾选后需配置下列参数:

    • “异常重试最大次数”:配置异常重试最大次数。单位为“次/小时”。
      • 无限:无限次重试。
      • 有限:自定义重试次数。
    • “从Checkpoint恢复”:需要同时勾选“开启Checkpoint”才可配置该参数。

    空闲状态保留时长

    用于清除GroupBy、RegularJoin、Rank、Depulicate等算子经过最大保留时间后仍未更新的中间状态,默认设置为1小时。

    脏数据策略

    选择处理脏数据的策略。支持如下三种策略:“忽略”“抛出异常”“保存”

    “脏数据策略”选择“保存”时,配置“脏数据转储地址”。单击地址框选择保存脏数据的OBS路径。

    仅DIS数据源支持配置脏数据策略。

    表4 资源规格参数-V2

    参数

    参数说明

    并行数

    作业的并行数是指作业中各个算子的并行执行的子任务的数量,算子的子任务数就是其对应算子的并行度。

    说明:
    • 最小并行数不能小于1。默认值为1。
    • 最大并行数不能大于计算单元(CU数量-管理单元)的4倍。

    Job Manager CPU

    该参数用于设置JobManager可以使用的CPU核数。

    Job Manager CPU默认值为1。最小值不能小于0.5。

    Job Manager Memory

    该参数指用于设置JobManager可以使用的内存。

    Job Manager Memory默认值为4GB。最小值不能小于2GB(不能小于2048M)。 单位默认GB,可设置为GB,MB。

    Task Manager CPU

    该参数指用于设置TaskManager可以使用的CPU核数。

    Task Manager CPU默认值为1。最小值不能小于0.5。

    Task Manager Memory

    该参数指用于设置TaskManager可以使用的内存。

    Task Manager Memory默认值4GB。最小值不能小于2GB(不能小于2048M)。 单位默认GB,可设置为GB,MB。

    单TM Slot

    该参数用于设置单个TaskManager可以提供的并行任务数量。每个Task Slot可以并行执行一个任务。增加 Task Slots 可以提高 TaskManager 的并行处理能力,但也会增加资源消耗。

    Task Slots的数量与TaskManager的CPU数相关联,因为每个CPU可以提供一个Task Slot。

    单TM Slot默认值为1。最小并行数不能小于1。

    OBS桶

    选择OBS桶用于保存用户作业日志信息、checkpoint等信息。如果选择的OBS桶是未授权状态,需要单击“OBS授权”

    保存作业日志

    设置是否将作业运行时的日志信息保存到OBS。日志信息的保存路径为:“桶名/jobs/logs/作业id开头的目录”。

    注意:

    该参数建议勾选,否则作业运行完成后不会生成运行日志,后续如果作业运行异常则无法获取运行日志进行定位。

    勾选后需配置下列参数:

    “OBS桶”:选择OBS桶用于保存用户作业日志信息。如果选择的OBS桶是未授权状态,需要单击“OBS授权”
    说明:

    如果同时勾选了“开启Checkpoint”“保存作业日志”,OBS授权一次即可。

    作业异常告警

    设置是否将作业异常告警信息,如作业出现运行异常或者欠费情况,以SMN的方式通知用户。

    勾选后需配置下列参数:

    “SMN主题”

    选择一个自定义的SMN主题。如何自定义SMN主题,请参见《消息通知服务用户指南》“创建主题”章节。

    开启Checkpoint

    设置是否开启作业快照,开启后可基于Checkpoint(一致性检查点)恢复作业。

    勾选后需配置下列参数:
    • “Checkpoint间隔”:Checkpoint的时间间隔,单位为秒,输入范围 1~999999,默认值为30s。
    • “Checkpoint模式”:支持如下两种模式:
      • At least once:事件至少被处理一次。
      • Exactly once:事件仅被处理一次。
    • “OBS桶”:选择OBS桶用于保存用户Checkpoint。如果选择的OBS桶是未授权状态,需要单击“OBS授权”
      Checkpoint保存路径为:“桶名/jobs/checkpoint/作业id开头的目录”。
      说明:

      如果同时勾选了“开启Checkpoint”“保存作业日志”,OBS授权一次即可。

    异常自动重启

    设置是否启动异常自动重启功能,当作业异常时将自动重启并恢复作业。

    勾选后需配置下列参数:

    • “异常重试最大次数”:配置异常重试最大次数。单位为“次/小时”。
      • 无限:无限次重试。
      • 有限:自定义重试次数。
    • “从Checkpoint恢复”:需要同时勾选“开启Checkpoint”才可配置该参数。

    空闲状态保留时长

    用于清除GroupBy、RegularJoin、Rank、Depulicate等算子经过最大保留时间后仍未更新的中间状态,默认设置为1小时。

    脏数据策略

    选择处理脏数据的策略。支持如下三种策略:“忽略”“抛出异常”“保存”

    “脏数据策略”选择“保存”时,配置“脏数据转储地址”。单击地址框选择保存脏数据的OBS路径。

    仅DIS数据源支持配置脏数据策略。

  8. (可选)根据需要设置自定义配置。相关参数详情可以参考Flink作业调优

    图2 自定义配置

    Flink作业支持在Flink作业的自定义配置中设置计算资源规格参数, 且自定义配置中的参数值优先级高于指定的值。

    参数对应关系请参考表5

    Flink1.12优先推荐参考控制台的配置方法来配置计算资源规格参数,使用自定义配置参数可能会出现实际CU统计数据不一致的问题。

    表5 控制台计算资源规格参数与Flink自定义配置中参数的对应关系

    自定义配置

    计算资源规格参数-V1

    计算资源规格参数-V2

    说明

    kubernetes.jobmanager.cpu

    管理单元

    Job Manager CPU

    该参数用于设置JobManager可以使用的CPU核数。

    Job Manager CPU默认值为1。最小值不能小于0.5。

    kubernetes.taskmanager.cpu

    单TM所占CU

    Task Manager CPU

    该参数指用于设置TaskManager可以使用的CPU核数。

    Task Manager CPU默认值为1。最小值不能小于0.5。

    jobmanager.memory.process.size

    -

    Job Manager Memory

    该参数指用于设置JobManager可以使用的内存。

    Job Manager Memory默认值为4G。最小值不能小于2G(不能小于2048M)。 单位默认GB,可设置为GB,MB。

    taskmanager.memory.process.size

    -

    Task Manager Memory

    该参数指用于设置TaskManager可以使用的内存。

    Task Manager Memory默认值4G。最小值不能小于2G(不能小于2048M)。 单位默认GB,可设置为GB,MB。

  9. 单击“保存”,保存作业和相关参数。
  10. 单击“启动”,进入“启动Flink作业”页面,确认作业规格和费用后,单击“立即启动”,启动作业。

    启动作业后,系统将自动跳转到Flink作业管理页面,新创建的作业将显示在作业列表中,在状态列中可以查看作业状态。作业提交成功后,状态将由提交中变为运行中。运行完成后显示“已完成”。

    如果作业状态为提交失败运行异常,表示作业提交或运行失败。用户可以在作业列表中的状态列中,将鼠标移动到状态图标上查看错误信息,单击可以复制错误信息。根据错误信息解决故障后,重新提交。

    其他功能按钮说明如下:

    • 另存为:将新建作业另存为一个新作业。
    • 静态流图:提供静态资源预估功能和流图展示功能。如图4所示。
    • 简化流图: 展示source到sink的数据处理流程。如图3所示。
    • 格式化:对SQL语句进行格式化。
    • 设为模板:将新创建的作业设置为作业模板。
    • 主题设置:设置页面主题,可以设置字体大小,自动换行和页面风格。
    • 帮助:跳转至帮助中心,为用户提供SQL语法参考。

简化流图

在OpenSource SQL作业编辑页面,单击“简化流图”按钮即可展示。

仅Flink 1.12和Flink 1.10版本支持查看简化流图。

图3 简化流图

静态流图

在OpenSource SQL作业编辑页面,单击“静态流图”按钮即可展示。

  • 仅Flink 1.12和Flink 1.10版本支持查看简化流图。
  • Flink Opensource SQL作业中使用自定义函数时,不支持生成静态流图。

“静态流图”页面还支持以下功能:

  • 支持资源预估。通过单击“静态流图”页面中的“资源预估”按钮,可进行资源预估。单击“恢复初始值”按钮,可在资源预估后恢复初始值。
  • 支持展示页面缩放。
  • 支持根据算子链展开/合并。
  • 支持编辑“并行数”,“流量”和“命中率”。
    • 并行数:一个任务的并发数。
    • 流量:算子的数据流量,单位:条/s。
    • 命中率:数据经过算子处理之后的保留率。命中率=算子的数据流出量/流入量,单位:%。
图4 静态流图

相关文档