开发实时处理单任务DLI Flink SQL作业
前提条件
- 已新建作业。
- 当前用户已锁定该作业,否则需要通过“抢锁”锁定作业后才能继续开发作业。新建或导入作业后默认被当前用户锁定,详情参见“编辑锁定功能”。
开发SQL脚本
- 参考访问DataArts Studio实例控制台,登录DataArts Studio管理控制台。
- 在DataArts Studio控制台首页,选择对应工作空间的“数据开发”模块,进入数据开发页面。
- 在数据开发主界面的左侧导航栏,选择。
- 在作业目录中,双击实时处理单任务作业名称,进入作业开发页面。如果没有作业,请先创建一个实时处理单任务的作业,引擎选择DLI,进入作业开发页面。
- 在SQL编辑器右侧,单击“基本信息”,可以配置作业的基本信息、作业标签、作业描述、属性参数信息等。单任务SQL作业的基本信息如表1所示,属性如表2所示,高级信息如表3所示。
表1 作业基本信息 参数
说明
作业名
DLI Flink SQL作业的名称。
作业ID
DLI Flink SQL作业的ID。
责任人
自动匹配创建作业时配置的作业责任人,此处支持修改。
责任人只能从当前工作空间的用户中选择。
作业委托
当“作业调度身份是否可配置”设置为“是”,该参数可见。
配置委托后,作业执行过程中,以委托的身份与其他服务交互。
作业标签
配置作业的标签,用以分类管理作业。
单击“新增”,可给作业重新添加一个标签。
作业描述
作业的描述信息。
配置DLI Flink SQL作业属性参数信息,参见表2。
表2 实时处理单任务DLI Flink SQL作业属性参数信息 属性
说明
基础配置
DLI队列
选择DLI队列名称。可以单击查看按钮查看DLI队列信息。
Flink版本
系统默认显示DLI队列版本信息。
仅支持DLI队列1.15以后的Flink版本。
UDF Jar
选择DLI的资源信息,支持多选。非必填项。
如果没有可选资源,请前往管理资源进行配置。在选择UDF Jar之前,您需要将UDF Jar包上传至OBS桶中,并在“资源管理”页面中新建资源。系统允许本地直接上传Jar文件。
Flink SQL中可调用插入Jar包中的自定义函数。作业级UDF配置优先级高于全局UDF配置。
DLI作业委托
选择DLI作业委托。
如果没有合适的委托,请前往IAM创建。
资源配置
Job Manager CPU
配置作业的CPU数量。输入值必须在0.5到16之间。
Job Manager Memory
单位为GiB时,输入值必须在2到64之间。
单位为MiB时,输入值必须在2,048到65,536之间。
Task Manager CPU
配置任务的CPU数量。输入值必须在0.5到16之间。
Task Manager Memory
单位为GiB时,输入值必须在2到64之间。
单位为MiB时,输入值必须在2,048到65,536之间。
并发数
配置作业的并发数。并发数不能小于1。
并发数是指同时运行DLI Flink SQL作业的任务数。用户为作业选择的并发量。
每个TaskManager的slot数
设置每个TaskManager的slot数。输入值不能小于1。
运行参数配置
是否开启作业自动快照功能
该参数开启,系统会自动生成checkpoint。
该参数开启时,需要配置如下参数。
- 系统检查点间隔 ,定时执行 Checkpoint 的时间间隔,默认值为180s。
- checkpoint保留最大数量,flink参数state.checkpoints.num-retained checkpoint的保留最大数量,默认值为5。
- 系统检查点超时时间,默认值为10分钟。到达默认之后系统检查点将生成失败。
- 两次系统检查点间最短间隔,两次系统检查点之间的最短时间间隔,如果Checkpoint最大并行度是1,那么这个配置确保两个Checkpoint之间有一个最短时间间隔。默认值为180s。
该参数关闭时,无需配置上面的参数。
State数据过期时间
设定的Flink状态生存时间,状态服务每经过设置的时间就会清理当前存储的状态(仅支持DLI Flink SQL作业)。
输入值不能小于1。默认36小时。
Flink重启策略
检测Failure Rate的时间间隔
仅当“Flink重启策略”选择“Failure Rate”时,才需配置。
输入值不能小于1。单位支持:秒、毫秒、分、小时。默认为秒。
时间间隔内的最大失败次数
仅当“Flink重启策略”选择“Failure Rate”时,才需配置。
作业失败之前,给定时间间隔内的最大重新启动次数。
输入值不能小于1。
尝试重启的次数
仅当“Flink重启策略”选择“Fixed Delay”时,才需配置。
Flink在宣告作业失败之前重试执行的次数。
输入值不能小于1。
每次重启时间间隔
延迟重试意味着在执行失败后,重新执行不会立即开始,而只会在一定的延迟后开始当程序与外部系统进行交互(例如,连接或挂起的事务在尝试重新执行之前应达到超时)时,延迟重试可能会有所帮助。
输入值不能小于1。单位支持:秒、毫秒、分、小时。默认为秒。
运行参数配置
输入运行参数。Flink作业运行时自定义参数。
输入格式为key=value的参数。多个参数使用Enter键分隔。
举例如下:
taskmanager.numbertaskslot=1
execution.checkpointing.mode=AT_LEAST_ONCE
state.checkpoints.num-retained=10
日志配置
日志归档
开关开启时,可在作业监控查看Flink作业的提交、运行日志。
日志存储路径
日志存储的OBS桶路径。
- checkpoint存储在OBS桶/{Workspace id}/作业名/checkpoint/时间戳/UUID格式命名的OBS路径。
- DataArts日志默认存储在以dlf-log-{Project id}命名的OBS桶,支持基于工作区全局配置OBS桶。
- DLI日志路径默认存储在OBS桶/jobs/logs/DLI任务ID_UUID/时间戳。
根日志级别
- TRACE
- DEBUG
- INFO
- WARN
- ERROR
类日志等级
单击
添加日志级别,设置类日志等级。输入“Logger name”,选择“Logger level”。
Logger level支持如下五种。
- TRACE
- DEBUG
- INFO
- WARN
- ERROR
表3 高级参数 参数
是否必选
说明
作业状态轮询时间(秒)
是
设置轮询时间(30~60秒、120秒、180秒、240秒、300秒),每隔x秒查询一次作业是否执行完成。
作业运行过程中,根据设置的作业状态轮询时间查询作业运行状态。
最长等待时间
是
设置作业执行的超时时间,如果作业配置了重试,在超时时间内未执行完成,该作业将会再次重试。
说明:如果作业一直处于启动中状态,没有成功开始运行,超时后作业会被置为失败。
重启策略
是
- 从上一个检查点重跑
- 重新重跑
失败重试
是
作业执行失败后,是否重新执行作业。
- 是:重新执行作业,请配置以下参数。
- 超时重试
- 最大重试次数
- 重试间隔时间(秒)
- 否:默认值,不重新执行作业。 说明:
如果作业节点配置了重试,并且配置了超时时间,该节点执行超时后,系统支持再重试。
当节点运行超时导致的失败不会重试时,您可前往“默认项设置”修改此策略。
当“失败重试”配置为“是”才显示“超时重试”。
- 在SQL编辑器中输入SQL语句,支持输入多条SQL语句。 为了方便脚本开发,数据开发模块提供了如下能力:
- 脚本编辑器支持使用如下快捷键,以提升脚本开发效率。
- F8:运行
- F9:停止
- Ctrl + /:注释或解除注释光标所在行或代码块
- Ctrl + Z:撤销
- Ctrl + F:查找
- Ctrl + Shift + R:替换
- Ctrl + X:剪切
- Ctrl + S:保存
- Alt + 鼠标拖动:列模式编辑,修改一整块内容
- Ctrl + 鼠标点选:多列模式编辑,多行缩进
- Ctrl + →或Ctrl + ←:向右或向左按单词移动光标
- Ctrl + Home或Ctrl + End:移至当前文件的最前或最后
- Home或End:移至当前行最前或最后
- Ctrl + Shift + L:鼠标双击相同的字符串后,为所有相同的字符串添加光标,实现批量修改
- Ctrl + D:删除一行
- Shift + Ctrl + U:解锁
- Ctrl + Alt + K:同词选择
- Ctrl + B:格式化
- Ctrl + Shift + Z:重做
- Ctrl + Enter:执行所选行/选中内容
- Ctrl + Alt + F:标记
- Ctrl + Shift + K:查找上一个
- Ctrl + K:查找下一个
- Ctrl + Backspace:删除左侧单词
- Ctrl + Delete:删除右侧单词
- Alt + Backspace:删除至行首
- Alt + Delete:删除至行尾
- Alt + Shift-Left:选择行首
- Alt + Shift-Right:选择行尾
- 支持脚本参数。
在SQL语句中直接写入脚本参数,然后在编辑器右侧的“参数”处选择“更新脚本参数”。也可以直接配置该作业脚本的参数与常量。
脚本示例如下,其中str1是参数名称,只支持英文字母、数字、“-”、“_”、“<”和“>”,最大长度为16字符,且参数名称不允许重名。
select ${str1} from data;
- 脚本编辑器支持使用如下快捷键,以提升脚本开发效率。
- (可选)在编辑器上方,单击“格式化”,格式化SQL语句。
- 在编辑器上方,单击“保存”,保存该作业并进行提交。
配置作业参数
作业参数为全局参数,可用于作业中的任意节点。操作方法如下:
单击编辑器右侧的“参数”,展开配置页面,配置如表4所示的参数。
| 功能 | 说明 |
|---|---|
| 变量 | |
| 新增 | 单击“新增”,在文本框中填写作业参数的名称和参数值。 参数配置完成后,在作业中的引用格式为:${参数名称} |
| 编辑参数表达式 | 在参数值文本框后方,单击 |
| 修改 | 在参数名和参数值的文本框中直接修改。 |
| 掩码显示 | 在参数值为密钥等情况下,从安全角度,请单击 |
| 删除 | 在参数值文本框后方,单击 |
| 常量 | |
| 新增 | 单击“新增”,在文本框中填写作业常量的名称和参数值。 参数配置完成后,在作业中的引用格式为:${参数名称} |
| 编辑参数表达式 | 在参数值文本框后方,单击 |
| 修改 | 在参数名和参数值的文本框中直接修改,修改完成后,请保存。 |
| 删除 | 在参数值文本框后方,单击 |
| 工作空间环境变量 | |
| 查看工作空间已配置的变量和常量。 | |
单击“作业参数预览”页签,展开预览页面,配置如表5所示的参数。
| 功能 | 说明 |
|---|---|
| 当前时间 | 仅单次调度才显示。系统默认为当前时间。 |
| 事件触发时间 | 仅事件驱动调度才显示。系统默认为事件触发时间。 |
| 周期调度 | 仅周期调度才显示。系统默认为调度周期。 |
| 具体时间 | 仅周期调度才显示。周期调度配置的具体运行时间。 |
| 起始日期 | 仅周期调度才显示。周期调度的生效时间。 |
| 后N个实例 | 作业运行调度的实例个数。 |
在作业参数预览中,如果作业参数配置存在语法异常情况系统会给出提示信息。
如果参数配置了依赖作业实际运行时产生的数据,参数预览功能中无法模拟此类数据,则该数据不展示。
运行作业
作业配置完成后,请执行以下操作。
- 单击画布上方的“启动”,运行作业。
DLI Flink支持手动触发savepoint。DLI Flink作业触发savepoint,需要开发人员手动去触发savepoint,开发人员修改作业后从savepoint启动作业,从而避免数据丢失。(savepoint是指实时作业某一个时间点的快照,Flink实时作业停止时,系统会自动保存一个savepoint快照,修改作业后,继续从这个停止的时间点开始运行。)
作业启动包含无状态启动和有状态启动。无状态启动是指不包含任何初始状态启动。有状态启动是指从最新状态或者指定状态启动。有状态启动包含如下两种方式:
- 从最新状态启动
- 从指定状态启动:选择checkpoint/savepoint进行启动。
- 从状态集选择:支持从状态集选择checkpoint/savepoint
- 从OBS选择:支持从OBS选择或者填写checkpoint/savepoint
- 查看作业执行结果。
执行结果最多显示1000条数据;执行结果的大小不超过3MB,若超过3MB结果会被截断。
DLI Flink SQL脚本执行结果支持一键清空筛选功能(仅支持执行结果以列表形式展示)。
- 在实时作业监控页面,查看运行日志,包含运行日志、提交日志、Flink运行日志。其中,在Flink运行日志中可以查看Job Manager和Taskmanager的日志。
模板
在开发Flink SQL单任务实时处理作业时,系统支持可以引用脚本模板。创建模板的详细操作请参见配置模板,脚本模板的使用场景指导请参见引用脚本模板和参数模板的使用介绍。




