Updated on 2024-07-19 GMT+08:00

Source Table

Syntax

When used as the source table, DWS-Connector can push down limit and where conditions to the database for execution by implementing the SupportsLimitPushDown and SupportsFilterPushDown interfaces.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
create table dwsSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
  (',' watermark for rowtime_column_name as watermark-strategy_expression)
)
with (
  'connector' = 'dws',
  'url' = '',
  'tableName' = '',
  'username' = '',
  'password' = ''
);

Parameter Description

Table 1 Database configurations

Parameter

Description

Default Value

connector

The Flink framework differentiates connector parameters. This parameter is fixed to dws.

-

url

Database connection address

-

username

Configured connection user

-

password

Configured password

-

tableName

GaussDB(DWS) table

-

Table 2 Query parameters

Parameter

Description

Default Value

fetchSize

The fetchSize parameter in the JDBC statement is used to control the number of records returned by the database.

1000

enablePushDown

Enabling condition pushdown will result in the limit and where conditions being pushed down to the database for execution.

true

Examples

In this example, data is read from the GaussDB(DWS) data source and written to the Print result table. The procedure is as follows:

  1. Create a table named dws_order in GaussDB(DWS).
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    create table public.dws_order(
    order_id VARCHAR,
    order_channel VARCHAR,
    order_time VARCHAR,
    pay_amount FLOAT8,
    real_pay FLOAT8,
    pay_time VARCHAR,
    user_id VARCHAR,
    user_name VARCHAR,
    area_id VARCHAR);
    
  2. Insert data into the dws_order table.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    insert into public.dws_order
      (order_id,
      order_channel,
      order_time,
      pay_amount,
      real_pay,
      pay_time,
      user_id,
      user_name,
      area_id) values
      ('202103241000000001', 'webShop', '2021-03-24 10:00:00', '100.00', '100.00', '2021-03-24 10:02:03', '0001', 'Alice', '330106'),
      ('202103251202020001', 'miniAppShop', '2021-03-25 12:02:02', '60.00', '60.00', '2021-03-25 12:03:00', '0002', 'Bob', '330110');
    
  3. Execute the SQL.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    CREATE TABLE dwsSource (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'dws',
      'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName',
      'tableName' = 'dws_order',
      'username' = 'DWSUserName',
      'password' = 'DWSPassword'
    );
    CREATE TABLE printSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'print'
    );
    insert into printSink select * from dwsSource;
    
  4. The command output is shown in the following figure.