更新时间:2022-02-22 GMT+08:00

OBS输入流

功能描述

创建source流从对象存储服务(OBS)获取数据。DLI从OBS上读取用户存储的数据,作为作业的输入数据。适用于大数据分析、原生云应用程序数据、静态网站托管、备份/活跃归档、深度/冷归档等场景。

对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。OBS的更多信息,请参见《对象存储服务控制台指南》。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "obs",
    region = "",
    bucket = "",
    object_name = "",
    row_delimiter = "\n",
    field_delimiter = '',
    version_id = ""
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

关键字

表1 关键字说明

参数

是否必选

说明

type

数据源类型,“obs”表示数据源为对象存储服务。

region

对象存储服务所在区域。

encode

数据的编码格式,可以为“csv”或者“json”。默认值为“csv”。

ak

访问密钥ID(Access Key ID)。

sk

Secret Access Key,与访问密钥ID结合使用的密钥。

bucket

数据所在的OBS桶名。

object_name

数据所在的OBS桶中的对象名。对象文件格式参考“encode”参数。

row_delimiter

行间的分隔符。

field_delimiter

属性分隔符。

  • 当“encode”参数为csv时,该参数必选。用户可以自定义属性分隔符。
  • 当“encode”参数为json时,该参数不需要填写。

quote

可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。

  • 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。
  • 当引用符号为单引号时,则设置quote = "'"。
说明:
  • 目前只适用于CSV格式。
  • 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。

version_id

版本号,当obs里的桶或对象有设置版本的时候需填写,否则不用配置该项。

timeindicator

在流中增加时间戳,可增加“processing time”时间戳或者“event time”时间戳。

说明:
  • 若设置“processing time”,则为proctime.proctime。

    当设置了proctime.proctime时,会在原有属性字段基础上多增加一个proctime系统时间戳属性,假设原有字段为3个,设置了proctime.proctime后会变成4个,设置rowtime属性字段不会发生变化。

  • 若设置“event time”,可选择流中的某个属性来作为时间戳,格式为attr_name.rowtime。
  • 以上两者可以同时设置。

注意事项

用来做时间戳的属性类型必须为long或者timestamp。

示例

  • 从OBS的桶读取对象为input.csv的文件,文件以'\n'划行, 以','划列。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    CREATE SOURCE STREAM car_infos (
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_price INT,
      car_timestamp LONG
    )
      WITH (
        type = "obs",
        bucket = "obssource",
        region =  "dc1-az1",
        object_name = "input.csv",
        row_delimiter = "\n",
        field_delimiter = ","
    )
      TIMESTAMP BY car_timestamp.rowtime;
    
  • 从OBS的桶读取对象为input.json的文件,文件以'\n'划行。
    CREATE SOURCE STREAM obs_source (
      str STRING
    )
      WITH (
        type = "obs",
        bucket = "obssource",
        region =  "dc1-az1",
        encode = "json",
        row_delimiter = "\n",
        object_name = "input.json"
    );