做维表
语法格式
1 2 3 4 5 6 7 8 9 10 11 |
create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '' ); |
参数说明
参数 |
说明 |
默认值 |
---|---|---|
connector |
flink框架区分Connector参数,固定为dws。 |
- |
url |
数据库连接地址。 |
- |
username |
配置连接用户。 |
- |
password |
数据库用户密码。 |
- |
参数 |
名称 |
类型 |
说明 |
默认值 |
---|---|---|---|---|
connectionSize |
读取线程池大小 |
int |
用于执行操作的线程数量 = 数据库连接数量,同写入线程大小。 |
1 |
readBatchSize |
最多一次将get请求合并提交的数量 |
int |
当查询请求积压后,最大的批量查询数量。 |
128 |
readBatchQueueSize |
get请求缓冲池大小 |
int |
查询请求最大积压容量。 |
256 |
readTimeoutMs |
get操作的超时时间(毫秒/ms) |
int |
默认值0表示不超时,会在两处位置生效:
|
0 |
readSyncThreadEnable |
非异步查询时,是否开启线程池 |
boolean |
开启后future.get()异步阻塞,关闭后主线程同步调用阻塞。 |
true |
lookupScanEnable |
是否开启scan查询 |
boolean |
关联条件在非全主键匹配下,是否开启scan查询。 若为false,则join关联条件必须全为主键,否则将抛异常。 |
false |
fetchSize / lookupScanFetchSize |
scan一次查询大小 |
int |
非全主键匹配下,一次条件查询的返回数量限制(默认fetchSize生效,当fetchSize为0时,lookupScanFetchSize生效)。 |
1000 |
lookupScanTimeoutMs |
scan操作的超时时间(毫秒/ms) |
int |
非全主键匹配下,一次条件查询的超时限制(ms)。 |
60000 |
lookupAsync |
是否采用异步方式获取数据 |
boolean |
查询方式设置为同步or异步。 |
true |
lookupCacheType |
缓存策略 |
LookupCacheType |
设置以下缓存策略(不区分大小写):
|
LookupCacheType.LRU |
lookupCacheMaxRows |
缓存大小 |
long |
当选择LRU缓存策略后,可以设置缓存大小。 |
1000 |
lookupCacheExpireAfterAccess |
读取后开始计算的超时时间 |
Duration |
当选择LRU缓存策略后,可以设置每次读取后,超时时间顺延长,默认不生效。 |
null |
lookupCacheExpireAfterWrite |
写入后开始计算的超时时间 |
Duration |
当选择LRU缓存策略后,可以设置每次写入后,超时时间固定,不论访问与否。 |
10s |
lookupCacheMissingKey |
数据不存在后写入缓存 |
boolean |
当选择LRU缓存策略后,维表数据不存在,同时将数据缓存。 |
false |
lookupCacheReloadStrategy |
全量缓存重载策略 |
ReloadStrategy |
当选择ALL缓存策略后,可以设置以下数据重载策略:
|
ReloadStrategy.PERIODIC |
lookupCachePeriodicReloadInterval |
数据重载时间间隔 |
Duration |
当选择PERIOD重载策略时,可以设置全量缓存重载间隔。 |
1h |
lookupCachePeriodicReloadMode |
数据重载模式 |
ScheduleMode |
当选择PERIOD重载策略时,可以设置以下重载模式(不区分大小写):
|
ScheduleMode.FIXED_DELAY |
lookupCacheTimedReloadTime |
数据重载定时调度时间 |
string |
当选择TIMED重载策略时,可以设置全量缓存重载时间,以ISO-8601格式表示。例如:“10:15”。 |
00:00 |
lookupCacheTimedReloadIntervalDays |
数据重载定时周期调度间隔天数 |
int |
当选择TIMED重载策略时,可以设置全量缓存周期调度间隔天数。 |
1 |
示例
从Kafka源表中读取数据,将GaussDB(DWS)表作为维表,并将二者生成的宽表信息写入print结果表中,其具体步骤如下:
- 连接GaussDB(DWS)数据库实例,在GaussDB(DWS)中创建相应的表,作为维表,表名为area_info,SQL语句如下:
1 2 3 4 5 6 7 8 9
create table public.area_info( area_id VARCHAR, area_province_name VARCHAR, area_city_name VARCHAR, area_county_name VARCHAR, area_street_name VARCHAR, region_name VARCHAR, PRIMARY KEY(area_id) );
- 连接GaussDB(DWS)数据库实例,向GaussDB(DWS)维表area_info中插入测试数据,其语句如下:
1 2 3 4 5 6 7
insert into area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) values ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'), ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'), ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'), ('330110', 'a1', 'b1', 'c4', 'd4', 'e1');
- flink sql创建源表、结果表、维表并执行SQL:
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'order_test', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'dws-order', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName', 'tableName' = 'area_info', 'username' = 'DwsUserName', 'password' = 'DwsPassword', 'lookupCacheMaxRows' = '10000', 'lookupCacheExpireAfterAccess' = '2h' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'print' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;
- 在Kafka中写入数据:
1 2 3
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
- 结果参考如下: