更新时间:2024-07-27 GMT+08:00

OBS输入流

功能描述

创建source流从对象存储服务(OBS)获取数据。DLI从OBS上读取用户存储的数据,作为作业的输入数据。适用于大数据分析、原生云应用程序数据、静态网站托管、备份/活跃归档、深度/冷归档等场景。

对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。OBS的更多信息,请参见《对象存储服务控制台指南》

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "obs",
    region = "",
    bucket = "",
    object_name = "",
    row_delimiter = "\n",
    field_delimiter = '',
    version_id = ""
  );

关键字

表1 关键字说明

参数

是否必选

说明

type

数据源类型,“obs”表示数据源为对象存储服务。

region

对象存储服务所在区域。

encode

数据的编码格式,可以为“csv”或者“json”。默认值为“csv”。

ak

访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证

sk

Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证

bucket

数据所在的OBS桶名。

object_name

数据所在OBS桶中的对象名。如果对象不在OBS根目录下,则需添加文件夹名,例如:test/test.csv。对象文件格式参考“encode”参数。

row_delimiter

行间的分隔符。

field_delimiter

属性分隔符。

  • 当“encode”参数为csv时,该参数必选。用户可以自定义属性分隔符。
  • 当“encode”参数为json时,该参数不需要填写。

quote

可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。

  • 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。
  • 当引用符号为单引号时,则设置quote = "'"。
说明:
  • 目前只适用于CSV格式。
  • 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。

version_id

版本号,当obs里的桶或对象有设置版本的时候需填写,否则不用配置该项。

注意事项

在创建Source Stream时可以指定时间模型以便在后续计算中使用,当前DLI支持Processing Time和Event Time两种时间模型,具体使用语法可以参考配置时间模型

示例

  • 从OBS的桶读取对象为input.csv的文件,文件以'\n'划行, 以','划列。

    测试输入数据input.csv可以先通过新建input.txt复制如下文本数据,再另存为input.csv格式文件。将input.csv上传到对应OBS桶目录下。例如,当前上传到:“dli-test-obs01”桶目录下。

    1,2,3,4,1403149534
    5,6,7,8,1403149535
    创建表参考如下:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    CREATE SOURCE STREAM car_infos (
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_price INT,
      car_timestamp LONG
    )
      WITH (
        type = "obs",
        bucket = "dli-test-obs01",
        region = "xxx",
        object_name = "input.csv",
        row_delimiter = "\n",
        field_delimiter = ","
    );
    
  • 从OBS的桶读取对象为input.json的文件,文件以'\n'划行。
    CREATE SOURCE STREAM obs_source (
      str STRING
    )
      WITH (
        type = "obs",
        bucket = "obssource",
        region = "xxx",
        encode = "json",
        row_delimiter = "\n",
        object_name = "input.json"
    );