更新时间:2024-06-29 GMT+08:00

做维表

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
create table dwsSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
)
with (
  'connector' = 'dws',
  'url' = '',
  'tableName' = '',
  'username' = '',
  'password' = ''
);

参数说明

表1 数据库配置

参数

说明

默认值

connector

flink框架区分Connector参数,固定为dws。

-

url

数据库连接地址。

-

username

配置连接用户。

-

password

数据库用户密码。

-

表2 连接配置参数

参数

名称

类型

说明

默认值

connectionSize

读取线程池大小

int

用于执行操作的线程数量 = 数据库连接数量,同写入线程大小。

1

readBatchSize

最多一次将get请求合并提交的数量

int

当查询请求积压后,最大的批量查询数量。

128

readBatchQueueSize

get请求缓冲池大小

int

查询请求最大积压容量。

256

readTimeoutMs

get操作的超时时间(毫秒/ms)

int

默认值0表示不超时,会在两处位置生效:

  • get操作从用户开始执行到client准备提交到dws的等待时间。
  • get sql的执行超时,即statement query timeout。

0

readSyncThreadEnable

非异步查询时,是否开启线程池

boolean

开启后future.get()异步阻塞,关闭后主线程同步调用阻塞。

true

lookupScanEnable

是否开启scan查询

boolean

关联条件在非全主键匹配下,是否开启scan查询。

若为false,则join关联条件必须全为主键,否则将抛异常。

false

fetchSize / lookupScanFetchSize

scan一次查询大小

int

非全主键匹配下,一次条件查询的返回数量限制(默认fetchSize生效,当fetchSize为0时,lookupScanFetchSize生效)。

1000

lookupScanTimeoutMs

scan操作的超时时间(毫秒/ms)

int

非全主键匹配下,一次条件查询的超时限制(ms)。

60000

lookupAsync

是否采用异步方式获取数据

boolean

查询方式设置为同步or异步。

true

lookupCacheType

缓存策略

LookupCacheType

设置以下缓存策略(不区分大小写):

  • None:无缓存LRU(默认值):缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果未找到,则去物理维表中查找。
  • ALL:全量数据缓存,适合不常更新小表。

LookupCacheType.LRU

lookupCacheMaxRows

缓存大小

long

当选择LRU缓存策略后,可以设置缓存大小。

1000

lookupCacheExpireAfterAccess

读取后开始计算的超时时间

Duration

当选择LRU缓存策略后,可以设置每次读取后,超时时间顺延长,默认不生效。

null

lookupCacheExpireAfterWrite

写入后开始计算的超时时间

Duration

当选择LRU缓存策略后,可以设置每次写入后,超时时间固定,不论访问与否。

10s

lookupCacheMissingKey

数据不存在后写入缓存

boolean

当选择LRU缓存策略后,维表数据不存在,同时将数据缓存。

false

lookupCacheReloadStrategy

全量缓存重载策略

ReloadStrategy

当选择ALL缓存策略后,可以设置以下数据重载策略:

  • PERIODIC:周期性数据重载。
  • TIMED:定时数据重载,以天为单位。

ReloadStrategy.PERIODIC

lookupCachePeriodicReloadInterval

数据重载时间间隔

Duration

当选择PERIOD重载策略时,可以设置全量缓存重载间隔。

1h

lookupCachePeriodicReloadMode

数据重载模式

ScheduleMode

当选择PERIOD重载策略时,可以设置以下重载模式(不区分大小写):

  • FIXED_DELAY:从上一个加载结束计算重新加载间隔。
  • FIXED_RATE:从上一个加载开始计算重新加载间隔。

ScheduleMode.FIXED_DELAY

lookupCacheTimedReloadTime

数据重载定时调度时间

string

当选择TIMED重载策略时,可以设置全量缓存重载时间,以ISO-8601格式表示。例如:“10:15”。

00:00

lookupCacheTimedReloadIntervalDays

数据重载定时周期调度间隔天数

int

当选择TIMED重载策略时,可以设置全量缓存周期调度间隔天数。

1

示例

从Kafka源表中读取数据,将GaussDB(DWS)表作为维表,并将二者生成的宽表信息写入print结果表中,其具体步骤如下:

  1. 连接GaussDB(DWS)数据库实例,在GaussDB(DWS)中创建相应的表,作为维表,表名为area_info,SQL语句如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    create table public.area_info(
      area_id VARCHAR,
      area_province_name VARCHAR,
      area_city_name VARCHAR,
      area_county_name VARCHAR,
      area_street_name VARCHAR,
      region_name VARCHAR,
      PRIMARY KEY(area_id)
    );
    
  2. 连接GaussDB(DWS)数据库实例,向GaussDB(DWS)维表area_info中插入测试数据,其语句如下:
    1
    2
    3
    4
    5
    6
    7
    insert into area_info
      (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) 
      values
      ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'),
      ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'),
      ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'),
      ('330110', 'a1', 'b1', 'c4', 'd4', 'e1');
    
  3. flink sql创建源表、结果表、维表并执行SQL:
    CREATE TABLE orders (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string,
      proctime as Proctime()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'order_test',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'dws-order',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    --创建地址维表
    create table area_info (
        area_id string, 
        area_province_name string, 
        area_city_name string, 
        area_county_name string,
        area_street_name string, 
        region_name string 
    ) WITH (
      'connector' = 'dws',
      'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName',
      'tableName' = 'area_info',
      'username' = 'DwsUserName',
      'password' = 'DwsPassword',
      'lookupCacheMaxRows' = '10000',
      'lookupCacheExpireAfterAccess' = '2h'
    );
    --根据地址维表生成详细的包含地址的订单信息宽表
    create table order_detail(
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string,
        area_province_name string,
        area_city_name string,
        area_county_name string,
        area_street_name string,
        region_name string
    ) with (
      'connector' = 'print'
     
    );
    insert into order_detail
        select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name,
               area.area_id, area.area_province_name, area.area_city_name, area.area_county_name,
               area.area_street_name, area.region_name  from orders
        left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;
  4. 在Kafka中写入数据:
    1
    2
    3
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
    
  5. 结果参考如下: