创建JDBC维表
创建JDBC表用于与输入流连接。
前提条件
- 请务必确保您的账户下已创建了相应实例。
语法格式
1 2 3 4 5 6 7 8 9 10 11 | CREATE TABLE table_id (
attr_name attr_type
(',' attr_name attr_type)*
)
WITH (
'connector.type' = 'jdbc',
'connector.url' = '',
'connector.table' = '',
'connector.username' = '',
'connector.password' = ''
);
|
参数说明
参数 |
是否必选 |
说明 |
---|---|---|
connector.type |
是 |
数据源类型,‘jdbc’表示使用JDBC connector,必须为jdbc |
connector.url |
是 |
数据库的URL |
connector.table |
是 |
读取数据库中的数据所在的表名 |
connector.driver |
否 |
连接数据库所需要的驱动。若未配置,则会自动通过URL提取 |
connector.username |
否 |
数据库认证用户名,需要和'connector.password'一起配置 |
connector.password |
否 |
数据库认证密码,需要和'connector.username'一起配置 |
connector.read.partition.column |
否 |
用于对输入进行分区的列名 与connector.read.partition.lower-bound、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 |
connector.read.partition.lower-bound |
否 |
第一个分区的最小值 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 |
connector.read.partition.upper-bound |
否 |
最后一个分区的最大值 与connector.read.partition.column、connector.read.partition.lower-bound、 connector.read.partition.num必须同时存在或者同时不存在 |
connector.read.partition.num |
否 |
分区的个数 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.upper-bound必须同时存在或者同时不存在 |
connector.read.fetch-size |
否 |
每次从数据库拉取数据的行数。默认值为0,表示忽略该提示。 |
connector.lookup.cache.max-rows |
否 |
维表配置,缓存的最大行数,超过该值时,老的数据会被踢除。-1表示不使用缓存。 |
connector.lookup.cache.ttl |
否 |
维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 |
connector.lookup.max-retries |
否 |
维表配置,数据拉取最大重试次数,默认为3。 |
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | CREATE TABLE car_infos (
car_id STRING,
car_owner STRING,
car_brand STRING,
car_price INT,
proctime as PROCTIME()
)
WITH (
'connector.type' = 'dis',
'connector.region' = 'cn-north-1',
'connector.channel' = 'disInput',
'format.type' = 'csv'
);
CREATE TABLE db_info (
car_id STRING,
car_owner STRING,
car_brand STRING,
car_price INT
)
WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/xx',
'connector.table' = 'jdbc_table_name',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'xxx',
'connector.password' = 'xxxxx'
);
CREATE TABLE audi_cheaper_than_30w (
car_id STRING,
car_owner STRING,
car_brand STRING,
car_price INT
)
WITH (
'connector.type' = 'dis',
'connector.region' = 'cn-north-1',
'connector.channel' = 'disOutput',
'connector.partition-key' = 'car_id,car_owner',
'format.type' = 'csv'
);
INSERT INTO audi_cheaper_than_30w
SELECT a.car_id, b.car_owner, b.car_brand, b.car_price
FROM car_infos as a join db_info FOR SYSTEM_TIME AS OF a.proctime AS b on a.car_id = b.car_id;
|
