DIS源表
功能描述
创建source流从数据接入服务(DIS)获取数据。用户数据从DIS接入,Flink作业从DIS的通道读取数据,作为作业的输入数据。Flink作业可通过DIS的source源将数据从生产者快速移出,进行持续处理,适用于将云服务外数据导入云服务后进行过滤、实时分析、监控报告和转储等场景。
数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见。
语法格式
create table disSource (
attr_name attr_type
(',' attr_name attr_type)*
(','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
(',' watermark for rowtime_column_name as watermark-strategy_expression)
)
with (
'connector.type' = 'dis',
'connector.region' = '',
'connector.channel' = '',
'format-type' = ''
); 参数说明
参数 | 是否必选 | 说明 |
|---|---|---|
connector.type | 是 | 数据源类型,“dis”表示数据源为数据接入服务,必须为dis。 |
connector.region | 是 | 数据所在的DIS区域。 |
connector.ak | 否 | 访问密钥ID(Access Key ID),需与sk同时设置 |
connector.sk | 否 | Secret Access Key,需与ak同时设置 |
connector.channel | 是 | 数据所在的DIS通道名称。 |
connector.partition-count | 否 | 读取从0分区开始计算的partition-count个通道范围内的数据。 该参数和partition-range参数不能同时配置。 当两个参数都没有配置的时候默认读取所有partition。 |
connector.partition-range | 否 | 指定作业从DIS通道读取的分区范围。该参数和partition-count参数不能同时配置。当两个参数没有配置的时候默认读取所有partition。 partition-range ="[0:2]"时,表示读取的分区范围是1-3,包括分区1、分区2和分区3,范围设置要在dis相应通道的范围内。 |
connector.offset | 否 | 用户可以根据需求设置该参数的数值,读取数据的起始位置,与start-time不能同时设置。 |
connector.start-time | 否 | DIS数据读取从该起始时间的数据。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。 当没有配置start-time也没配置offset的时候,读取最新数据。 |
connector. enable-checkpoint | 否 | 是否启用checkpoint功能,可配置为true(启用)或者false(停用), 默认为false。 勿与offset或start-time同时设置;若enable-checkpoint为true,与checkpoint-app-name需要同时配置。 |
connector. checkpoint-app-name | 否 | DIS服务的消费者标识,当不同作业消费相同通道时,需要区分不同的消费者标识,以免checkpoint混淆。 勿与offset或start-time同时设置;若enable-checkpoint为true,则需要同时配置。 |
connector. checkpoint-interval | 否 | DIS源算子做checkpoint的时间间隔,默认为60s。格式为d、day/h、hour/min、minute/s、sec、second 勿与offset或start-time同时设置。 |
format.type | 是 | 数据编码格式,可选为“csv”、“json” |
format.field-delimiter | 否 | 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 |
注意事项
无
示例
1 2 3 4 5 6 7 8 9 10 11 12 | create table disCsvSource ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT) with ( 'connector.type' = 'dis', 'connector.region' = '', 'connector.channel' = 'disInput', 'format.type' = 'csv' ); |

