更新时间:2024-09-27 GMT+08:00

创建DWS维表

创建DWS表用于与输入流连接。

前提条件

  • 请务必确保您的账户下已创建了所需的DWS实例。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
create table dwsSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
)
with (
  'connector.type' = 'gaussdb',
  'connector.url' = '',
  'connector.table' = '',
  'connector.username' = '',
  'connector.password' = ''
);

参数说明

表1 参数说明

参数

是否必选

说明

connector.type

connector类型,需配置为'gaussdb'

connector.url

jdbc连接地址,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。

connector.table

读取数据库中的数据所在的表名

connector.driver

jdbc连接驱动,默认为: org.postgresql.Driver。

connector.username

数据库认证用户名,需要和'connector.password'一起配置

connector.password

数据库认证密码,需要和'connector.username'一起配置

connector.read.partition.column

用于对输入进行分区的列名

与connector.read.partition.lower-bound、connector.read.partition.upper-bound、

connector.read.partition.num必须同时存在或者同时不存在

connector.read.partition.lower-bound

第一个分区的最小值

与connector.read.partition.column、connector.read.partition.upper-bound、

connector.read.partition.num必须同时存在或者同时不存在

connector.read.partition.upper-bound

最后一个分区的最大值

与connector.read.partition.column、connector.read.partition.lower-bound、

connector.read.partition.num必须同时存在或者同时不存在

connector.read.partition.num

分区的个数

与connector.read.partition.column、connector.read.partition.upper-bound、

connector.read.partition.upper-bound必须同时存在或者同时不存在

connector.read.fetch-size

每次从数据库拉取数据的行数。默认值为0,表示忽略该提示

connector.lookup.cache.max-rows

维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。-1表示不使用缓存。

connector.lookup.cache.ttl

维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。

connector.lookup.max-retries

维表配置,数据拉取最大重试次数,默认为3。

示例

RDS表用于与输入流连接。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
CREATE TABLE car_infos (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT,
  proctime as PROCTIME()
)
  WITH (
  'connector.type' = 'dis',
  'connector.region' = 'ap-southeast-1',
  'connector.channel' = 'disInput',
  'format.type' = 'csv'
  );

CREATE TABLE  db_info (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT
)
  WITH (
  'connector.type' = 'gaussdb',
  'connector.driver' = 'org.postgresql.Driver',
  'connector.url' = 'jdbc:gaussdb://xx.xx.xx.xx:8000/xx',
  'connector.table' = 'car_info',
  'connector.username' = 'xx',
  'connector.password' = 'xx',
  'connector.lookup.cache.max-rows' = '10000',
  'connector.lookup.cache.ttl' = '24h'
);

CREATE TABLE audi_cheaper_than_30w (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT
)
  WITH (
  'connector.type' = 'dis',
  'connector.region' = 'ap-southeast-1',
  'connector.channel' = 'disOutput',
  'connector.partition-key' = 'car_id,car_owner',
  'format.type' = 'csv'
  );

INSERT INTO audi_cheaper_than_30w
SELECT a.car_id, b.car_owner, b.car_brand, b.car_price 
FROM car_infos as a join db_info FOR SYSTEM_TIME AS OF a.proctime AS b on a.car_id = b.car_id;