DataGen
功能描述
DataGen主要用于生成随机数据,可用于调试以及测试等场景。
类别 |
详情 |
---|---|
支持表类型 |
源表 |
注意事项
- 创建DataGen表时,表字段类型不支持Array,Map和Row复杂类型,可以通过CREATE TABLE语句中的“COMPUTED COLUMN”来进行类似功能构造。
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
语法格式
create table dataGenSource( attr_name attr_type (',' attr_name attr_type)* (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression) ) with ( 'connector' = 'datagen' );
参数说明
参数 |
是否必选 |
默认值 |
数据类型 |
参数说明 |
---|---|---|---|---|
connector |
是 |
无 |
String |
指定要使用的连接器,这里是'datagen'。 |
rows-per-second |
否 |
10000 |
Long |
每秒生成的行数,用以控制数据发出速率。 |
number-of-rows |
否 |
无 |
Long |
生成数据的总行数。默认条件下,不限制生成数据的总行数。如果有字段生成器类型为序列生成器,则当生成数据的行数达到上限或者序列数字达到结束值时,都不会再生成数据。 |
fields.#.kind |
否 |
random |
String |
指定 '#' 字段的生成器。 '#' 字段必须是DataGen表中的字段,实际使用时需要将'#'替换为相应字段名。其他各参数的'#'号意义相同,不再重复描述。 参数值可以是 'sequence' 或 'random',具体含义如下:
|
fields.#.min |
否 |
'#'号指定的字段类型的最小值 |
'#'号指定的字段类型 |
当“fields.#.kind”字段为:random时有效。 表示随机生成器的最小值,'#' 指定的字段仅适用于数字类型。 |
fields.#.max |
否 |
'#'号指定的字段类型的最大值 |
'#'号指定的字段类型 |
当“fields.#.kind”字段为:random时有效。 随机生成数的最大值,'#' 指定的字段仅适用于数字类型。 |
fields.#.max-past |
否 |
0 |
Duration |
当“fields.#.kind”字段为:random时有效。 随机生成器生成相对当前时间向过去偏移的最大值,'#' 指定的字段仅适用于时间戳类型。 |
fields.#.length |
否 |
100 |
Integer |
当“fields.#.kind”字段为:random时有效。 随机生成器生成字符的长度,'#' 指定的字段仅适用于char、varchar、string。 |
fields.#.start |
否 |
无 |
'#'号指定的字段类型 |
当“fields.#.kind”字段为:sequence时有效。 序列生成器的起始值。 |
fields.#.end |
否 |
无 |
'#'号指定的字段类型 |
当“fields.#.kind”字段为:sequence时有效。 序列生成器的结束值。 |
示例
创建flink opensource sql作业,运行如下作业脚本,通过DataGen表产生随机数据并输出到Print结果表中。
create table dataGenSource( user_id string, amount int ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', --每秒生成一条数据 'fields.user_id.kind' = 'random', --为字段user_id指定random生成器 'fields.user_id.length' = '3', --限制字段user_id长度为3 'fields.amount.kind' = 'sequence', --为字段amount指定sequence生成器 'fields.amount.start' = '1', --字段amount的起始值 'fields.amount.end' = '1000' --字段amount的结束值 ); create table printSink( user_id string, amount int ) with ( 'connector' = 'print' ); insert into printSink select * from dataGenSource;
该作业提交后,作业状态变成“运行中”,后续您可通过如下操作查看输出结果。
- 方法一:
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 在对应Flink作业所在行的“操作”列,选择“更多 > FlinkUI”。
- 在FlinkUI界面,选择“Task Managers”,单击对应的任务名称,选择“Stdout”查看作业运行日志。
- 方法二:如果在提交运行作业前“运行参数”选择了“保存作业日志”,可以通过如下操作查看。
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
- 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载文件名包含taskmanager.out的文件查看结果日志。