更新时间:2024-03-12 GMT+08:00

DIS源表

功能描述

创建source流从数据接入服务(DIS)获取数据。用户数据从DIS接入,Flink作业从DIS的通道读取数据,作为作业的输入数据。Flink作业可通过DIS的source源将数据从生产者快速移出,进行持续处理,适用于将云服务外数据导入云服务后进行过滤、实时分析、监控报告和转储等场景。

数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。

语法格式

create table disSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
  (',' watermark for rowtime_column_name as watermark-strategy_expression)
)
with (
  'connector.type' = 'dis',
  'connector.region' = '',
  'connector.channel' = '',
  'format-type' = ''
);

参数说明

表1 参数说明

参数

是否必选

说明

connector.type

数据源类型,“dis”表示数据源为数据接入服务,必须为dis。

connector.region

数据所在的DIS区域。

connector.ak

访问密钥ID(Access Key ID),需与sk同时设置

connector.sk

Secret Access Key,需与ak同时设置

connector.channel

数据所在的DIS通道名称。

connector.partition-count

读取从0分区开始计算的partition-count个通道范围内的数据。

该参数和partition-range参数不能同时配置。

当两个参数都没有配置的时候默认读取所有partition。

connector.partition-range

指定作业从DIS通道读取的分区范围。该参数和partition-count参数不能同时配置。当两个参数没有配置的时候默认读取所有partition。

partition-range = "[0:2]"时,表示读取的分区范围是1-3,包括分区1、分区2和分区3,范围设置要在dis相应通道的范围内。

connector.offset

用户可以根据需求设置该参数的数值,读取数据的起始位置,与start-time不能同时设置。

connector.start-time

DIS数据读取从该起始时间的数据。

当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。

当没有配置start-time也没配置offset的时候,读取最新数据。

connector. enable-checkpoint

是否启用checkpoint功能,可配置为true(启用)或者false(停用), 默认为false。

勿与offset或start-time同时设置;若enable-checkpoint为true,与checkpoint-app-name需要同时配置。

connector. checkpoint-app-name

DIS服务的消费者标识,当不同作业消费相同通道时,需要区分不同的消费者标识,以免checkpoint混淆。

勿与offset或start-time同时设置;若enable-checkpoint为true,则需要同时配置。

connector. checkpoint-interval

DIS源算子做checkpoint的时间间隔,默认为60s。格式为d、day/h、hour/min、minute/s、sec、second

勿与offset或start-time同时设置。

format.type

数据编码格式,可选为“csv”、“json”

format.field-delimiter

属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。

注意事项

示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
create table disCsvSource (
  car_id STRING,
  car_owner STRING,
  car_age INT,
  average_speed INT,
  total_miles INT)
with (
  'connector.type' = 'dis',
  'connector.region' = 'ap-southeast-1',
  'connector.channel' = 'disInput',
  'format.type' = 'csv'
);