更新时间:2022-08-12 GMT+08:00

OBS输出流

功能描述

创建sink流将DLI数据输出到对象存储服务(OBS)。DLI可以将作业分析结果输出到OBS上。适用于大数据分析、原生云应用程序数据、静态网站托管、备份/活跃归档、深度/冷归档等场景。

对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。OBS的更多信息,请参见《对象存储服务控制台指南》。

推荐使用《文件系统输出流(推荐)》。

前提条件

OBS输出流功能仅支持输出数据到3.0版本以上的桶,请先查看桶信息确认桶的版本。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
           type = "obs",
           region = "",
           encode = "",
           field_delimiter = "",
           row_delimiter = "",
           obs_dir = "",
           file_prefix = "",
           rolling_size = "",
           rolling_interval = "",
           quote = "",
           array_bracket = "",
           append = "",
           max_record_num_per_file = "",
           dump_interval = "",
           dis_notice_channel = "",
           max_record_num_cache = "",
           carbon_properties = ""
  )

关键字

表1 关键字说明

参数

是否必选

说明

type

输出通道类型,“obs”表示输出到对象存储服务。

region

对象存储服务所在区域。

ak

访问密钥ID(Access Key ID)。

sk

Secret Access Key,与访问密钥ID结合使用的密钥。

encode

编码方式。当前支持csv/json/orc/carbondata/avro/avro_merge/parquet七种格式。

field_delimiter

属性分隔符。

仅当编码方式为csv时需要配置,若不配置,默认分隔符为逗号。

row_delimiter

行分隔符。当编码格式为csv、json时需要设置。

json_config

当编码格式为json时,用户可以通过该参数来指定json字段和流定义字段的映射关系,格式为“field1=data_json.field1;field2=data_json.field2”。

obs_dir

文件存储目录。格式为{桶名}/{目录名}, 如obs-a1/dir1/subdir。当编码格式为csv(append为false)、json(append为false)、avro_merge、parquet时,支持参数化。

file_prefix

输出文件名前缀。生成的文件会以file_prefix.x的方式命名,如file_prefix.1、 file_prefix.2,若没有设置,默认文件前缀为temp。此配置项不适用于carbondata文件。

rolling_size

单个文件最大允许大小。

说明:
  • rolling_size和rolling_interval必须至少配一样或者都配置。
  • 当文件大小超过设置size后,会生成新文件。
  • 支持的单位包括KB/MB/GB,若没写单位,表示单位为字节数。
  • 当编码格式为orc和carbondata时不需要设置。

rolling_interval

数据保存到对应目录的时间模式。

说明:
  • rolling_size和rolling_interval必须至少配一样或者都配置。
  • 设置后数据会按照输出时间输出到相应时间目录下。
  • 支持的格式为yyyy/MM/dd/HH/mm, 最小单位只到分钟,大小写敏感。例如配置为yyyy/MM/dd/HH, 则数据会写入对应小时这个时间点所产生的目录下,比如2018-09-10 16时产生的数据就会写到{obs_dir}/2018-09-10_16目录下。
  • 当rolling_size和rolling_interval都配置时,表示每个时间所对应的目录下,单个文件超过设置大小时,另起新文件。

quote

修饰符,仅当编码格式为csv时可配置,配置后会在每个属性前后各加上修饰符,建议使用不可见字符配置,如"\u0007"。

array_bracket

数组括号,仅当编码格式为csv时可配置, 可选值为"()", "{}", "[]", 例如配置了"{}", 则数组输出格式为{a1,a2}。

append

值为true或者false,默认为true。

当OBS不支持append模式,且编码格式为csv和json时,可将该参数设置为false。Append为false时需要设置max_record_num_per_file和dump_interval。

max_record_num_per_file

文件最大记录数,当编码格式为csv(append为false)、json(append为false)、orc、carbondata、avro、avro_merge和parquet时需配置,表明一个文件最多存储记录数,当达到最大值,则另起新文件。

dump_interval

触发周期, 当编码格式为orc或者配置了DIS通知提醒时需进行配置。

  • 在orc编码方式中,该配置表示周期到达时,即使文件记录数未达到最大个数配置,也将文件上传到OBS上。
  • 在DIS通知提醒功能中,该配置表示每周期往DIS发送一个通知提醒,表明该目录已写完。

dis_notice_channel

OBS目录完成通知通道。表示每周期往DIS通道中发送一条记录,该记录内容为OBS目录路径,表明该目录已书写完毕。

max_record_num_cache

记录最大缓存数,仅当编码格式为carbondata时,可以配置该字段,且最小值不能小于max_record_num_per_file,默认值为max_record_num_per_file。

carbon_properties

carbon属性,仅当编码格式为carbondata时,可以配置该字段,值为"k1=v1, k2=v2"的形式,支持carbon-sdk中函数withTableProperties所支持的所有配置项,另外还支持IN_MEMORY_FOR_SORT_DATA_IN_MB和UNSAFE_WORKING_MEMORY_IN_MB两个配置项。

encoded_data

当编码格式为json(append为false)、avro_merge和parquet时,可通过配置该参数指定真正需要编码的数据,格式为${field_name},表示直接将该流字段的内容作为一个完整的记录进行编码。

注意事项

当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。

示例

  • 将car_infos数据输出到OBS的obs-sink桶下,输出目录为car_infos, 输出文件以greater_30作为文件名前缀,当单个文件超过100M时新起一个文件,同时数据输出用csv编码,使用逗号作为属性分隔符,换行符作为行分隔符。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    CREATE SINK STREAM car_infos (
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_price INT,
      car_timestamp LONG
    )
      WITH (
        type = "obs",
        encode = "csv",
        region = "xxx",
        field_delimiter = ",",
        row_delimiter = "\n",
        obs_dir = "obs-sink/car_infos",
        file_prefix = "greater_30",
        rolling_size = "100m"
    );
    
  • carbondata编码格式示例
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    CREATE SINK STREAM car_infos (
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_price INT,
      car_timestamp LONG
    )
      WITH (
        type = "obs",
        region = "xxx",
        encode = "carbondata",
        obs_dir = "dli-append-2/carbondata",
        max_record_num_per_file = "1000",
        max_record_num_cache = "2000",
        dump_interval = "60",
        ROLLING_INTERVAL = "yyyy/MM/dd/HH/mm",
        dis_notice_channel = "dis-notice",
        carbon_properties = "long_string_columns=MessageBody, IN_MEMORY_FOR_SORT_DATA_IN_MB=512"
    );
    
  • orc编码格式示例
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    CREATE SINK STREAM car_infos (
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_price INT,
      car_timestamp LONG
    )
      WITH (
        type = "obs",
        region = "xxx",
        encode = "orc",
        obs_dir = "dli-append-2/obsorc",
        FILE_PREFIX = "es_info",
        max_record_num_per_file = "100000",
        dump_interval = "60"
    );
    
  • parquet编码示例请参考文件系统输出流(推荐)中的示例。