更新时间:2022-08-12 GMT+08:00

DWS输出流(通过OBS转储方式)

功能描述

创建sink流将Flink作业数据通过OBS转储方式输出到数据仓库服务(DWS),即Flink作业数据先输出到OBS,然后再从OBS导入到DWS。如何导入OBS数据到DWS具体可参考《数据仓库服务数据库开发指南》中“从OBS并行导入数据到集群”章节。

数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。

注意事项

  • 通过OBS转储支持两种中间文件方式:
    • ORC: ORC格式不支持Array数据类型,如果使用ORC格式,需先在DWS中创建外部服务器,具体可参考《数据仓库服务数据库开发指南》中“创建外部服务器”章节。
    • CSV: CSV格式默认记录分隔符为换行符,若属性内容中有换行符,建议配置quote,具体参见表1
  • 如果要写入的表不存在,则会自动创建表。由于DLI SQL类型不支持text,如果存在长文本,建议先在数据库中创建表。
  • encode使用orc格式时,创建DWS表时,如果SQL流字段属性定义为String类型,DWS表字段属性不能使用varchar类型,需使用特定的text类型;如果是SQL流字段属性定义为Integer类型,DWS表字段需要使用Integer类型。

前提条件

  • 确保已创建OBS桶和文件夹。

    如何创建OBS桶,具体请参见《对象存储服务用户指南》中的“创建桶”章节。

    如何新建文件夹,具体请参见《对象存储服务用户指南》中的“新建文件夹”章节。

  • 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。

    如何建立增强型跨源连接,请参考《数据湖探索用户指南》“增强型跨源连接”章节。

    如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
          type = "dws",
          region = "",
          ak = "",
          sk = "",
          encode = "",
          field_delimiter = "",
          quote = "",
          db_obs_server = "",
          obs_dir = "",
          username = "",
          password =  "",
          db_url = "",
          table_name = "",
          max_record_num_per_file = "",
          dump_interval = ""
  );

关键字

表1 关键字说明

参数

是否必选

说明

type

输出通道类型,dws表示输出到数据仓库服务中。

region

数据仓库服务所在区域。

ak

访问密钥ID(Access Key ID)。

sk

Secret Access Key,与访问密钥ID结合使用的密钥。

encode

编码方式。当前支持csv和orc两种方式。

field_delimiter

属性分隔符。当编码方式为csv时需要配置,建议尽量用不可见字符作为分隔符,如\u0006\u0002。

quote

单字节,建议使用不可见字符,如\u0007。

db_obs_server

已在数据库中创建的外部服务器,如obs_server。

如何创建外部服务器,具体操作步骤可参考《数据仓库服务数据库开发指南》中“创建外部服务器”章节。

如果编码方式为orc格式时需指定该参数。

obs_dir

中间文件存储目录。格式为{桶名}/{目录名}, 如obs-a1/dir1/subdir。

username

数据库连接用户名。

password

数据库连接密码。

db_url

数据库连接地址。格式为/ip:port/database,如 “192.168.1.21:8000/test1”。

table_name

数据表名,若表不存在,则自动创建。

max_record_num_per_file

每个文件最多存储多少条记录。当文件记录数少于最大值时,该文件会延迟一个转储周期输出。

dump_interval

转储周期,单位为秒。

delete_obs_temp_file

是否要删除obs上的临时文件,默认为“true”,若设置为“false”,则不会删除obs上的文件,需用户自己清理。

max_dump_file_num

执行一次转储操作时最多转储多少文件。 当本次转储操作发现文件数小于最大值,则会延迟一个转储周期输出。

示例

  • CSV格式转储。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    CREATE SINK STREAM car_infos (
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_price INT,
      car_timestamp LONG
    )
      WITH (
        type = "dws",
        region = "xxx",
        ak = "",
        sk = "",
        encode = "csv",
        field_delimiter = "\u0006\u0006\u0002",
        quote = "\u0007",
        obs_dir = "dli-append-2/dws",
        username = "",
        password =  "",
        db_url = "192.168.1.12:8000/test1",
        table_name = "table1",
        max_record_num_per_file = "100",
        dump_interval = "10"
      );
    
  • ORC格式转储。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    CREATE SINK STREAM car_infos (
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_price INT,
      car_timestamp LONG
    )
      WITH (
        type = "dws",
        region = "xxx",
        ak = "",
        sk = "",
        encode = "orc",
        db_obs_server = "obs_server",
        obs_dir = "dli-append-2/dws",
        username = "",
        password =  "",
        db_url = "192.168.1.12:8000/test1",
        table_name = "table1",
        max_record_num_per_file = "100",
        dump_interval = "10"
      );