更新时间:2024-09-20 GMT+08:00
分享

DataGen源表

功能描述

DataGen主要用于生成随机数据,可用于调试以及测试等场景。

前提条件

注意事项

  • 创建DataGen表时,表字段类型不支持Array,Map和Row复杂类型,可以通过CREATE TABLE语句中的“COMPUTED COLUMN”来进行类似功能构造。
  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。

语法格式

create table dataGenSource(
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression)
)
with (
  'connector' = 'datagen'
);

参数说明

表1 参数说明

参数

是否必选

默认值

数据类型

参数说明

connector

String

指定要使用的连接器,这里是'datagen'。

rows-per-second

10000

Long

每秒生成的行数,用以控制数据发出速率。

fields.#.kind

random

String

指定 '#' 字段的生成器。 '#' 字段必须是DataGen表中的字段,实际使用时需要将'#'替换为相应字段名。其他各参数的'#'号意义相同,不再重复描述。

参数值可以是 'sequence' 或 'random',具体含义如下:

  • random是默认的生成器,您可以通过“fields.#.max”和“fields.#.min”参数指定随机生成的最大和最小值。

    当指定的字段类型为char、varchar、string时,可以同时通过“fields.#.length”字段指定长度。random是无界的生成器。

  • sequence生成器,您可以通过“fields.#.start”和“fields.#.end”指定序列的起始和结束值。sequence是有界的生成器,当序列数字达到结束值,读取结束。

fields.#.min

'#'号指定的字段类型的最小值

'#'号指定的字段类型

当“fields.#.kind”字段为:random时有效。

表示随机生成器的最小值,'#' 指定的字段仅适用于数字类型。

fields.#.max

'#'号指定的字段类型的最大值

'#'号指定的字段类型

当“fields.#.kind”字段为:random时有效。

随机生成器的最大值,'#' 指定的字段仅适用于数字类型。

fields.#.length

100

Integer

当“fields.#.kind”字段为:random时有效。

随机生成器生成字符的长度,#' 指定的字段仅适用于char、varchar、string。

fields.#.start

'#'号指定的字段类型

当“fields.#.kind”字段为:sequence时有效。

序列生成器的起始值。

fields.#.end

'#'号指定的字段类型

当“fields.#.kind”字段为:sequence时有效。

序列生成器的结束值。

示例

参考创建Flink OpenSource作业,创建flink opensource sql作业,运行如下作业脚本,通过DataGen表产生随机数据并输出到Print结果表中。

注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。

create table dataGenSource(
  user_id string,
  amount int
) with (
  'connector' = 'datagen',
  'rows-per-second' = '1', --每秒生成一条数据
  'fields.user_id.kind' = 'random', --为字段user_id指定random生成器
  'fields.user_id.length' = '3' --限制user_id长度为3
);

create table printSink(
  user_id string,
  amount int
) with (
  'connector' = 'print'
);

insert into printSink select * from dataGenSource;

该作业提交后,作业状态变成“运行中”,后续您可通过如下操作查看输出结果。

  • 方法一:
    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 在对应Flink作业所在行的“操作”列,选择“更多 > FlinkUI”。
    3. 在FlinkUI界面,选择“Task Managers”,单击对应的任务名称,选择“Stdout”查看作业运行日志。
  • 方法二:若在提交运行作业前“运行参数”选择了“保存作业日志”,可以通过如下操作查看。
    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
    3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。

相关文档