DIS输入流
功能描述
创建source流从数据接入服务(DIS)获取数据。用户数据从DIS接入,Flink作业从DIS的通道读取数据,作为作业的输入数据。Flink作业可通过DIS的source源将数据从生产者快速移出,进行持续处理,适用于将云服务外数据导入云服务后进行过滤、实时分析、监控报告和转储等场景。
数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
语法格式
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dis", region = "", channel = "", partition_count = "", encode = "", field_delimiter = "", offset= "");
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
数据源类型,“dis”表示数据源为数据接入服务。 |
region |
是 |
数据所在的DIS区域。 |
ak |
否 |
访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 |
sk |
否 |
Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 |
channel |
是 |
数据所在的DIS通道名称。 |
partition_count |
否 |
数据所在的DIS通道分区数。该参数和partition_range参数不能同时配置。当该参数没有配置的时候默认读取所有partition。 |
partition_range |
否 |
指定作业从DIS通道读取的分区范围。该参数和partition_count参数不能同时配置。当该参数没有配置的时候默认读取所有partition。 partition_range = "[0:2]"时,表示读取的分区范围是1-3,包括分区1、分区2和分区3。 |
encode |
是 |
数据编码格式,可选为“csv”、“json”、“xml”、“email”、“blob”和“user_defined”。
|
field_delimiter |
否 |
属性分隔符,仅当编码格式为csv时该参数需要填写,例如配置为“,”。 |
quote |
否 |
可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。
说明:
|
json_config |
否 |
当编码格式为json时,用户需要通过该参数来指定json字段和流定义字段的映射关系,格式为“field1=data_json.field1; field2=data_json.field2; field3=$”,其中field3=$表示field3的内容为整个json串。 |
xml_config |
否 |
当编码格式为xml时,用户需要通过该参数来指定xml字段和流定义字段的映射关系,格式为“field1=data_xml.field1; field2=data_xml.field2”。 |
email_key |
否 |
当编码格式为email时,用户需要通过该参数来指定需要提取的信息,需要列出信息的key值,需要与流定义字段一一对应,多个key值时以逗号分隔,例如“Message-ID, Date, Subject, body”,其中由于邮件正文没有关键字,DLI规定其关键字为“body”。 |
encode_class_name |
否 |
当encode为user_defined时,需配置该参数,指定用户自实现解码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 |
encode_class_parameter |
否 |
当encode为user_defined时,可以通过配置该参数指定用户自实现解码类的入参,仅支持一个string类型的参数。 |
offset |
否 |
|
start_time |
否 |
DIS数据读取起始时间。
|
enable_checkpoint |
否 |
是否启用checkpoint功能,可配置为true(启用)或者false(停用), 默认为false。 |
checkpoint_app_name |
否 |
DIS服务的消费者标识,当不同作业消费相同通道时,需要区分不同的消费者标识,以免checkpoint混淆。 |
checkpoint_interval |
否 |
DIS源算子做checkpoint的时间间隔,单位秒,默认为60。 |
注意事项
在创建Source Stream时可以指定时间模型以便在后续计算中使用,当前DLI支持Processing Time和Event Time两种时间模型,具体使用语法可以参考配置时间模型。
示例
- CSV编码格式:从DIS通道读取数据,记录为csv编码,并且以逗号为分隔符。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT, car_timestamp LONG ) WITH ( type = "dis", region = "xxx", channel = "dliinput", encode = "csv", field_delimiter = "," );
- JSON编码格式:从DIS通道读取数据,记录为json编码。数据示例:{"car":{"car_id":"ZJA710XC", "car_owner":"coco", "car_age":5, "average_speed":80, "total_miles":15000, "car_timestamp":1526438880}}。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT, car_timestamp LONG ) WITH ( type = "dis", region = "xxx", channel = "dliinput", encode = "json", json_config = "car_id=car.car_id;car_owner =car.car_owner;car_age=car.car_age;average_speed =car.average_speed ;total_miles=car.total_miles;" );
- XML编码格式:从DIS通道读取数据,记录为xml编码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
CREATE SOURCE STREAM person_infos ( pid BIGINT, pname STRING, page int, plocation STRING, pbir DATE, phealthy BOOLEAN, pgrade ARRAY[STRING] ) WITH ( type = "dis", region = "xxx", channel = "dis-dli-input", encode = "xml", field_delimiter = ",", xml_config = "pid=person.pid;page=person.page;pname=person.pname;plocation=person.plocation;pbir=person.pbir;pgrade=person.pgrade;phealthy=person.phealthy" );
xml数据示例如下:1 2 3 4 5 6 7 8 9 10 11 12 13
<?xml version="1.0" encoding="utf-8"?> <root> <person> <pid>362305199010025042</pid> <pname>xiaoming</pname> <page>28</page> <plocation>xxx</plocation> <pbir>1990-10-02</pbir> <phealthy>true</phealthy> <pgrade>[A,B,C]</pgrade> </person> </root>
- EMAIL编码格式:从DIS通道读取数据,每条记录为一封完整邮件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
CREATE SOURCE STREAM email_infos ( Event_ID String, Event_Time Date, Subject String, From_Email String, To_EMAIL String, CC_EMAIL Array[String], BCC_EMAIL String, MessageBody String, Mime_Version String, Content_Type String, charset String, Content_Transfer_Encoding String ) WITH ( type = "dis", region = "xxx", channel = "dliinput", encode = "email", email_key = "Message-ID, Date, Subject, From, To, CC, BCC, Body, Mime-Version, Content-Type, charset, Content_Transfer_Encoding" );
email数据示例如下:
Message-ID: <200906291839032504254@sample.com> Date: Fri, 11 May 2001 09:54:00 -0700 (PDT) From: user1@sample.com To: user2@sample.com, user3@sample.com Subject: "Hello World" Cc: user4@sample.com, user5@sample.com Mime-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit Bcc: user6@sample.com, user7@sample.com X-From: user1 X-To: user2, user3 X-cc: user4, user5 X-bcc: X-Folder: \user2_June2001\Notes Folders\Notes inbox X-Origin: user8 X-FileName: sample.nsf Dear Associate / Analyst Committee: Hello World! Thank you, Associate / Analyst Program user1