DIS Source Stream

Function

Create a source stream to read data from DIS. DIS ingests user data, and the Flink job reads data from the DIS stream as its input. Through the DIS source, Flink jobs can quickly pull data from producers for continuous processing. This suits scenarios where data outside the cloud service is imported into the cloud service for filtering, real-time analysis, monitoring reports, and dumping.

DIS addresses the challenge of transmitting data from outside cloud services into cloud services. DIS builds data intake streams for custom applications that process or analyze streaming data. DIS continuously captures, transmits, and stores terabytes of data from hundreds of thousands of sources every hour, such as logs, Internet of Things (IoT) data, social media feeds, website clickstreams, and location-tracking events. For more information about DIS, see the Data Ingestion Service User Guide.

Syntax

CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "dis",
    region = "",
    channel = "",
    partition_count = "",
    encode = "",
    field_delimiter = "",
    offset = "")
  (TIMESTAMP BY timeindicator (',' timeindicator)?);

timeindicator: PROCTIME '.' PROCTIME | ID '.' ROWTIME

Keyword

Table 1 Keyword description

Parameter

Mandatory

Description

type

Yes

Data source type. Value dis indicates that the data source is DIS.

region

Yes

Region where the DIS stream that stores the data is located.

ak

No

Access Key ID (AK). For details about how to obtain the access key, see My Credentials.

sk

No

Secret Access Key (SK), used together with the access key ID. For details about how to obtain the access key, see My Credentials.

channel

Yes

Name of the DIS stream where data is located.

partition_count

No

Number of partitions of the DIS stream where data is located. This parameter and partition_range cannot be configured at the same time. If this parameter is not specified, data of all partitions is read by default.

partition_range

No

Range of partitions of the DIS stream from which the DLI job ingests data. This parameter and partition_count cannot be configured at the same time. If this parameter is not specified, data of all partitions is read by default.

If this parameter is set to [2:5], the DLI job will ingest data in partitions 2 and 5.

encode

Yes

Data encoding format. Available options include csv, json, xml, email, blob, and user_defined.

  • field_delimiter must be specified if this parameter is set to csv.
  • json_config must be specified if this parameter is set to json.
  • xml_config must be specified if this parameter is set to xml.
  • email_key must be specified if this parameter is set to email.
  • If this parameter is set to blob, the received data is not parsed, only one stream attribute exists, and the data format is ARRAY[TINYINT].
  • encode_class_name and encode_class_parameter must be specified if this parameter is set to user_defined.

field_delimiter

No

Attribute delimiter. You can set this parameter only when encode is set to csv. The default value is a comma (,).

quote

No

Quote character used in the data. Attribute delimiters that appear between two quote characters are treated as ordinary characters.

  • If double quotation marks are used as the quote character, set this parameter to "\u005c\u0022" for character conversion.
  • If a single quotation mark is used as the quote character, set this parameter to a single quotation mark (').

NOTE:

After this parameter is specified, ensure that each field contains either no quote characters or an even number of them. Otherwise, parsing will fail.

json_config

No

If encode is set to json, you need to set this parameter to specify the mapping between JSON fields and stream definition fields. The format is field1=data_json.field1; field2=data_json.field2; field3=$, where field3=$ indicates that the content of field3 is the entire JSON string.

xml_config

No

If encode is set to xml, you need to set this parameter to specify the mapping between the xml field and the stream definition field. An example of the format is as follows: field1=data_xml.field1; field2=data_xml.field2.

email_key

No

If encode is set to email, you need to set this parameter to specify the information to be extracted. List the keys that correspond to the stream definition fields, separated by commas (,), for example, "Message-ID, Date, Subject, body". Because the email body has no key of its own, DLI uses "body" as its key.

encode_class_name

No

If encode is set to user_defined, you need to set this parameter to the name of the user-defined decoding class (including the complete package path). The class must inherit the DeserializationSchema class.

encode_class_parameter

No

If encode is set to user_defined, you can set this parameter to specify the input parameter of the user-defined decoding class. Only one parameter of the string type is supported.

offset

No

Data record from which DLI starts reading the DIS stream.

  • If data is imported to the DIS stream after the job is started, this parameter does not take effect.
  • If the job is started after data is imported to the DIS stream, you can set this parameter as required.

    For example, if offset is set to 100, DLI starts reading from the 100th data record in DIS.

start_time

No

Start time for reading DIS data.

  • If this parameter is specified, DLI reads data starting from the specified time. The parameter value is in the format of yyyy-MM-dd HH:mm:ss.
  • If neither start_time nor offset is specified, DLI reads the latest data.
  • If start_time is not specified but offset is specified, DLI reads data from the data record specified by offset.

timeindicator

No

Timestamp added in the source stream. The value can be processing time or event time.

NOTE:
  • If this parameter is set to processing time, the format is proctime.proctime. In this case, an attribute named proctime is added to the original attributes. For example, if the stream originally has three attributes, it exposes four after processing time is specified. If only the rowtime attribute is specified, the number of attributes remains unchanged.
  • If this parameter is set to event time, you can select an attribute in the stream as the timestamp. The format is attr_name.rowtime.
  • Processing time and event time can be specified at the same time.

enable_checkpoint

No

Whether to enable the checkpoint function. The value true enables the function, and the value false disables it. The default value is false.

checkpoint_app_name

No

ID of a DIS consumer. If a DIS stream is consumed by different jobs, you need to configure the consumer ID for each job to avoid checkpoint confusion.

checkpoint_interval

No

Interval of checkpoint operations on the DIS source operator. The value is in the unit of seconds. The default value is 60.
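
The optional parameters in Table 1 can be combined in one WITH clause. The following is a minimal sketch, not a verified job definition. It assumes that every value is written as a double-quoted string, as in the syntax template, and that -- comments are accepted. The consumer ID car_infos_job and the start time are hypothetical values chosen for illustration. The stream definition, region, and channel are reused from the examples in this section.

```sql
-- Sketch only: value quoting follows the syntax template and is an assumption.
CREATE SOURCE STREAM car_infos (
  car_id STRING,
  car_owner STRING,
  car_age INT,
  average_speed INT,
  total_miles INT,
  car_timestamp LONG
)
  WITH (
    type = "dis",
    region = "cn-north-1",
    channel = "dliinput",
    encode = "csv",
    field_delimiter = ",",
    -- Restrict reading to a partition range; partition_count must not be set as well.
    partition_range = "[2:5]",
    -- Hypothetical start time, in the documented yyyy-MM-dd HH:mm:ss format.
    start_time = "2018-05-16 10:00:00",
    -- Checkpoint every 60 seconds under a job-specific consumer ID (hypothetical name).
    enable_checkpoint = "true",
    checkpoint_app_name = "car_infos_job",
    checkpoint_interval = "60"
  )
  TIMESTAMP BY car_timestamp.rowtime;
```

Because start_time is set here, offset is left out. If a second job consumed the same DIS stream, it would need its own checkpoint_app_name value.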

Precautions

The attribute type used as the timestamp must be long or timestamp.
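
According to the syntax, TIMESTAMP BY accepts up to two time indicators, so processing time and event time can be declared together. The following is a minimal sketch under that reading, reusing the car_infos stream from the examples in this section. The car_timestamp attribute is of type LONG, which satisfies the precaution above. The -- comments are annotations for the reader and assume that the SQL dialect accepts them.

```sql
CREATE SOURCE STREAM car_infos (
  car_id STRING,
  car_owner STRING,
  car_age INT,
  average_speed INT,
  total_miles INT,
  car_timestamp LONG
)
  WITH (
    type = "dis",
    region = "cn-north-1",
    channel = "dliinput",
    encode = "csv",
    field_delimiter = ","
  )
  -- proctime.proctime adds a processing-time attribute named proctime;
  -- car_timestamp.rowtime marks the existing attribute as event time.
  TIMESTAMP BY proctime.proctime, car_timestamp.rowtime;
```

Following the timeindicator description in Table 1, this stream would expose seven attributes: the six declared ones plus proctime.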

Example

  • In CSV encoding format, DLI reads data from the DIS stream, where records are encoded in CSV format with fields separated by commas (,).
    CREATE SOURCE STREAM car_infos (
      car_id STRING,
      car_owner STRING,
      car_age INT,
      average_speed INT,
      total_miles INT,
      car_timestamp LONG
    )
      WITH (
        type = "dis",
        region = "cn-north-1",
        channel = "dliinput",
        encode = "csv",
        field_delimiter = ","
      )
      TIMESTAMP BY car_timestamp.rowtime;
    
  • In JSON encoding format, DLI reads data from the DIS stream, where records are encoded in JSON format. For example, {"car":{"car_id":"ZJA710XC", "car_owner":"coco", "car_age":5, "average_speed":80, "total_miles":15000, "car_timestamp":1526438880}}
    CREATE SOURCE STREAM car_infos (
      car_id STRING,
      car_owner STRING,
      car_age INT,
      average_speed INT,
      total_miles INT,
      car_timestamp LONG
    )
      WITH (
        type = "dis",
        region = "cn-north-1",
        channel = "dliinput",
        encode = "json",
        json_config = "car_id=car.car_id;car_owner=car.car_owner;car_age=car.car_age;average_speed=car.average_speed;total_miles=car.total_miles;car_timestamp=car.car_timestamp"
      )
      TIMESTAMP BY car_timestamp.rowtime;
    
  • In XML encoding format, DLI reads data from the DIS stream, where records are encoded in XML format.
    CREATE SOURCE STREAM person_infos (
        pid BIGINT,
        pname STRING,
        page INT,
        plocation STRING,
        pbir DATE,
        phealthy BOOLEAN,
        pgrade ARRAY[STRING]
    )
      WITH (
        type = "dis",
        region = "cn-north-1" ,
        channel = "dis-dli-input",
        encode = "xml",
        field_delimiter = ",",
        xml_config = "pid=person.pid;page=person.page;pname=person.pname;plocation=person.plocation;pbir=person.pbir;pgrade=person.pgrade;phealthy=person.phealthy"
    );
    
    An example of XML data is as follows:
    <?xml version="1.0" encoding="utf-8"?>
    
    <root>
      <person>
        <pid>362305199010025042</pid>
        <pname>xiaoming</pname>
        <page>28</page>
        <plocation>Shangdu County, Jining District, Ulanchap, Inner Mongolia</plocation>
        <pbir>1990-10-02</pbir>
        <phealthy>true</phealthy>
        <pgrade>[A,B,C]</pgrade>
      </person>
    </root>
    
  • In EMAIL encoding format, DLI reads data from the DIS stream, where each record is a complete email.
    CREATE SOURCE STREAM email_infos (  
      Event_ID String,  
      Event_Time Date,  
      Subject String,  
      From_Email String,  
      To_EMAIL String,  
      CC_EMAIL Array[String],  
      BCC_EMAIL String,  
      MessageBody String,  
      Mime_Version String,  
      Content_Type String,  
      charset String,  
      Content_Transfer_Encoding String
    )
      WITH (  
        type = "dis",  
        region = "cn-north-1" ,
        channel = "dliinput",  
        encode = "email",  
        email_key = "Message-ID, Date, Subject, From, To, CC, BCC, Body, Mime-Version, Content-Type, charset, Content-Transfer-Encoding"
    );
    

    An example of email data is as follows:

    Message-ID: <200906291839032504254@sample.com>
    Date: Fri, 11 May 2001 09:54:00 -0700 (PDT)
    From: zhangsan@sample.com
    To: lisi@sample.com, wangwu@sample.com 
    Subject:  "Hello World" 
    Cc: lilei@sample.com, hanmei@sample.com
    Mime-Version: 1.0
    Content-Type: text/plain; charset=us-ascii
    Content-Transfer-Encoding: 7bit
    Bcc: jack@sample.com, lily@sample.com
    X-From: Zhang San
    X-To: Li Si, Wang Wu
    X-cc: Li Lei, Han Mei
    X-bcc: 
    X-Folder: \Li_Si_June2001\Notes Folders\Notes inbox
    X-Origin: Lucy
    X-FileName: sample.nsf
    
    Dear Associate / Analyst Committee:
    
    Hello World! 
    
    Thank you,
    
    Associate / Analyst Program 
    zhangsan
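
The blob option of encode has no example above. With blob encoding, the received data is not parsed and the stream has a single attribute of type ARRAY[TINYINT]. The following is a minimal sketch, assuming the hypothetical stream name raw_records and attribute name payload, and reusing the region and channel values from the CSV example.

```sql
-- Sketch only: raw_records and payload are hypothetical names.
CREATE SOURCE STREAM raw_records (
  payload ARRAY[TINYINT]
)
  WITH (
    type = "dis",
    region = "cn-north-1",
    channel = "dliinput",
    encode = "blob"
  );
```

No field_delimiter, json_config, xml_config, or email_key is set, because those parameters apply only to the csv, json, xml, and email encodings.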