做源表
语法格式
用DWS-Connector做源表时,DWS-Connector实现了SupportsLimitPushDown和SupportsFilterPushDown接口,支持将limit和where条件下推到数据库执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '' ); |
参数说明
参数 |
说明 |
默认值 |
---|---|---|
connector |
flink框架区分connector参数,固定为dws。 |
- |
url |
数据库连接地址。 |
- |
username |
配置连接用户。 |
- |
password |
配置密码。 |
- |
tableName |
对应dws表。 |
- |
参数 |
说明 |
默认值 |
---|---|---|
fetchSize |
jdbc statement中fetchSize参数,用于控制查询数据库返回条数。 |
1000 |
enablePushDown |
开启条件下推:开启后limit和where条件会下推到数据库执行。 |
true |
示例
该示例是从GaussDB(DWS)数据源中读取数据,并写入到Print结果表中,其具体步骤参考如下:
- 在GaussDB(DWS)中创建相应的表,表名为dws_order,SQL语句参考如下:
1 2 3 4 5 6 7 8 9 10
create table public.dws_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR);
- 在GaussDB(DWS)中执行以下SQL语句,向dws_order表中插入数据:
1 2 3 4 5 6 7 8 9 10 11 12
insert into public.dws_order (order_id, order_channel, order_time, pay_amount, real_pay, pay_time, user_id, user_name, area_id) values ('202103241000000001', 'webShop', '2021-03-24 10:00:00', '100.00', '100.00', '2021-03-24 10:02:03', '0001', 'Alice', '330106'), ('202103251202020001', 'miniAppShop', '2021-03-25 12:02:02', '60.00', '60.00', '2021-03-25 12:03:00', '0002', 'Bob', '330110');
- 执行Flink SQL:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
CREATE TABLE dwsSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName', 'tableName' = 'dws_order', 'username' = 'DWSUserName', 'password' = 'DWSPassword' ); CREATE TABLE printSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into printSink select * from dwsSource;
- 执行结果如下: