创建JDBC维表
创建JDBC表用于与输入流连接。
前提条件
- 请务必确保您的账户下已创建了相应实例。
语法格式
| 1 2 3 4 5 6 7 8 9 10 11 | CREATE TABLE table_id ( attr_name attr_type (',' attr_name attr_type)* ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = '', 'connector.table' = '', 'connector.username' = '', 'connector.password' = '' ); | 
参数说明
| 参数 | 是否必选 | 说明 | 
|---|---|---|
| connector.type | 是 | 数据源类型,‘jdbc’表示使用JDBC connector,必须为jdbc | 
| connector.url | 是 | 数据库的URL | 
| connector.table | 是 | 读取数据库中的数据所在的表名 | 
| connector.driver | 否 | 连接数据库所需要的驱动。若未配置,则会自动通过URL提取 | 
| connector.username | 否 | 数据库认证用户名,需要和'connector.password'一起配置 | 
| connector.password | 否 | 数据库认证密码,需要和'connector.username'一起配置 | 
| connector.read.partition.column | 否 | 用于对输入进行分区的列名 与connector.read.partition.lower-bound、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 | 
| connector.read.partition.lower-bound | 否 | 第一个分区的最小值 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 | 
| connector.read.partition.upper-bound | 否 | 最后一个分区的最大值 与connector.read.partition.column、connector.read.partition.lower-bound、 connector.read.partition.num必须同时存在或者同时不存在 | 
| connector.read.partition.num | 否 | 分区的个数 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.upper-bound必须同时存在或者同时不存在 | 
| connector.read.fetch-size | 否 | 每次从数据库拉取数据的行数。默认值为0,表示忽略该提示。 | 
| connector.lookup.cache.max-rows | 否 | 维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。-1表示不使用缓存。 | 
| connector.lookup.cache.ttl | 否 | 维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 | 
| connector.lookup.max-retries | 否 | 维表配置,数据拉取最大重试次数,默认为3。 | 
示例
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | CREATE TABLE car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, proctime as PROCTIME() ) WITH ( 'connector.type' = 'dis', 'connector.region' = 'ap-southeast-1', 'connector.channel' = 'disInput', 'format.type' = 'csv' ); CREATE TABLE db_info ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/xx', 'connector.table' = 'jdbc_table_name', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'xxx', 'connector.password' = 'xxxxx' ); CREATE TABLE audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( 'connector.type' = 'dis', 'connector.region' = 'ap-southeast-1', 'connector.channel' = 'disOutput', 'connector.partition-key' = 'car_id,car_owner', 'format.type' = 'csv' ); INSERT INTO audi_cheaper_than_30w SELECT a.car_id, b.car_owner, b.car_brand, b.car_price FROM car_infos as a join db_info FOR SYSTEM_TIME AS OF a.proctime AS b on a.car_id = b.car_id; | 
 
    