更新时间:2024-06-24 GMT+08:00
分享

DWS结果表(不推荐使用)

功能描述

DLI将Flink作业的输出数据输出到数据仓库服务(DWS)中。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。

数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》

推荐使用DWS服务自研的DWS Connector。

DWS-Connector的使用方法请参考dws-connector-flink

前提条件

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。
  • 请确保已创建DWS数据库表。
  • 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。

    跨源认证简介及操作方法请参考跨源认证简介

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • with参数中字段只能使用单引号,不能使用双引号。
  • 若需要使用upsert模式,则必须在DWS结果表和该结果表连接的DWS表都定义主键。
  • 若DWS在不同的schema中存在相同名称的表,则在flink opensource sql中需要指定相应的schema。
  • 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。该驱动为默认,创建表时可以不填该驱动参数。
    例如,使用gsjdbc4驱动连接、upsert模式写入数据到DWS中。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    create table dwsSink(
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_speed INT
    ) with (
      'connector' = 'gaussdb',
      'url' = 'jdbc:postgresql://DwsAddress:DwsPort/DwsDatabase',
      'table-name' = 'car_info',
      'username' = 'DwsUserName',
      'password' = 'DwsPasswrod',
      'write.mode' = 'upsert'
    );
    
  • 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。
    当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例创建DWS结果表。
    create table dwsSink(
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_speed INT
    ) with (
      'connector' = 'gaussdb',
      'table-name' = 'ads_game_sdk_base.test',
      'driver' = 'com.huawei.gauss200.jdbc.Driver',
      'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDatabase',
      'username' = 'DwsUserName',
      'password' = 'DwsPasswrod',
      'write.mode' = 'upsert'
    );

语法格式

DWS结果表中不允许指定所有属性为PRIMARY KEY。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
create table dwsSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector' = 'gaussdb',
  'url' = '',
  'table-name' = '',
  'driver' = '',
  'username' = '',
  'password' = ''
);

参数说明

表1 参数说明

参数

是否必选

默认值

类型

说明

connector

String

指定要使用的连接器,这里是'gaussdb'

url

String

jdbc连接地址 。

使用gsjdbc4驱动连接时,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。

使用gsjdbc200驱动连接时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。

table-name

String

操作的表名。如果该DWS表在某schema下,则格式为:'schema\".\"具体表名',具体可以参考常见问题说明。

driver

org.postgresql.Driver

String

jdbc连接驱动,默认为: org.postgresql.Driver。

  • 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。
  • 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。

username

String

DWS数据库认证用户名,需要和'password'一起配置

password

String

DWS数据库认证密码,需要和'username'一起配置

write.mode

String

数据写入模式,支持: copy, insert以及upsert三种。默认值为upsert。

该参数与'primary key'配合使用。

  • 未配置'primary key'时,支持copy及insert两种模式追加写入。
  • 配置'primary key',支持copy、upsert以及insert三种模式更新写入。

注意:由于dws不支持更新分布列,因而配置的更新主键必须包含dws表中定义的所有分布列。

sink.buffer-flush.max-rows

100

Integer

每次写入请求缓存的最大行数。

它能提升写入数据的性能,但是也可能增加延迟。

设置为 "0" 关闭此选项。

sink.buffer-flush.interval

1s

Duration

刷新缓存的间隔,在这段时间内以异步线程刷新数据。

它能提升写入数据库的性能,但是也可能增加延迟。

设置为 "0" 关闭此选项。

注意:"sink.buffer-flush.max-size" 和 "sink.buffer-flush.max-rows" 同时设置为 "0",并设置刷新缓存的间隔,则以完整的异步处理方式刷新缓存。

格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。

sink.max-retries

3

Integer

写入最大重试次数。

write.escape-string-value

false

Boolean

是否对string类型值进行转义。该参数仅用于write.mode为copy模式下。

key-by-before-sink

false

Boolean

在sink算子前是否按指定的主键进行分区。

该参数旨在解决多并发写入的场景下且write.mode为upsert时,如果多个子任务中写入sink的一批数据具有不止一条相同的主键,并且主键相同的这些数据先后顺序不一致,就会导致两个子任务在向DWS根据主键获取行锁时发生互锁的问题。

示例

该示例是从kafka数据源中读取数据,并以insert模式写入DWS结果表中,其具体步骤如下:

  1. 参考增强型跨源连接,在DLI上根据DWS和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。
  2. 设置DWS和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据DWS和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 连接DWS数据库,在DWS中创建相应的表,表名为dws_order,SQL语句参考如下:
    create table public.dws_order(
      order_id VARCHAR,
      order_channel VARCHAR,
      order_time VARCHAR,
      pay_amount FLOAT8,
      real_pay FLOAT8,
      pay_time VARCHAR,
      user_id VARCHAR,
      user_name VARCHAR,
      area_id VARCHAR);
  4. 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka作业数据源,将DWS作为结果表。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE dwsSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'gaussdb',
      'url' = 'jdbc:postgresql://DWSAddress:DWSPort/DWSdbName',
      'table-name' = 'dws_order',
      'driver' = 'org.postgresql.Driver',
      'username' = 'DWSUserName',
      'password' = 'DWSPassword',
      'write.mode' = 'insert'
    );
    
    insert into dwsSink select * from kafkaSource;
  5. 连接Kafka集群,向Kafka中输入以下测试数据。
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  6. 从DWS中使用如下SQL语句查看数据结果。
     select * from dws_order
    数据结果参考如下:
    202103241000000001	webShop	2021-03-24 10:00:00	100.0	100.0	2021-03-24 10:02:03	0001	Alice	330106

常见问题

  • Q:Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?
    java.io.IOException: unable to open JDBC writer
    ...
    Caused by: org.postgresql.util.PSQLException: The connection attempt failed.
    ...
    Caused by: java.net.SocketTimeoutException: connect timed out
    A:应考虑是跨源没有绑定,或者跨源没有绑定成功。
  • Q:如果该DWS表在某schema下,则应该如何配置?
    A:当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例中的'table-name'参数配置。
    CREATE TABLE ads_rpt_game_sdk_realtime_ada_reg_user_pay_mm (
      ddate DATE,
      dmin TIMESTAMP(3),
      game_appkey VARCHAR,
      channel_id VARCHAR,
      pay_user_num_1m bigint,
      pay_amt_1m bigint,
      PRIMARY KEY (ddate, dmin, game_appkey, channel_id) NOT ENFORCED
    ) WITH (
      'connector' = 'gaussdb',
      'url' = 'jdbc:postgresql://<yourDwsAddress>:<yourDwsPort>/dws_bigdata_db',
      'table-name' = 'ads_game_sdk_base.test',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'write.mode' = 'upsert'
    );
  • Q:作业运行正常,但是DWS中一直没有数据怎么办?
    A:请分别排查以下场景:
    • 查看jobmanager和taskmanager的日志是否有错误抛出。日志查看操作步骤如下:
      1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
      2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
      3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”或“jobmanager”的文件夹进入,下载获取taskmanager.out和jobmanager.out文件查看结果日志。
    • 验证跨源是否正确绑定且安全组规则已对该队列开放。
    • 查看所要写入的DWS表是否在多个不同的schema中存在。若存在,则需要在flink作业中指定schema。

相关文档