Updated: 2024-06-29 GMT+08:00

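Using DWS-Connector as a Source Table

Syntax

When DWS-Connector is used as a source table, it implements the SupportsLimitPushDown and SupportsFilterPushDown interfaces, so LIMIT and WHERE conditions can be pushed down to the database for execution.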

create table dwsSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
  (',' watermark for rowtime_column_name as watermark-strategy_expression)
)
with (
  'connector' = 'dws',
  'url' = '',
  'tableName' = '',
  'username' = '',
  'password' = ''
);
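
As one possible concrete instance of the syntax above, the following sketch declares a primary key and a watermark. The Flink table name dwsSourceWithKey, the GaussDB(DWS) table dws_order_ts, and its columns are hypothetical. It also assumes that order_time is stored as a timestamp column that the connector maps to TIMESTAMP(3), which is not confirmed here. The connection values are placeholders.

```sql
-- Hypothetical table and column names; connection values are placeholders.
CREATE TABLE dwsSourceWithKey (
  order_id STRING,
  order_time TIMESTAMP(3),   -- assumed to be a timestamp column in GaussDB(DWS)
  pay_amount DOUBLE,
  -- Flink does not validate the key itself, so it must be declared NOT ENFORCED.
  PRIMARY KEY (order_id) NOT ENFORCED,
  -- Bounded out-of-orderness strategy: tolerate events up to 5 seconds late.
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName',
  'tableName' = 'dws_order_ts',
  'username' = 'DWSUserName',
  'password' = 'DWSPassword'
);
```

The 5-second delay is only an illustrative choice; pick a bound that matches how late your data can arrive.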

Parameters

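Table 1 Database configuration

| Parameter | Description | Default |
|-----------|-------------|---------|
| connector | Connector type used by the Flink framework to identify the connector. Fixed to dws. | - |
| url | Database connection URL. | - |
| username | User name used to connect to the database. | - |
| password | Password of the connecting user. | - |
| tableName | Name of the corresponding GaussDB(DWS) table. | - |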

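Table 2 Query parameters

| Parameter | Description | Default |
|-----------|-------------|---------|
| fetchSize | The fetchSize parameter of the JDBC statement. It controls how many rows are fetched from the database at a time when querying. | 1000 |
| enablePushDown | Enables condition pushdown. When enabled, LIMIT and WHERE conditions are pushed down to the database for execution. | true |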
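
To illustrate how the query parameters might be set together, the sketch below adds fetchSize and enablePushDown to the WITH clause and then runs a query whose WHERE and LIMIT clauses are candidates for pushdown. The table name dwsSourceTuned and the fetchSize value of 2000 are illustrative assumptions, the connection values are placeholders, and the columns mirror the dws_order table used in the example on this page. Which predicates are actually pushed down may depend on the connector version, and LIMIT support can depend on the Flink version and execution mode.

```sql
-- Hypothetical source table; connection values are placeholders.
CREATE TABLE dwsSourceTuned (
  order_id STRING,
  order_channel STRING,
  order_time STRING,
  pay_amount DOUBLE,
  real_pay DOUBLE,
  pay_time STRING,
  user_id STRING,
  user_name STRING,
  area_id STRING
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName',
  'tableName' = 'dws_order',
  'username' = 'DWSUserName',
  'password' = 'DWSPassword',
  'fetchSize' = '2000',       -- illustrative value; the documented default is 1000
  'enablePushDown' = 'true'   -- the documented default, shown here for clarity
);

-- With pushdown enabled, the filter and the row limit are intended to be
-- evaluated by the database rather than inside the Flink job.
SELECT order_id, pay_amount
FROM dwsSourceTuned
WHERE order_channel = 'webShop'
LIMIT 10;
```

Setting 'enablePushDown' to 'false' would leave the filtering and limiting to Flink, which can be useful when comparing results or diagnosing a query.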

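Example

This example reads data from a GaussDB(DWS) data source and writes it to a Print result table. The steps are as follows:

  1. Create a table named dws_order in GaussDB(DWS) with the following SQL statement: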
    create table public.dws_order(
    order_id VARCHAR,
    order_channel VARCHAR,
    order_time VARCHAR,
    pay_amount FLOAT8,
    real_pay FLOAT8,
    pay_time VARCHAR,
    user_id VARCHAR,
    user_name VARCHAR,
    area_id VARCHAR);
    
  2. Run the following SQL statement in GaussDB(DWS) to insert data into the dws_order table:
    insert into public.dws_order
      (order_id,
      order_channel,
      order_time,
      pay_amount,
      real_pay,
      pay_time,
      user_id,
      user_name,
      area_id) values
      ('202103241000000001', 'webShop', '2021-03-24 10:00:00', '100.00', '100.00', '2021-03-24 10:02:03', '0001', 'Alice', '330106'),
      ('202103251202020001', 'miniAppShop', '2021-03-25 12:02:02', '60.00', '60.00', '2021-03-25 12:03:00', '0002', 'Bob', '330110');
    
  3. Run the following Flink SQL:
    CREATE TABLE dwsSource (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'dws',
      'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName',
      'tableName' = 'dws_order',
      'username' = 'DWSUserName',
      'password' = 'DWSPassword'
    );
    CREATE TABLE printSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'print'
    );
    insert into printSink select * from dwsSource;
    
  4. The execution result is as follows: