做源表
语法格式
用DWS-Connector做源表时,DWS-Connector实现了SupportsLimitPushDown和SupportsFilterPushDown接口,支持将limit和where条件下推到数据库执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 | create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '' ); |
参数说明
参数 | 说明 | 默认值 |
|---|---|---|
connector | flink框架区分connector参数,固定为dws。 | - |
url | 数据库连接地址。 | - |
username | 配置连接用户。 | - |
password | 配置密码。 | - |
tableName | 对应dws表。 | - |
参数 | 说明 | 默认值 |
|---|---|---|
fetchSize | jdbc statement中fetchSize参数,用于控制查询数据库返回条数。 | 1000 |
enablePushDown | 开启条件下推:开启后limit和where条件会下推到数据库执行。 | true |
示例
该示例是从DWS数据源中读取数据,并写入到Print结果表中,其具体步骤参考如下:
- 在DWS中创建相应的表,表名为dws_order,SQL语句参考如下:
1 2 3 4 5 6 7 8 9 10
create table public.dws_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR);
- 在DWS中执行以下SQL语句,向dws_order表中插入数据:
1 2 3 4 5 6 7 8 9 10 11 12
insert into public.dws_order (order_id, order_channel, order_time, pay_amount, real_pay, pay_time, user_id, user_name, area_id) values ('202103241000000001', 'webShop', '2021-03-24 10:00:00', '100.00', '100.00', '2021-03-24 10:02:03', '0001', 'Alice', '330106'), ('202103251202020001', 'miniAppShop', '2021-03-25 12:02:02', '60.00', '60.00', '2021-03-25 12:03:00', '0002', 'Bob', '330110');
- 执行Flink SQL:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
CREATE TABLE dwsSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName', 'tableName' = 'dws_order', 'username' = 'DWSUserName', 'password' = 'password' ); CREATE TABLE printSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into printSink select * from dwsSource;
- 执行结果如下:
常见问题
- Q:我想新建一个忽略尾空的数据库表,并且通过FlinkSQL筛选数据,这种场景支持么?
A:支持此场景。但是当数据库新建时添加了td_rtrim参数,并且FlinkSQL中WHERE子句添加了等值的筛选条件,可能导致数据不一致的问题:
Flink默认有列裁剪的行为(SELECT中会去掉筛选列,直接拼接WHERE子句中条件的值),这会导致最终的结果使用了WHERE子句中的值,而不是真正源端的数据。
假设一个场景,源端a列的值有3个值'a'、'a '、'a ',由于td_rtrim参数,那么select a from xxx where a ='a'会返回3个结果,但是由于Flink列裁剪的行为,会导致最终3个结果均被错误的拼成了'a',这就导致了数据一致性错误。
规避手段:在FlinkSQL的WHERE条件中添加rtrim函数,将筛选列包住,如select * from xxx where rtrim(xxx) ='a',即可解决上述问题。


