文档首页 > > 流生态开发指南> 云生态:数据接入服务(DIS)> DIS输入流

DIS输入流

分享
更新时间: 2019/08/27 GMT+08:00

概述

创建source流从数据接入服务(DIS)获取数据。用户数据从DIS接入,CS从DIS的通道读取数据,作为作业的输入数据。CS可通过DIS的source源将数据从生产者快速移出,进行持续处理,适用于将云服务外数据导入云服务后进行过滤、实时分析、监控报告和转储等场景。

数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》

语法

语法格式

CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )WITH (type = "dis",region = "",channel = "",partition_count = "",encode = "",field_delimiter = "",offset= "")(TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

语法说明

表1 语法说明

参数

是否必选

说明

type

数据源类型,“dis”表示数据源为数据接入服务。

region

数据所在的DIS区域。

channel

数据所在的DIS通道名称。

partition_count

数据所在的DIS通道分区数。该参数和partition_range参数不能同时配置。当该参数没有配置的时候默认读取所有partition。

partition_range

指定作业从DIS通道读取的分区范围。该参数和partition_count参数不能同时配置。当该参数没有配置的时候默认读取所有partition。

partition_range = "[2:5]"时,表示读取的分区范围是2-5,包括分区2和分区5。

encode

数据编码格式,可选为“csv”“json”“xml”“email”“blob”“user_defined”

说明:
  • 若编码格式为“csv”,则需配置“field_delimiter”属性。
  • 若编码格式为“json”,则需配置“json_config”属性。
  • 若编码格式为“xml”,则需配置“xml_config”属性。
  • 若编码格式为“email”,则需配置“email_key”属性。
  • 若编码格式为“blob”,表示不对接收的数据进行解析,流属性仅能有一个且数据格式为ARRAY[TINYINT]。
  • 若编码格式为“user_defined”,则需配置“encode_class_name”“encode_class_parameter”属性。

field_delimiter

属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为

quote

可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。

  • 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。
  • 当引用符号为单引号时,则设置quote = "'"。
说明:

设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。

json_config

当编码格式为json时,用户需要通过该参数来指定json字段和流定义字段的映射关系,格式为“field1=data_json.field1; field2=data_json.field2; field3=$”,其中field3=$表示field3的内容为整个json串。

xml_config

当编码格式为xml时,用户需要通过该参数来指定xml字段和流定义字段的映射关系,格式为“field1=data_xml.field1; field2=data_xml.field2”。

email_key

当编码格式为email时,用户需要通过该参数来指定需要提取的信息,需要列出信息的key值,需要与流定义字段一一对应,多个key值时以逗号分隔,例如“Message-ID, Date, Subject, body”,其中由于邮件正文没有关键字,CS规定其关键字为“body”。

encode_class_name

当encode为user_defined时,需配置该参数,指定用户自实现解码类的类名(包含完整包路径),该类需继承类DeserializationSchema。

encode_class_parameter

当encode为user_defined时,可以通过配置该参数指定用户自实现解码类的入参,仅支持一个string类型的参数。

offset

  • 当启动作业后再获取数据,则该参数无效。
  • 当获取数据后再启动作业,用户可以根据需求设置该参数的数值。

    例如当offset= "100"时,则表示CS服务从DIS服务中的第100条数据开始处理。

start_time

DIS数据读取起始时间。

  • 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。
  • 当没有配置start_time也没配置offset的时候,读取最新数据。
  • 当没有配置start_time但配置了offset的时候,则从offset开始读取数据。

timeindicator

在流中增加时间戳,可增加“processing time”时间戳或者“event time”时间戳。

说明:
  • 若设置“processing time”,则为proctime.proctime。当设置了proctime.proctime时,会在原有属性字段基础上多增加一个proctime系统时间戳属性,假设原有字段为3个,设置了proctime.proctime后会变成4个,设置rowtime属性字段不会发生变化。
  • 若设置“event time”,可选择流中的某个属性来作为时间戳,格式为attr_name.rowtime。
  • 以上两者可以同时设置。

enable_checkpoint

是否启用checkpoint功能,可配置为true(启用)或者false(停用), 默认为false。

checkpoint_app_name

DIS服务的消费者标识,当不同作业消费相同通道时,需要区分不同的消费者标识,以免checkpoint混淆。

checkpoint_interval

DIS源算子做checkpoint的时间间隔,单位秒,默认为60。

注意事项

用来做时间戳的属性类型必须为long或者timestamp。

示例

  • CSV编码格式:从DIS通道读取数据,记录为csv编码,并且以逗号为分隔符。
    CREATE SOURCE STREAM car_infos (
      car_id STRING,
      car_owner STRING,
      car_age INT,
      average_speed INT,
      total_miles INT,
      car_timestamp LONG
    )
    WITH (
      type = "dis",
      region = "cn-north-1" ,
      channel = "csinput",
    
      encode = "csv",
      field_delimiter = ","
    )TIMESTAMP BY car_timestamp.rowtime;
  • JSON编码格式:从DIS通道读取数据,记录为json编码。数据示例:{"car":{"car_id":"ZJA710XC", "car_owner":"coco", "car_age":5, "average_speed":80, "total_miles":15000, "car_timestamp":1526438880}}。
    CREATE SOURCE STREAM car_infos (
      car_id STRING,
      car_owner STRING,
      car_age INT,
      average_speed INT,
      total_miles INT,
      car_timestamp LONG
    )
    WITH (
      type = "dis",
      region = "cn-north-1" ,
      channel = "csinput",
    
      encode = "json",
      json_config = "car_id=car.car_id;car_owner =car.car_owner;car_age=car.car_age;average_speed =car.average_speed ;total_miles=car.total_miles;"
    )TIMESTAMP BY car_timestamp.rowtime;
  • XML编码格式:从DIS通道读取数据,记录为xml编码。
    CREATE SOURCE STREAM person_infos (
        pid BIGINT,
        pname STRING,
        page int,
        plocation STRING,
        pbir DATE,
        phealthy BOOLEAN,
        pgrade ARRAY[STRING]
    )
    WITH (
      type = "dis",
      region = "cn-north-1" ,
      channel = "dis-cs-input",
      encode = "xml",
      field_delimiter = ",",
      xml_config = "pid=person.pid;page=person.page;pname=person.pname;plocation=person.plocation;pbir=person.pbir;pgrade=person.pgrade;phealthy=person.phealthy"
    );
    xml数据示例如下:
    <?xml version="1.0" encodeing="utf-8"?>
    
    <root>
      <person>
        <pid>362305199010025042</pid>
        <pname>xiaoming</pname>
        <page>28</page>
        <plocation>内蒙古,乌兰察布市,集宁区,商都县</plocation>
        <pbir>1990-10-02</pbir>
        <phealthy>true</phealthy>
        <pgrade>[A,B,C]</pgrade>
      </person>
    </root>
  • EMAIL编码格式:从DIS通道读取数据,每条记录为一封完整邮件。
    CREATE SOURCE STREAM email_infos (  
    Event_ID String,  
    Event_Time Date,  
    Subject String,  
    From_Email String,  
    To_EMAIL String,  
    CC_EMAIL Array[String],  
    BCC_EMAIL String,  
    MessageBody String,  
    Mime_Version String,  
    Content_Type String,  
    charset String,  
    Content_Transfer_Encoding String
    )
    WITH (  
    type = "dis",  
    region = "cn-north-1" ,
    channel = "csinput",  
    
    encode = "email",  
    email_key = "Message-ID, Date, Subject, From, To, CC, BCC, Body, Mime-Version, Content-Type, charset, Content_Transfer_Encoding"  
    );

    email数据示例如下:

    Message-ID: <200906291839032504254@sample.com>
    Date: Fri, 11 May 2001 09:54:00 -0700 (PDT)
    From: zhangsan@sample.com
    To: lisi@sample.com, wangwu@sample.com 
    Subject:  "Hello World" 
    Cc: lilei@sample.com, hanmei@sample.com
    Mime-Version: 1.0
    Content-Type: text/plain; charset=us-ascii
    Content-Transfer-Encoding: 7bit
    Bcc: jack@sample.com, lily@sample.com
    X-From: Zhang San
    X-To: Li Si, Wang Wu
    X-cc: Li Lei, Han Mei
    X-bcc: 
    X-Folder: \Li_Si_June2001\Notes Folders\Notes inbox
    X-Origin: Lucy
    X-FileName: sample.nsf
    
    Dear Associate / Analyst Committee:
    
    Hello World! 
    
    Thank you,
    
    Associate / Analyst Program 
    zhangsan
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

跳转到云社区