OBS输入流
功能描述
创建source流从对象存储服务(OBS)获取数据。DLI从OBS上读取用户存储的数据,作为作业的输入数据。适用于大数据分析、原生云应用程序数据、静态网站托管、备份/活跃归档、深度/冷归档等场景。
对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。OBS的更多信息,请参见《对象存储服务控制台指南》。
语法格式
1 2 3 4 5 6 7 8 9 10 |
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
WITH (
type = "obs",
region = "",
bucket = "",
object_name = "",
row_delimiter = "\n",
field_delimiter = '',
version_id = ""
);
|
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
数据源类型,“obs”表示数据源为对象存储服务。 |
region |
是 |
对象存储服务所在区域。 |
encode |
否 |
数据的编码格式,可以为“csv”或者“json”。默认值为“csv”。 |
ak |
否 |
访问密钥ID(Access Key ID)。 |
sk |
否 |
Secret Access Key,与访问密钥ID结合使用的密钥。 |
bucket |
是 |
数据所在的OBS桶名。 |
object_name |
是 |
数据所在OBS桶中的对象名。如果对象不在OBS根目录下,则需添加文件夹名,例如:test/test.csv。对象文件格式参考“encode”参数。 |
row_delimiter |
是 |
行间的分隔符。 |
field_delimiter |
否 |
属性分隔符。
|
quote |
否 |
可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。
说明:
|
version_id |
否 |
版本号,当obs里的桶或对象有设置版本的时候需填写,否则不用配置该项。 |
注意事项
在创建Source Stream时可以指定时间模型以便在后续计算中使用,当前DLI支持Processing Time和Event Time两种时间模型,具体使用语法可以参考配置时间模型。
示例
- 从OBS的桶读取对象为input.csv的文件,文件以'\n'划行, 以','划列。
测试输入数据input.csv可以先通过新建input.txt复制如下文本数据,再另存为input.csv格式文件。将input.csv上传到对应OBS桶目录下。例如,当前上传到:“dli-test-obs01”桶目录下。
1,2,3,4,1403149534 5,6,7,8,1403149535
创建表参考如下:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "obs", bucket = "dli-test-obs01", region = "xxx", object_name = "input.csv", row_delimiter = "\n", field_delimiter = "," );
- 从OBS的桶读取对象为input.json的文件,文件以'\n'划行。
CREATE SOURCE STREAM obs_source ( str STRING ) WITH ( type = "obs", bucket = "obssource", region = "xxx", encode = "json", row_delimiter = "\n", object_name = "input.json" );