DIS源表
功能描述
创建source流从数据接入服务(DIS)获取数据。用户数据从DIS接入,Flink作业从DIS的通道读取数据,作为作业的输入数据。Flink作业可通过DIS的source源将数据从生产者快速移出,进行持续处理,适用于将云服务外数据导入云服务后进行过滤、实时分析、监控报告和转储等场景。
数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
语法格式
create table disSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector.type' = 'dis', 'connector.region' = '', 'connector.channel' = '', 'format-type' = '' );
参数说明
参数 |
是否必选 |
说明 |
---|---|---|
connector.type |
是 |
数据源类型,“dis”表示数据源为数据接入服务,必须为dis。 |
connector.region |
是 |
数据所在的DIS区域。 |
connector.ak |
否 |
访问密钥ID(Access Key ID),需与sk同时设置 |
connector.sk |
否 |
Secret Access Key,需与ak同时设置 |
connector.channel |
是 |
数据所在的DIS通道名称。 |
connector.partition-count |
否 |
读取从0分区开始计算的partition-count个通道范围内的数据。 该参数和partition-range参数不能同时配置。 当两个参数都没有配置的时候默认读取所有partition。 |
connector.partition-range |
否 |
指定作业从DIS通道读取的分区范围。该参数和partition-count参数不能同时配置。当两个参数没有配置的时候默认读取所有partition。 partition-range = "[0:2]"时,表示读取的分区范围是1-3,包括分区1、分区2和分区3,范围设置要在dis相应通道的范围内。 |
connector.offset |
否 |
用户可以根据需求设置该参数的数值,读取数据的起始位置,与start-time不能同时设置。 |
connector.start-time |
否 |
DIS数据读取从该起始时间的数据。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。 当没有配置start-time也没配置offset的时候,读取最新数据。 |
connector. enable-checkpoint |
否 |
是否启用checkpoint功能,可配置为true(启用)或者false(停用), 默认为false。 勿与offset或start-time同时设置;若enable-checkpoint为true,与checkpoint-app-name需要同时配置。 |
connector. checkpoint-app-name |
否 |
DIS服务的消费者标识,当不同作业消费相同通道时,需要区分不同的消费者标识,以免checkpoint混淆。 勿与offset或start-time同时设置;若enable-checkpoint为true,则需要同时配置。 |
connector. checkpoint-interval |
否 |
DIS源算子做checkpoint的时间间隔,默认为60s。格式为d、day/h、hour/min、minute/s、sec、second 勿与offset或start-time同时设置。 |
format.type |
是 |
数据编码格式,可选为“csv”、“json” |
format.field-delimiter |
否 |
属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 |
注意事项
无
示例
1 2 3 4 5 6 7 8 9 10 11 12 |
create table disCsvSource ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT) with ( 'connector.type' = 'dis', 'connector.region' = 'ap-southeast-1', 'connector.channel' = 'disInput', 'format.type' = 'csv' ); |