创建DWS维表
创建DWS表用于与输入流连接。
前提条件
- 请务必确保您的账户下已创建了所需的DWS实例。
语法格式
1 2 3 4 5 6 7 8 9 10 11 | create table dwsSource (
attr_name attr_type
(',' attr_name attr_type)*
)
with (
'connector.type' = 'gaussdb',
'connector.url' = '',
'connector.table' = '',
'connector.username' = '',
'connector.password' = ''
);
|
参数说明
参数 |
是否必选 |
说明 |
---|---|---|
connector.type |
是 |
connector类型,需配置为'gaussdb' |
connector.url |
是 |
jdbc连接地址,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 |
connector.table |
是 |
读取数据库中的数据所在的表名 |
connector.driver |
否 |
jdbc连接驱动,默认为: org.postgresql.Driver。 |
connector.username |
否 |
数据库认证用户名,需要和'connector.password'一起配置 |
connector.password |
否 |
数据库认证密码,需要和'connector.username'一起配置 |
connector.read.partition.column |
否 |
用于对输入进行分区的列名 与connector.read.partition.lower-bound、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 |
connector.read.partition.lower-bound |
否 |
第一个分区的最小值 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 |
connector.read.partition.upper-bound |
否 |
最后一个分区的最大值 与connector.read.partition.column、connector.read.partition.lower-bound、 connector.read.partition.num必须同时存在或者同时不存在 |
connector.read.partition.num |
否 |
分区的个数 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.upper-bound必须同时存在或者同时不存在 |
connector.read.fetch-size |
否 |
每次从数据库拉取数据的行数。默认值为0,表示忽略该提示 |
connector.lookup.cache.max-rows |
否 |
维表配置,缓存的最大行数,超过该值时,老的数据会被踢除。-1表示不使用缓存。 |
connector.lookup.cache.ttl |
否 |
维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 |
connector.lookup.max-retries |
否 |
维表配置,数据拉取最大重试次数,默认为3。 |
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | CREATE TABLE car_infos (
car_id STRING,
car_owner STRING,
car_brand STRING,
car_price INT,
proctime as PROCTIME()
)
WITH (
'connector.type' = 'dis',
'connector.region' = 'cn-north-1',
'connector.channel' = 'disInput',
'format.type' = 'csv'
);
CREATE TABLE db_info (
car_id STRING,
car_owner STRING,
car_brand STRING,
car_price INT
)
WITH (
'connector.type' = 'gaussdb',
'connector.driver' = 'com.huawei.gauss200.jdbc.Driver',
'connector.url' = 'jdbc:gaussdb://xx.xx.xx.xx:8000/xx',
'connector.table' = 'car_info',
'connector.username' = 'xx',
'connector.password' = 'xx',
'connector.lookup.cache.max-rows' = '10000',
'connector.lookup.cache.ttl' = '24h'
);
CREATE TABLE audi_cheaper_than_30w (
car_id STRING,
car_owner STRING,
car_brand STRING,
car_price INT
)
WITH (
'connector.type' = 'dis',
'connector.region' = 'cn-north-1',
'connector.channel' = 'disOutput',
'connector.partition-key' = 'car_id,car_owner',
'format.type' = 'csv'
);
INSERT INTO audi_cheaper_than_30w
SELECT a.car_id, b.car_owner, b.car_brand, b.car_price
FROM car_infos as a join db_info FOR SYSTEM_TIME AS OF a.proctime AS b on a.car_id = b.car_id;
|
