DIS Source Stream
Overview
Create a source stream to read data from DIS. DIS accesses user data, and CS reads data from the DIS stream as input for jobs. This cloud ecosystem is applicable to scenarios where data outside cloud services is imported into cloud services for filtering, real-time analysis, monitoring and reporting, and dumping.
DIS addresses the challenge of transmitting data from outside cloud services into cloud services. DIS builds data intake streams for custom applications capable of processing or analyzing streaming data. DIS continuously captures, transmits, and stores terabytes of data every hour from hundreds of thousands of sources, such as logs, Internet of Things (IoT) data, social media feeds, website clickstreams, and location-tracking events. For more information about DIS, see the Data Ingestion Service User Guide.
Syntax
```
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "dis",
    region = "",
    channel = "",
    partition_count = "",
    encode = "",
    field_delimiter = "",
    offset = ""
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);

timeindicator:
    PROCTIME '.' PROCTIME
  | ID '.' ROWTIME
```
Description
| Parameter | Mandatory | Description |
|---|---|---|
| type | Yes | Data source type. Value dis indicates that the data source is DIS. |
| region | Yes | Region where the DIS stream used for storing the data is located. |
| channel | Yes | Name of the DIS stream where data is located. |
| partition_count | No | Number of partitions of the DIS stream where data is located. This parameter and partition_range cannot be configured at the same time. If this parameter is not specified, data of all partitions is read by default. |
| partition_range | No | Range of DIS stream partitions from which the CS job ingests data. This parameter and partition_count cannot be configured at the same time. If this parameter is not specified, data of all partitions is read by default. If this parameter is set to [2:5], the CS job will ingest data in partitions 2 and 5. |
| encode | Yes | Data encoding format. Available options include csv, json, xml, email, and blob. |
| field_delimiter | No | Attribute delimiter. You can set this parameter only when encode is set to csv. The default value is a comma (,). |
| quote | No | Quote symbol in a data format. Attribute delimiters between two quote symbols are treated as common characters. NOTE: After this parameter is specified, ensure that each field contains no quote symbols or an even number of quote symbols. Otherwise, parsing will fail. |
| json_config | No | If encode is set to json, you need to set this parameter to specify the mapping between the JSON field and the stream definition field. An example of the format is as follows: field1=data_json.field1; field2=data_json.field2. |
| xml_config | No | If encode is set to xml, you need to set this parameter to specify the mapping between the XML field and the stream definition field. An example of the format is as follows: field1=data_xml.field1; field2=data_xml.field2. |
| email_key | No | If encode is set to email, you need to set this parameter to specify the information to be extracted. List the keys that correspond to the stream definition fields, separated by commas (,), for example, "Message-ID, Date, Subject, body". Because the email body has no key, CS uses "body" as its key. |
| offset | No | |
| start_time | No | Start time for reading DIS data. |
| timeindicator | No | Timestamp added in the source stream. The value can be processing time or event time. |
| enable_checkpoint | No | Whether to enable the checkpoint function. The value true enables the checkpoint function and false disables it. The default value is false. |
| checkpoint_app_name | No | ID of a DIS consumer. If a DIS stream is consumed by different jobs, you need to configure the consumer ID for each job to avoid checkpoint confusion. |
| checkpoint_interval | No | Interval of checkpoint operations on the DIS source operator. The value is in the unit of seconds. The default value is 60. |
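The optional parameters above can be combined in one statement. The following sketch (the stream name, field names, and channel name are illustrative, not from the service documentation) restricts ingestion to the partition range [2:5] and enables checkpointing with a dedicated consumer ID:

```
CREATE SOURCE STREAM checkpointed_infos (
  record_id STRING,
  record_time LONG
) WITH (
  type = "dis",
  region = "cn-north-1",
  channel = "csinput",
  partition_range = "[2:5]",
  encode = "csv",
  field_delimiter = ",",
  enable_checkpoint = "true",
  checkpoint_app_name = "cs_job_1",
  checkpoint_interval = "60"
) TIMESTAMP BY record_time.rowtime;
```

Because partition_range and partition_count are mutually exclusive, only one of them may appear in a given statement.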
Precautions
The attribute used as the timestamp must be of type LONG or TIMESTAMP.
Example
- In CSV encoding format, CS reads CSV-encoded records from the DIS stream, with fields separated by commas (,).
```
CREATE SOURCE STREAM car_infos (
  car_id STRING,
  car_owner STRING,
  car_age INT,
  average_speed INT,
  total_miles INT,
  car_timestamp LONG
) WITH (
  type = "dis",
  region = "cn-north-1",
  channel = "csinput",
  encode = "csv",
  field_delimiter = ","
) TIMESTAMP BY car_timestamp.rowtime;
```
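With the CSV stream definition above, an input record on the DIS stream would be a single comma-delimited line matching the field order; the values below are illustrative:

```
ZJA710XC,coco,5,80,15000,1526438880
```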
- In JSON encoding format, CS reads JSON-encoded records from the DIS stream. For example:

```
{"car":{"car_id":"ZJA710XC", "car_owner":"coco", "car_age":5, "average_speed":80, "total_miles":15000, "car_timestamp":1526438880}}
```
```
CREATE SOURCE STREAM car_infos (
  car_id STRING,
  car_owner STRING,
  car_age INT,
  average_speed INT,
  total_miles INT,
  car_timestamp LONG
) WITH (
  type = "dis",
  region = "cn-north-1",
  channel = "csinput",
  encode = "json",
  json_config = "car_id=car.car_id;car_owner=car.car_owner;car_age=car.car_age;average_speed=car.average_speed;total_miles=car.total_miles;"
) TIMESTAMP BY car_timestamp.rowtime;
```
- In XML encoding format, CS reads XML-encoded records from the DIS stream.
```
CREATE SOURCE STREAM person_infos (
  pid BIGINT,
  pname STRING,
  page INT,
  plocation STRING,
  pbir DATE,
  phealthy BOOLEAN,
  pgrade ARRAY[STRING]
) WITH (
  type = "dis",
  region = "cn-north-1",
  channel = "dis-cs-input",
  encode = "xml",
  field_delimiter = ",",
  xml_config = "pid=person.pid;page=person.page;pname=person.pname;plocation=person.plocation;pbir=person.pbir;pgrade=person.pgrade;phealthy=person.phealthy"
);
```

An example of XML data is as follows:

```
<?xml version="1.0" encoding="utf-8"?>
<root>
  <person>
    <pid>362305199010025042</pid>
    <pname>xiaoming</pname>
    <page>28</page>
    <plocation>Shangdu County, Jining District, Ulanchap, Inner Mongolia</plocation>
    <pbir>1990-10-02</pbir>
    <phealthy>true</phealthy>
    <pgrade>[A,B,C]</pgrade>
  </person>
</root>
```

- In EMAIL encoding format, CS reads data from the DIS stream and records each entry as a complete email.
```
CREATE SOURCE STREAM email_infos (
  Event_ID String,
  Event_Time Date,
  Subject String,
  From_Email String,
  To_EMAIL String,
  CC_EMAIL Array[String],
  BCC_EMAIL String,
  MessageBody String,
  Mime_Version String,
  Content_Type String,
  charset String,
  Content_Transfer_Encoding String
) WITH (
  type = "dis",
  region = "cn-north-1",
  channel = "csinput",
  encode = "email",
  email_key = "Message-ID, Date, Subject, From, To, CC, BCC, Body, Mime-Version, Content-Type, charset, Content_Transfer_Encoding"
);
```
An example of email data is as follows:
```
Message-ID: <200906291839032504254@sample.com>
Date: Fri, 11 May 2001 09:54:00 -0700 (PDT)
From: zhangsan@sample.com
To: lisi@sample.com, wangwu@sample.com
Subject: "Hello World"
Cc: lilei@sample.com, hanmei@sample.com
Mime-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Transfer-Encoding: 7bit
Bcc: jack@sample.com, lily@sample.com
X-From: Zhang San
X-To: Li Si, Wang Wu
X-cc: Li Lei, Han Mei
X-bcc:
X-Folder: \Li_Si_June2001\Notes Folders\Notes inbox
X-Origin: Lucy
X-FileName: sample.nsf

Dear Associate / Analyst Committee:

Hello World!

Thank you,
Associate / Analyst Program
zhangsan
```