Source Table
Syntax
When used as a source table, DWS-Connector implements the SupportsLimitPushDown and SupportsFilterPushDown interfaces, so limit and where conditions can be pushed down to the database for execution.

    create table dwsSource (
      attr_name attr_type
      (',' attr_name attr_type)*
      (',' PRIMARY KEY (attr_name, ...) NOT ENFORCED)
      (',' watermark for rowtime_column_name as watermark-strategy_expression)
    )
    with (
      'connector' = 'dws',
      'url' = '',
      'tableName' = '',
      'username' = '',
      'password' = ''
    );

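As an illustration of how the optional clauses in the syntax fit together, the following sketch declares a primary key and a watermark. The table name `dwsOrders`, the column names and types, the GaussDB(DWS) table name `orders_ts`, and the 5-second delay are all hypothetical, and the connection values are placeholders. It also assumes the underlying table has a timestamp column that the connector can read as TIMESTAMP(3).

```sql
-- Hypothetical table: names, types, and the watermark delay are illustrative only.
CREATE TABLE dwsOrders (
  order_id   STRING,
  pay_amount DOUBLE,
  order_time TIMESTAMP(3),
  -- Flink does not validate primary keys, so the constraint is declared NOT ENFORCED.
  PRIMARY KEY (order_id) NOT ENFORCED,
  -- Tolerate events that arrive up to 5 seconds out of order.
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName',  -- placeholder address
  'tableName' = 'orders_ts',                         -- hypothetical table
  'username' = 'DWSUserName',
  'password' = 'DWSPassword'
);
```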
Parameter Description
| Parameter | Description | Default Value |
|---|---|---|
| connector | Identifies the connector to the Flink framework. This parameter is fixed to dws. | - |
| url | Database connection address | - |
| username | Username used for the database connection | - |
| password | Password used for the database connection | - |
| tableName | Name of the GaussDB(DWS) table | - |

| Parameter | Description | Default Value |
|---|---|---|
| fetchSize | Fetch size set on the JDBC statement, which controls the number of records the database returns per fetch. | 1000 |
| enablePushDown | Whether to enable condition pushdown. When enabled, limit and where conditions are pushed down to the database for execution. | true |
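A sketch of how the two parameters that have default values might be set alongside the others. The table name `dwsSourceTuned` and the values shown (a fetch size of 500, pushdown switched off) are arbitrary illustrations rather than recommendations, and the connection values are placeholders.

```sql
-- Hypothetical table name and option values, for illustration only.
CREATE TABLE dwsSourceTuned (
  order_id      STRING,
  order_channel STRING,
  pay_amount    DOUBLE
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName',  -- placeholder address
  'tableName' = 'dws_order',
  'username' = 'DWSUserName',
  'password' = 'DWSPassword',
  -- Override the default of 1000 records per fetch.
  'fetchSize' = '500',
  -- Override the default of true; limit and where conditions are then not pushed down.
  'enablePushDown' = 'false'
);
```

Option values in a Flink `WITH` clause are written as string literals, including numeric and boolean ones.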
Examples
In this example, data is read from the GaussDB(DWS) data source and written to the Print result table. The procedure is as follows:
- Create a table named dws_order in GaussDB(DWS).

      create table public.dws_order(
        order_id VARCHAR,
        order_channel VARCHAR,
        order_time VARCHAR,
        pay_amount FLOAT8,
        real_pay FLOAT8,
        pay_time VARCHAR,
        user_id VARCHAR,
        user_name VARCHAR,
        area_id VARCHAR
      );

- Insert data into the dws_order table.

      insert into public.dws_order (
        order_id, order_channel, order_time, pay_amount, real_pay,
        pay_time, user_id, user_name, area_id
      ) values
        ('202103241000000001', 'webShop', '2021-03-24 10:00:00', '100.00', '100.00',
         '2021-03-24 10:02:03', '0001', 'Alice', '330106'),
        ('202103251202020001', 'miniAppShop', '2021-03-25 12:02:02', '60.00', '60.00',
         '2021-03-25 12:03:00', '0002', 'Bob', '330110');

- Execute the SQL.

      CREATE TABLE dwsSource (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'dws',
        'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName',
        'tableName' = 'dws_order',
        'username' = 'DWSUserName',
        'password' = 'DWSPassword'
      );

      CREATE TABLE printSink (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'print'
      );

      insert into printSink select * from dwsSource;

- The command output is shown in the following figure.
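To relate the example to condition pushdown, the final statement could be varied as sketched below. It reuses the dwsSource and printSink tables defined above. The filter value and the limit of 10 are arbitrary choices, and whether a particular condition is actually executed in the database depends on the connector and the Flink planner, so treat this as an illustration rather than a guarantee.

```sql
-- Variation on the last statement of the example; filter value and limit are arbitrary.
-- With enablePushDown left at its default (true), the where and limit conditions
-- are candidates for execution inside GaussDB(DWS) instead of in Flink.
insert into printSink
select *
from dwsSource
where order_channel = 'webShop'
limit 10;
```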