创建DWS维表
创建DWS表用于与输入流连接。
前提条件
- 请务必确保您的账户下已创建了所需的DWS实例。
语法格式
1 2 3 4 5 6 7 8 9 10 11 |
create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector.type' = 'gaussdb', 'connector.url' = '', 'connector.table' = '', 'connector.username' = '', 'connector.password' = '' ); |
参数说明
参数 |
是否必选 |
说明 |
---|---|---|
connector.type |
是 |
connector类型,需配置为'gaussdb' |
connector.url |
是 |
jdbc连接地址,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 |
connector.table |
是 |
读取数据库中的数据所在的表名 |
connector.driver |
否 |
jdbc连接驱动,默认为: org.postgresql.Driver。 |
connector.username |
否 |
数据库认证用户名,需要和'connector.password'一起配置 |
connector.password |
否 |
数据库认证密码,需要和'connector.username'一起配置 |
connector.read.partition.column |
否 |
用于对输入进行分区的列名 与connector.read.partition.lower-bound、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 |
connector.read.partition.lower-bound |
否 |
第一个分区的最小值 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 |
connector.read.partition.upper-bound |
否 |
最后一个分区的最大值 与connector.read.partition.column、connector.read.partition.lower-bound、 connector.read.partition.num必须同时存在或者同时不存在 |
connector.read.partition.num |
否 |
分区的个数 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.upper-bound必须同时存在或者同时不存在 |
connector.read.fetch-size |
否 |
每次从数据库拉取数据的行数。默认值为0,表示忽略该提示 |
connector.lookup.cache.max-rows |
否 |
维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。-1表示不使用缓存。 |
connector.lookup.cache.ttl |
否 |
维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 |
connector.lookup.max-retries |
否 |
维表配置,数据拉取最大重试次数,默认为3。 |
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
CREATE TABLE car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, proctime as PROCTIME() ) WITH ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disInput', 'format.type' = 'csv' ); CREATE TABLE db_info ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( 'connector.type' = 'gaussdb', 'connector.driver' = 'org.postgresql.Driver', 'connector.url' = 'jdbc:gaussdb://xx.xx.xx.xx:8000/xx', 'connector.table' = 'car_info', 'connector.username' = 'xx', 'connector.password' = 'xx', 'connector.lookup.cache.max-rows' = '10000', 'connector.lookup.cache.ttl' = '24h' ); CREATE TABLE audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disOutput', 'connector.partition-key' = 'car_id,car_owner', 'format.type' = 'csv' ); INSERT INTO audi_cheaper_than_30w SELECT a.car_id, b.car_owner, b.car_brand, b.car_price FROM car_infos as a join db_info FOR SYSTEM_TIME AS OF a.proctime AS b on a.car_id = b.car_id; |