更新时间:2024-04-19 GMT+08:00

DataGen

功能描述

DataGen主要用于生成随机数据,可用于调试以及测试等场景。

表1 支持类别

类别

详情

支持表类型

源表

注意事项

  • 创建DataGen表时,表字段类型不支持Array,Map和Row复杂类型,可以通过CREATE TABLE语句中的“COMPUTED COLUMN”来进行类似功能构造。
  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据

语法格式

create table dataGenSource(
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression)
)
with (
  'connector' = 'datagen'
);

参数说明

表2 参数说明

参数

是否必选

默认值

数据类型

参数说明

connector

String

指定要使用的连接器,这里是'datagen'。

rows-per-second

10000

Long

每秒生成的行数,用以控制数据发出速率。

number-of-rows

Long

生成数据的总行数。默认条件下,不限制生成数据的总行数。如果有字段生成器类型为序列生成器,则当生成数据的行数达到上限或者序列数字达到结束值时,都不会再生成数据。

fields.#.kind

random

String

指定 '#' 字段的生成器。 '#' 字段必须是DataGen表中的字段,实际使用时需要将'#'替换为相应字段名。其他各参数的'#'号意义相同,不再重复描述。

参数值可以是 'sequence' 或 'random',具体含义如下:

  • random是默认值,表示无界的随机生成器。您可以通过“fields.#.max”和“fields.#.min”参数指定随机生成数的最大和最小值。当指定的字段类型为char、varchar、string时,可以通过“fields.#.length”参数指定长度。当指定的字段类型为时间戳类型时,可以通过“fields.#.max-past”参数指定相对当前时间向过去偏移的最大值。
  • sequence表示有界的序列生成器。您可以通过“fields.#.start”和“fields.#.end”指定序列的起始和结束值,当序列数字达到结束值时,就不会再生成数据。

fields.#.min

'#'号指定的字段类型的最小值

'#'号指定的字段类型

当“fields.#.kind”字段为:random时有效。

表示随机生成器的最小值,'#' 指定的字段仅适用于数字类型。

fields.#.max

'#'号指定的字段类型的最大值

'#'号指定的字段类型

当“fields.#.kind”字段为:random时有效。

随机生成数的最大值,'#' 指定的字段仅适用于数字类型。

fields.#.max-past

0

Duration

当“fields.#.kind”字段为:random时有效。

随机生成器生成相对当前时间向过去偏移的最大值,'#' 指定的字段仅适用于时间戳类型。

fields.#.length

100

Integer

当“fields.#.kind”字段为:random时有效。

随机生成器生成字符的长度,'#' 指定的字段仅适用于char、varchar、string。

fields.#.start

'#'号指定的字段类型

当“fields.#.kind”字段为:sequence时有效。

序列生成器的起始值。

fields.#.end

'#'号指定的字段类型

当“fields.#.kind”字段为:sequence时有效。

序列生成器的结束值。

示例

创建flink opensource sql作业,运行如下作业脚本,通过DataGen表产生随机数据并输出到Print结果表中。

create table dataGenSource(
  user_id string,
  amount int
) with (
  'connector' = 'datagen',
  'rows-per-second' = '1', --每秒生成一条数据
  'fields.user_id.kind' = 'random', --为字段user_id指定random生成器
  'fields.user_id.length' = '3', --限制字段user_id长度为3
  'fields.amount.kind' = 'sequence', --为字段amount指定sequence生成器
  'fields.amount.start' = '1', --字段amount的起始值
  'fields.amount.end' = '1000' --字段amount的结束值
);

create table printSink(
  user_id string,
  amount int
) with (
  'connector' = 'print'
);

insert into printSink select * from dataGenSource;

该作业提交后,作业状态变成“运行中”,后续您可通过如下操作查看输出结果。

  • 方法一:
    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 在对应Flink作业所在行的“操作”列,选择“更多 > FlinkUI”。
    3. 在FlinkUI界面,选择“Task Managers”,单击对应的任务名称,选择“Stdout”查看作业运行日志。
  • 方法二:如果在提交运行作业前“运行参数”选择了“保存作业日志”,可以通过如下操作查看。
    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
    3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载文件名包含taskmanager.out的文件查看结果日志。