JDBC维表
创建JDBC表用于与输入流连接。
前提条件
请务必确保您的账户下已创建了相应实例。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。
跨源认证简介及操作方法请参考跨源认证简介。
语法格式
1 2 3 4 5 6 7 8 9 10 11 12 |
CREATE TABLE table_id ( attr_name attr_type (',' attr_name attr_type)* ) WITH ( 'connector' = 'jdbc', 'url' = '', 'table-name' = '', 'driver' = '', 'username' = '', 'password' = '' ); |
参数说明
参数 |
是否必选 |
说明 |
---|---|---|
connector |
是 |
数据源类型,固定为:jdbc。 |
url |
是 |
数据库的URL。 |
table-name |
是 |
读取数据库中的数据所在的表名。 |
driver |
否 |
连接数据库所需要的驱动。若未配置,则会自动通过URL提取。 |
username |
否 |
数据库认证用户名,需要和'password'一起配置。 |
password |
否 |
数据库认证密码,需要和'username'一起配置。 |
scan.partition.column |
否 |
用于对输入进行分区的列名。 与scan.partition.lower-bound、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。 |
scan.partition.lower-bound |
否 |
第一个分区的最小值。 与scan.partition.column、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在 |
scan.partition.upper-bound |
否 |
最后一个分区的最大值。 与scan.partition.column、scan.partition.lower-bound、scan.partition.num必须同时存在或者同时不存在 |
scan.partition.num |
否 |
分区的个数。 与scan.partition.column、scan.partition.upper-bound、scan.partition.upper-bound必须同时存在或者同时不存在。 |
scan.fetch-size |
否 |
每次从数据库拉取数据的行数。默认值为0,表示忽略该提示。 |
lookup.cache.max-rows |
否 |
维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。-1表示不使用缓存。 |
lookup.cache.ttl |
否 |
维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 |
lookup.max-retries |
否 |
维表配置,数据拉取最大重试次数,默认为3。 |
pwd_auth_name |
否 |
DLI侧创建的Password类型的跨源认证名称。 |
数据类型映射
MySQL类型 |
PostgreSQL类型 |
Flink SQL类型 |
---|---|---|
TINYINT |
- |
TINYINT |
SMALLINT TINYINT UNSIGNED |
SMALLINT INT2 SMALLSERIAL SERIAL2 |
SMALLINT |
INT MEDIUMINT SMALLINT UNSIGNED |
INTEGER SERIAL |
INT |
BIGINT INT UNSIGNED |
BIGINT BIGSERIAL |
BIGINT |
BIGINT UNSIGNED |
- |
DECIMAL(20, 0) |
BIGINT |
BIGINT |
BIGINT |
FLOAT |
REAL FLOAT4 |
FLOAT |
DOUBLE DOUBLE PRECISION |
FLOAT8 DOUBLE PRECISION |
DOUBLE |
NUMERIC(p, s) DECIMAL(p, s) |
NUMERIC(p, s) DECIMAL(p, s) |
DECIMAL(p, s) |
BOOLEAN TINYINT(1) |
BOOLEAN |
BOOLEAN |
DATE |
DATE |
DATE |
TIME [(p)] |
TIME [(p)] [WITHOUT TIMEZONE] |
TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) VARCHAR(n) TEXT |
CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT |
STRING |
BINARY VARBINARY BLOB |
BYTEA |
BYTES |
- |
ARRAY |
ARRAY |
示例
从Kafka源表中读取数据,将JDBC表作为维表,并将二者生成的表信息写入Kafka结果表中,其具体步骤如下:
- 参考增强型跨源连接,在DLI上根据MySQL和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。
- 设置MySQL和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据MySQL和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
- 连接MySQL数据库实例,在flink数据库中创建相应的表,作为维表,表名为area_info,SQL语句如下:
CREATE TABLE `flink`.`area_info` ( `area_id` VARCHAR(32) NOT NULL, `area_province_name` VARCHAR(32) NOT NULL, `area_city_name` VARCHAR(32) NOT NULL, `area_county_name` VARCHAR(32) NOT NULL, `area_street_name` VARCHAR(32) NOT NULL, `region_name` VARCHAR(32) NOT NULL, PRIMARY KEY (`area_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci;
- 连接MySQL数据库实例,向JDBC维表area_info中插入测试数据,其语句如下:
insert into flink.area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) values ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'), ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'), ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'), ('330110', 'a1', 'b1', 'c4', 'd4', 'e1');
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka为数据源,JDBC作为维表,数据写入到Kafka结果表。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'jdbc-order', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://JDBCAddress:JDBCPort/flink',--其中url中的flink表示MySQL中area_info表所在的数据库名 'table-name' = 'area_info', 'username' = 'JDBCUserName', 'password' = 'JDBCPassWord' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'kafka', 'topic' = 'KafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;
- 连接Kafka集群,向Kafka的source topic中插入如下测试数据:
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
- 连接Kafka集群,在Kafka的sink topic读取数据,结果参考如下:
{"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"} {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"} {"order_id":"202103251505050001","order_channel":"qqShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
常见问题
无。