更新时间:2024-07-01 GMT+08:00
分享

DWS维表

功能描述

创建DWS表用于与输入流连接,从而生成相应的宽表。

前提条件

  • 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。
  • 请确保已创建DWS数据库表。
  • 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。

    跨源认证简介及操作方法请参考跨源认证简介

注意事项

创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
create table dwsSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
)
with (
  'connector' = 'gaussdb',
  'url' = '',
  'table-name' = '',
  'username' = '',
  'password' = ''
);

参数说明

表1 参数说明

参数

是否必选

默认值

数据类型

说明

connector

String

connector类型,需配置为'gaussdb'。

url

String

jdbc连接地址。

使用gsjdbc4驱动连接时,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。

使用gsjdbc200驱动连接时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。

table-name

String

读取数据库中的数据所在的表名。

driver

String

jdbc连接驱动,默认为: org.postgresql.Driver。

  • 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。
  • 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。

username

String

数据库认证用户名,需要和'password'一起配置。

password

String

数据库认证密码,需要和'username'一起配置。

scan.partition.column

String

用于对输入进行分区的列名。

与scan.partition.lower-bound、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。

scan.partition.lower-bound

Integer

第一个分区的最小值。

与scan.partition.column、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。

scan.partition.upper-bound

Integer

最后一个分区的最大值。

与scan.partition.column、scan.partition.lower-bound、scan.partition.num必须同时存在或者同时不存在。

scan.partition.num

Integer

分区的个数。

与scan.partition.column、scan.partition.upper-bound、scan.partition.upper-bound必须同时存在或者同时不存在。

scan.fetch-size

0

Integer

每次从数据库拉取数据的行数。默认值为0,表示不限制。

scan.auto-commit

true

Boolean

设置自动提交标志。

它决定每一个statement是否以事务的方式自动提交。

lookup.cache.max-rows

Integer

维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。

默认表示不使用该配置。

lookup.cache.ttl

Duration

维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。

默认表示不使用该配置。

lookup.max-retries

3

Integer

维表配置,数据拉取最大重试次数。

pwd_auth_name

String

DLI侧创建的Password类型的跨源认证名称。

使用跨源认证则无需在作业中配置账号和密码。

示例

从Kafka源表中读取数据,将DWS表作为维表,并将二者生成的宽表信息写入Kafka结果表中,其具体步骤如下:

  1. 参考增强型跨源连接,在DLI上根据DWS和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。
  2. 设置DWS和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据DWS和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 连接DWS数据库实例,在DWS中创建相应的表,作为维表,表名为area_info,SQL语句如下:
    create table public.area_info(
      area_id VARCHAR,
      area_province_name VARCHAR,
      area_city_name VARCHAR,
      area_county_name VARCHAR,
      area_street_name VARCHAR,
      region_name VARCHAR);
  4. 连接DWS数据库实例,向DWS维表area_info中插入测试数据,其语句如下:
      insert into area_info
      (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) 
      values
      ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'),
      ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'),
      ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'),
      ('330110', 'a1', 'b1', 'c4', 'd4', 'e1');
  5. 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka作为数据源,DWS作为维表,数据输出到Kafka结果表中。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    CREATE TABLE orders (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string,
      proctime as Proctime()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaSourceTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'dws-order',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    --创建地址维表
    create table area_info (
        area_id string, 
        area_province_name string, 
        area_city_name string, 
        area_county_name string,
        area_street_name string, 
        region_name string 
    ) WITH (
      'connector' = 'gaussdb',
      'driver' = 'org.postgresql.Driver',
      'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName',
      'table-name' = 'area_info',
      'username' = 'DwsUserName',
      'password' = 'DwsPassword',
      'lookup.cache.max-rows' = '10000',
      'lookup.cache.ttl' = '2h'
    );
    
    --根据地址维表生成详细的包含地址的订单信息宽表
    create table order_detail(
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string,
        area_province_name string,
        area_city_name string,
        area_county_name string,
        area_street_name string,
        region_name string
    ) with (
      'connector' = 'kafka',
      'topic' = 'KafkaSinkTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'format' = 'json'
    );
    
    insert into order_detail
        select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name,
               area.area_id, area.area_province_name, area.area_city_name, area.area_county_name,
               area.area_street_name, area.region_name  from orders
        left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;
  6. 连接Kafka集群,向kafka中source topic中插入如下测试数据:
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    
    {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
  7. 连接Kafka集群,读取kafka中sink topic中数据,结果参考如下:
    {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"}
    
    {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"}
    
    {"order_id":"202103251505050001","order_channel":"qqShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}

常见问题

  • Q:若Flink作业日志中有如下报错信息,应该怎么解决?
    java.io.IOException: unable to open JDBC writer
    ...
    Caused by: org.postgresql.util.PSQLException: The connection attempt failed.
    ...
    Caused by: java.net.SocketTimeoutException: connect timed out
    A:应考虑是跨源没有绑定,或者跨源没有绑定成功。
  • Q:如果该DWS表在某schema下,则应该如何配置?
    A:如下示例是使用schema为dbuser2下的表area_info:
    --创建地址维表
    create table area_info (
        area_id string, 
        area_province_name string,
        area_city_name string,
        area_county_name string,
        area_street_name string, 
        region_name string 
    ) WITH (
     'connector' = 'gaussdb',
      'driver' = 'org.postgresql.Driver',
      'url' = 'jdbc:postgresql://DwsAddress:DwsPort/DwsDbname',
      'table-name' = 'dbuser2.area_info',
      'username' = 'DwsUserName',
      'password' = 'DwsPassword',
      'lookup.cache.max-rows' = '10000',
      'lookup.cache.ttl' = '2h'
    );

相关文档