Source Table
Syntax
When a GaussDB(DWS) table is used as a source table, DWS-Connector implements the SupportsLimitPushDown and SupportsFilterPushDown interfaces, so LIMIT and WHERE conditions can be pushed down to the database for execution.

    create table dwsSource (
      attr_name attr_type
      (',' attr_name attr_type)*
      (',' PRIMARY KEY (attr_name, ...) NOT ENFORCED)
      (',' watermark for rowtime_column_name as watermark-strategy_expression)
    )
    with (
      'connector' = 'dws',
      'url' = '',
      'tableName' = '',
      'username' = '',
      'password' = ''
    );

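As a hedged illustration of the optional PRIMARY KEY and watermark clauses in the syntax above, the sketch below fills them in with concrete values. The table name dwsSourceWithWatermark, the target table dws_order_ts, and the 5-second delay are hypothetical, and the sketch assumes the underlying GaussDB(DWS) column order_time is a timestamp type that the connector maps to TIMESTAMP(3).

```sql
-- Hypothetical source table showing the optional clauses from the syntax.
CREATE TABLE dwsSourceWithWatermark (
  order_id string,
  order_channel string,
  order_time TIMESTAMP(3),            -- assumed to map to a timestamp column in GaussDB(DWS)
  pay_amount double,
  PRIMARY KEY (order_id) NOT ENFORCED,
  -- Watermark strategy: tolerate events arriving up to 5 seconds late (assumed value).
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName',
  'tableName' = 'dws_order_ts',       -- hypothetical table name
  'username' = 'DWSUserName',
  'password' = 'DWSPassword'
);
```

Flink does not validate a NOT ENFORCED primary key, so uniqueness of order_id remains the responsibility of the data in GaussDB(DWS).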
Parameter Description
Parameter | Description | Default Value |
---|---|---|
connector | Identifies the connector to the Flink framework. Fixed to dws. | - |
url | Database connection address. | - |
username | User name for the database connection. | - |
password | Password for the database connection. | - |
tableName | Name of the GaussDB(DWS) table to read. | - |

Parameter | Description | Default Value |
---|---|---|
fetchSize | Fetch size set on the JDBC statement. It controls how many records the database returns per fetch. | 1000 |
enablePushDown | Whether to enable condition pushdown. When enabled, LIMIT and WHERE conditions are pushed down to the database for execution. | true |

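As a minimal sketch of how these two optional parameters can be set, the following definition overrides both defaults. The table name dwsSourceTuned and the values 500 and false are illustrative assumptions. The connection values are the same placeholders used in the Examples section.

```sql
-- Hypothetical source table; option values are passed as string literals.
CREATE TABLE dwsSourceTuned (
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName',
  'tableName' = 'dws_order',
  'username' = 'DWSUserName',
  'password' = 'DWSPassword',
  'fetchSize' = '500',          -- assumed value; the default is 1000
  'enablePushDown' = 'false'    -- assumed value; the default is true
);
```

With enablePushDown set to false, LIMIT and WHERE conditions are evaluated in Flink instead of in the database, which can help when comparing the two execution paths.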
Examples
In this example, data is read from the GaussDB(DWS) data source and written to the Print result table. The procedure is as follows:
- Create a table named dws_order in GaussDB(DWS).

      create table public.dws_order(
        order_id VARCHAR,
        order_channel VARCHAR,
        order_time VARCHAR,
        pay_amount FLOAT8,
        real_pay FLOAT8,
        pay_time VARCHAR,
        user_id VARCHAR,
        user_name VARCHAR,
        area_id VARCHAR);

- Insert data into the dws_order table.

      insert into public.dws_order
        (order_id, order_channel, order_time, pay_amount, real_pay, pay_time, user_id, user_name, area_id)
      values
        ('202103241000000001', 'webShop', '2021-03-24 10:00:00', '100.00', '100.00', '2021-03-24 10:02:03', '0001', 'Alice', '330106'),
        ('202103251202020001', 'miniAppShop', '2021-03-25 12:02:02', '60.00', '60.00', '2021-03-25 12:03:00', '0002', 'Bob', '330110');

- Execute the SQL.

      CREATE TABLE dwsSource (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'dws',
        'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName',
        'tableName' = 'dws_order',
        'username' = 'DWSUserName',
        'password' = 'DWSPassword'
      );

      CREATE TABLE printSink (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'print'
      );

      insert into printSink select * from dwsSource;

- The command output is shown in the following figure.
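
To illustrate the pushdown behavior described in the Syntax section, the following sketch reuses the dwsSource and printSink tables from this example and adds a WHERE condition and a LIMIT. Assuming enablePushDown keeps its default value of true, the connector can hand both conditions to GaussDB(DWS) instead of applying them in Flink. Which predicates are actually pushed down, and whether LIMIT is accepted in a given execution mode, may depend on the connector and Flink versions, so treat this as an illustration rather than a guarantee.

```sql
-- Reuses dwsSource and printSink as defined in the example above.
INSERT INTO printSink
SELECT *
FROM dwsSource
WHERE order_channel = 'webShop'   -- filter condition eligible for pushdown
LIMIT 10;                         -- limit eligible for pushdown
```

With the two sample rows inserted earlier, only the order placed through webShop satisfies the filter.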