OBS输出流
功能描述
创建sink流将DLI数据输出到对象存储服务(OBS)。DLI可以将作业分析结果输出到OBS上。适用于大数据分析、原生云应用程序数据、静态网站托管、备份/活跃归档、深度/冷归档等场景。
对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。OBS的更多信息,请参见《对象存储服务控制台指南》。
推荐使用《文件系统输出流(推荐)》。
前提条件
OBS输出流功能仅支持输出数据到3.0版本以上的桶,请先查看桶信息确认桶的版本。
语法格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
WITH (
type = "obs",
region = "",
encode = "",
field_delimiter = "",
row_delimiter = "",
obs_dir = "",
file_prefix = "",
rolling_size = "",
rolling_interval = "",
quote = "",
array_bracket = "",
append = "",
max_record_num_per_file = "",
dump_interval = "",
dis_notice_channel = "",
max_record_num_cache = "",
carbon_properties = ""
)
|
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
输出通道类型,“obs”表示输出到对象存储服务。 |
region |
是 |
对象存储服务所在区域。 |
ak |
否 |
访问密钥ID(Access Key ID)。 |
sk |
否 |
Secret Access Key,与访问密钥ID结合使用的密钥。 |
encode |
是 |
编码方式。当前支持csv/json/orc/carbondata/avro/avro_merge/parquet七种格式。 |
field_delimiter |
否 |
属性分隔符。 仅当编码方式为csv时需要配置,若不配置,默认分隔符为逗号。 |
row_delimiter |
否 |
行分隔符。当编码格式为csv、json时需要设置。 |
json_config |
否 |
当编码格式为json时,用户可以通过该参数来指定json字段和流定义字段的映射关系,格式为“field1=data_json.field1;field2=data_json.field2”。 |
obs_dir |
是 |
文件存储目录。格式为{桶名}/{目录名}, 如obs-a1/dir1/subdir。当编码格式为csv(append为false)、json(append为false)、avro_merge、parquet时,支持参数化。 |
file_prefix |
否 |
输出文件名前缀。生成的文件会以file_prefix.x的方式命名,如file_prefix.1、 file_prefix.2,若没有设置,默认文件前缀为temp。此配置项不适用于carbondata文件。 |
rolling_size |
否 |
单个文件最大允许大小。
说明:
|
rolling_interval |
否 |
数据保存到对应目录的时间模式。
说明:
|
quote |
否 |
修饰符,仅当编码格式为csv时可配置,配置后会在每个属性前后各加上修饰符,建议使用不可见字符配置,如"\u0007"。 |
array_bracket |
否 |
数组括号,仅当编码格式为csv时可配置, 可选值为"()", "{}", "[]", 例如配置了"{}", 则数组输出格式为{a1,a2}。 |
append |
否 |
值为true或者false,默认为true。 当OBS不支持append模式,且编码格式为csv和json时,可将该参数设置为false。Append为false时需要设置max_record_num_per_file和dump_interval。 |
max_record_num_per_file |
否 |
文件最大记录数,当编码格式为csv(append为false)、json(append为false)、orc、carbondata、avro、avro_merge和parquet时需配置,表明一个文件最多存储记录数,当达到最大值,则另起新文件。 |
dump_interval |
否 |
触发周期, 当编码格式为orc或者配置了DIS通知提醒时需进行配置。
|
dis_notice_channel |
否 |
OBS目录完成通知通道。表示每周期往DIS通道中发送一条记录,该记录内容为OBS目录路径,表明该目录已书写完毕。 |
max_record_num_cache |
否 |
记录最大缓存数,仅当编码格式为carbondata时,可以配置该字段,且最小值不能小于max_record_num_per_file,默认值为max_record_num_per_file。 |
carbon_properties |
否 |
carbon属性,仅当编码格式为carbondata时,可以配置该字段,值为"k1=v1, k2=v2"的形式,支持carbon-sdk中函数withTableProperties所支持的所有配置项,另外还支持IN_MEMORY_FOR_SORT_DATA_IN_MB和UNSAFE_WORKING_MEMORY_IN_MB两个配置项。 |
encoded_data |
否 |
当编码格式为json(append为false)、avro_merge和parquet时,可通过配置该参数指定真正需要编码的数据,格式为${field_name},表示直接将该流字段的内容作为一个完整的记录进行编码。 |
注意事项
当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。
示例
- 将car_infos数据输出到OBS的obs-sink桶下,输出目录为car_infos, 输出文件以greater_30作为文件名前缀,当单个文件超过100M时新起一个文件,同时数据输出用csv编码,使用逗号作为属性分隔符,换行符作为行分隔符。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
CREATE SINK STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "obs", encode = "csv", region = "xxx", field_delimiter = ",", row_delimiter = "\n", obs_dir = "obs-sink/car_infos", file_prefix = "greater_30", rolling_size = "100m" );
- carbondata编码格式示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
CREATE SINK STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "obs", region = "xxx", encode = "carbondata", obs_dir = "dli-append-2/carbondata", max_record_num_per_file = "1000", max_record_num_cache = "2000", dump_interval = "60", ROLLING_INTERVAL = "yyyy/MM/dd/HH/mm", dis_notice_channel = "dis-notice", carbon_properties = "long_string_columns=MessageBody, IN_MEMORY_FOR_SORT_DATA_IN_MB=512" );
- orc编码格式示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
CREATE SINK STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "obs", region = "xxx", encode = "orc", obs_dir = "dli-append-2/obsorc", FILE_PREFIX = "es_info", max_record_num_per_file = "100000", dump_interval = "60" );
- parquet编码示例请参考文件系统输出流(推荐)中的示例。