做维表
语法格式
1 2 3 4 5 6 7 8 9 10 11 | create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '' ); |
参数说明
参数 | 说明 | 默认值 |
|---|---|---|
connector | flink框架区分Connector参数,固定为dws。 | - |
url | 数据库连接地址。 | - |
username | 配置连接用户。 | - |
password | 数据库用户密码。 | - |
参数 | 名称 | 类型 | 说明 | 默认值 |
|---|---|---|---|---|
connectionSize | 读取线程池大小 | int | 用于执行操作的线程数量 =数据库连接数量,同写入线程大小。 | 1 |
readBatchSize | 最多一次将get请求合并提交的数量 | int | 当查询请求积压后,最大的批量查询数量。 | 128 |
readBatchQueueSize | get请求缓冲池大小 | int | 查询请求最大积压容量。 | 256 |
readTimeoutMs | get操作的超时时间(毫秒/ms) | int | 默认值0表示不超时,会在两处位置生效:
| 0 |
readSyncThreadEnable | 非异步查询时,是否开启线程池 | boolean | 开启后future.get()异步阻塞,关闭后主线程同步调用阻塞。 | true |
lookupScanEnable | 是否开启scan查询 | boolean | 关联条件在非全主键匹配下,是否开启scan查询。 若为false,则join关联条件必须全为主键,否则将抛异常。 | false |
lookupBatchScanEnable | 是否允许scan查询做攒批操作 | boolean | 仅lookupScanEnable为true时有效,非全主键匹配场景下,scan操作是否允许攒批操作。 如果为false,则scan只能逐一处理数据,性能较差。 | false |
fetchSize / lookupScanFetchSize | scan一次查询大小 | int | 非全主键匹配下,一次条件查询的返回数量限制(默认fetchSize生效,当fetchSize为0时,lookupScanFetchSize生效)。 | 1000 |
lookupScanTimeoutMs | scan操作的超时时间(毫秒/ms) | int | 非全主键匹配下,一次条件查询的超时限制(ms)。 | 60000 |
lookupAsync | 是否采用异步方式获取数据 | boolean | 查询方式设置为同步or异步。 | true |
lookupCacheType | 缓存策略 | LookupCacheType | 设置以下缓存策略(不区分大小写):
| LRU |
lookupCacheMaxRows | 缓存大小 | long | 当选择LRU缓存策略后,可以设置缓存大小。 | 1000 |
lookupCacheExpireAfterAccess | 读取后开始计算的超时时间 | Duration | 当选择LRU缓存策略后,可以设置每次读取后,超时时间顺延长,默认不生效。 | null |
lookupCacheExpireAfterWrite | 写入后开始计算的超时时间 | Duration | 当选择LRU缓存策略后,可以设置每次写入后,超时时间固定,不论访问与否。 | 10s |
lookupCacheMissingKey | 数据不存在后写入缓存 | boolean | 当选择LRU缓存策略后,维表数据不存在,同时将数据缓存。 | false |
lookupCacheReloadStrategy | 全量缓存重载策略 | ReloadStrategy | 当选择ALL缓存策略后,可以设置以下数据重载策略:
| PERIODIC |
lookupCachePeriodicReloadInterval | 数据重载时间间隔 | Duration | 当选择PERIOD重载策略时,可以设置全量缓存重载间隔。 | 1h |
lookupCachePeriodicReloadMode | 数据重载模式 | ScheduleMode | 当选择PERIOD重载策略时,可以设置以下重载模式(不区分大小写):
| FIXED_DELAY |
lookupCacheTimedReloadTime | 数据重载定时调度时间 | string | 当选择TIMED重载策略时,可以设置全量缓存重载时间,以ISO-8601格式表示。例如:“10:15”。 | 00:00 |
lookupCacheTimedReloadIntervalDays | 数据重载定时周期调度间隔天数 | int | 当选择TIMED重载策略时,可以设置全量缓存周期调度间隔天数。 | 1 |
示例
从Kafka源表中读取数据,将DWS表作为维表,并将二者生成的宽表信息写入print结果表中,其具体步骤如下:
- 连接DWS数据库实例,在DWS中创建相应的表,作为维表,表名为area_info,SQL语句如下:
1 2 3 4 5 6 7 8 9
create table public.area_info( area_id VARCHAR, area_province_name VARCHAR, area_city_name VARCHAR, area_county_name VARCHAR, area_street_name VARCHAR, region_name VARCHAR, PRIMARY KEY(area_id) );
- 连接DWS数据库实例,向DWS维表area_info中插入测试数据,其语句如下:
1 2 3 4 5 6 7
insert into area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) values ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'), ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'), ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'), ('330110', 'a1', 'b1', 'c4', 'd4', 'e1');
- flink sql创建源表、结果表、维表并执行SQL:
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'order_test', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'dws-order', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName', 'tableName' = 'area_info', 'username' = 'DwsUserName', 'password' = 'password', 'lookupCacheMaxRows' = '10000', 'lookupCacheExpireAfterAccess' = '2h' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'print' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id; - 在Kafka中写入数据:
1 2 3
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
- 结果参考如下:


