更新时间:2024-07-27 GMT+08:00

DWS结果表

功能描述

DLI将Flink作业的输出数据输出到数据仓库服务(DWS)中。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。

数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》

前提条件

  • 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。

    如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。

  • 请确保已创建DWS数据库表。
  • 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
  • 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。

语法格式

DWS结果表中不允许指定所有属性为PRIMARY KEY。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
create table dwsSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector.type' = 'gaussdb',
  'connector.url' = '',
  'connector.table' = '',
  'connector.driver' = '',
  'connector.username' = '',
  'connector.password' = ''
);

参数说明

表1 参数说明

参数

是否必选

说明

connector.type

connector类型,需配置为'gaussdb'

connector.url

jdbc连接地址,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。

connector.table

操作的表名。如果该DWS表在某schema下,则格式为:'schema\".\"具体表名',具体可以参考示例说明。

connector.driver

jdbc连接驱动,默认为: org.postgresql.Driver。

connector.username

数据库认证用户名,需要和'connector.password'一起配置

connector.password

数据库认证密码,需要和'connector.username'一起配置

connector.write.mode

数据写入模式,支持: copy, insert以及upsert三种。默认值为upsert。

该参数与'primary key'配合使用。

  • 未配置'primary key'时,支持copy及insert两种模式追加写入。
  • 配置'primary key',支持copy、upsert以及insert三种模式更新写入。

注意:由于dws不支持更新分布列,因而配置的更新主键必须包含dws表中定义的所有分布列。

connector.write.flush.max-rows

数据flush大小,超过该值将触发写入flush。默认为5000。

connector.write.flush.interval

数据flush周期,周期性触发写入flush。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。不填写则默认不根据时间刷新。

connector.write.max-retries

写入最大重试次数,默认为3。

connector.write.merge.filter-key

配置PRIMARY KEY,并且“connector.write.mode”配置为copy时,可以配置merge时的过滤列名。

connector.write.escape-string-value

是否对string类型值进行转义,默认为false。

注意事项

示例

  • 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。该驱动为默认,创建表时可以不填该驱动参数。
    • 使用upsert模式,写入数据到DWS
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      create table dwsSink(
        car_id STRING,
        car_owner STRING,
        car_brand STRING,
        car_speed INT
      ) with (
        'connector.type' = 'gaussdb',
        'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/xx',
        'connector.table' = 'car_info',
        'connector.username' = 'xx',
        'connector.password' = 'xx',
        'connector.write.mode' = 'upsert',
        'connector.write.flush.interval' = '30s'
      );
      
      当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例。
      CREATE TABLE ads_rpt_game_sdk_realtime_ada_reg_user_pay_mm (
        ddate DATE,
        dmin TIMESTAMP(3),
        game_appkey VARCHAR,
        channel_id VARCHAR,
        pay_user_num_1m bigint,
        pay_amt_1m bigint,
        PRIMARY KEY (ddate, dmin, game_appkey, channel_id) NOT ENFORCED
      ) WITH (
        'connector.type' = 'gaussdb',
        'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/dws_bigdata_db',
        'connector.table' = 'ads_game_sdk_base\".\"test',
        'connector.username' = 'xxxx',
        'connector.password' = 'xxxxx',
        'connector.write.mode' = 'upsert',
        'connector.write.flush.interval' = '30s'
      );
  • 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。
    当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例。
    create table dwsSink(
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_speed INT
    ) with (
      'connector.type' = 'gaussdb',
      'connector.table' = 'ads_game_sdk_base\".\"test',
      'connector.driver' = 'com.huawei.gauss200.jdbc.Driver',
      'connector.url' = 'jdbc:gaussdb://xx.xx.xx.xx:8000/xx',
      'connector.username' = 'xx',
      'connector.password' = 'xx',
      'connector.write.mode' = 'upsert',
      'connector.write.flush.interval' = '30s'
    );