OBS输入流
功能描述
创建source流从对象存储服务(OBS)获取数据。DLI从OBS上读取用户存储的数据,作为作业的输入数据。适用于大数据分析、原生云应用程序数据、静态网站托管、备份/活跃归档、深度/冷归档等场景。
对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。OBS的更多信息,请参见《对象存储服务控制台指南》。
语法格式
1 2 3 4 5 6 7 8 9 10 11 |
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
WITH (
type = "obs",
region = "",
bucket = "",
object_name = "",
row_delimiter = "\n",
field_delimiter = '',
version_id = ""
)
(TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME
|
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
数据源类型,“obs”表示数据源为对象存储服务。 |
region |
是 |
对象存储服务所在区域。 |
encode |
否 |
数据的编码格式,可以为“csv”或者“json”。默认值为“csv”。 |
ak |
否 |
访问密钥ID(Access Key ID)。 |
sk |
否 |
Secret Access Key,与访问密钥ID结合使用的密钥。 |
bucket |
是 |
数据所在的OBS桶名。 |
object_name |
是 |
数据所在的OBS桶中的对象名。对象文件格式参考“encode”参数。 |
row_delimiter |
是 |
行间的分隔符。 |
field_delimiter |
否 |
属性分隔符。
|
quote |
否 |
可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。
说明:
|
version_id |
否 |
版本号,当obs里的桶或对象有设置版本的时候需填写,否则不用配置该项。 |
timeindicator |
否 |
在流中增加时间戳,可增加“processing time”时间戳或者“event time”时间戳。
说明:
|
注意事项
用来做时间戳的属性类型必须为long或者timestamp。
示例
- 从OBS的桶读取对象为input.csv的文件,文件以'\n'划行, 以','划列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "obs", bucket = "obssource", region = "dc1-az1", object_name = "input.csv", row_delimiter = "\n", field_delimiter = "," ) TIMESTAMP BY car_timestamp.rowtime;
- 从OBS的桶读取对象为input.json的文件,文件以'\n'划行。
CREATE SOURCE STREAM obs_source ( str STRING ) WITH ( type = "obs", bucket = "obssource", region = "dc1-az1", encode = "json", row_delimiter = "\n", object_name = "input.json" );