JDBC结果表
功能描述
DLI通过JDBC结果表将Flink作业的输出数据输出到关系型数据库中。
前提条件
- DLI要与实例建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 如果JDBC结果表定义了主键,则连接器以upsert模式运行,否则,连接器以Append模式运行。
- upsert模式:Flink会根据主键插入新行或更新现有行,Flink可以通过这种方式保证幂等性。为保证输出结果符合预期,建议为表定义主键。
- Append模式:Flink 会将所有记录解释为INSERT消息,如果底层数据库发生主键或唯一约束违规,INSERT操作可能会失败。
语法格式
1 2 3 4 5 6 7 8 9 10 11 12 13 | create table jdbcSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'jdbc', 'url' = '', 'table-name' = '', 'driver' = '', 'username' = '', 'password' = '' ); |
参数说明
参数 | 是否必选 | 默认值 | 类型 | 说明 |
|---|---|---|---|---|
connector | 是 | 无 | String | 指定要使用的连接器,这里应该是'jdbc'。 |
url | 是 | 无 | String | 数据库的URL。 |
table-name | 是 | 无 | String | 读取数据库中的数据所在的表名。 |
driver | 否 | 无 | String | 连接数据库所需要的驱动。若未配置,则会自动通过URL提取。 |
username | 否 | 无 | String | 数据库认证用户名,需要和'password'一起配置。 |
password | 否 | 无 | String | 数据库认证密码,需要和'username'一起配置。 |
sink.buffer-flush.max-rows | 否 | 100 | Integer | 每次写入请求缓存的最大行数。 它能提升写入数据的性能,但是也可能增加延迟。 设置为 "0" 关闭此选项。 |
sink.buffer-flush.interval | 否 | 1s | Duration | 刷新缓存的间隔,在这段时间内以异步线程刷新数据。 它能提升写入数据的性能,但是也可能增加延迟。 设置为 "0" 关闭此选项。 注意:"sink.buffer-flush.max-rows" 设置为 "0",并设置刷新缓存间隔,则以完整的异步处理方式刷新缓存。 格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 |
sink.max-retries | 否 | 3 | Integer | 将记录写入数据库失败时的最大重试次数。 |
数据类型映射
MySQL类型 | PostgreSQL类型 | Flink SQL类型 |
|---|---|---|
TINYINT | - | TINYINT |
SMALLINT TINYINT UNSIGNED | SMALLINT INT2 SMALLSERIAL SERIAL2 | SMALLINT |
INT MEDIUMINT SMALLINT UNSIGNED | INTEGER SERIAL | INT |
BIGINT INT UNSIGNED | BIGINT BIGSERIAL | BIGINT |
BIGINT UNSIGNED | - | DECIMAL(20, 0) |
BIGINT | BIGINT | BIGINT |
FLOAT | REAL FLOAT4 | FLOAT |
DOUBLE DOUBLE PRECISION | FLOAT8 DOUBLE PRECISION | DOUBLE |
NUMERIC(p, s) DECIMAL(p, s) | NUMERIC(p, s) DECIMAL(p, s) | DECIMAL(p, s) |
BOOLEAN TINYINT(1) | BOOLEAN | BOOLEAN |
DATE | DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) VARCHAR(n) TEXT | CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT | STRING |
BINARY VARBINARY BLOB | BYTEA | BYTES |
- | ARRAY | ARRAY |
示例
使用Kafka发送数据,通过JDBC结果表将Kafka数据再输出到MySQL数据库中。
- 在DLI上根据MySQL和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。
- 设置MySQL和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。分别根据MySQL和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
- 登录MySQL,并使用下述命令在flink库下创建orders表。
CREATE TABLE `flink`.`orders` ( `order_id` VARCHAR(32) NOT NULL, `order_channel` VARCHAR(32) NULL, `order_time` VARCHAR(32) NULL, `pay_amount` DOUBLE UNSIGNED NOT NULL, `real_pay` DOUBLE UNSIGNED NULL, `pay_time` VARCHAR(32) NULL, `user_id` VARCHAR(32) NULL, `user_name` VARCHAR(32) NULL, `area_id` VARCHAR(32) NULL, PRIMARY KEY (`order_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci;
- 创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE jdbcSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://MySQLAddress:MySQLPort/flink',--其中url中的flink表示MySQL中orders表所在的数据库名 'table-name' = 'orders', 'username' = 'MySQLUsername', 'password' = 'MySQLPassword', 'sink.buffer-flush.max-rows' = '1' ); insert into jdbcSink select * from kafkaSource;
- 连接Kafka集群,向Kafka相应的topic中发送如下测试数据:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} - 查看表中数据,在MySQL中执行sql查询语句。
select * from orders;
其结果参考如下(注意,以下数据为从MySQL中复制的结果,并不是MySQL中的数据样式)。202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106 202103241606060001,appShop,2021-03-24 16:06:06,200.0,180.0,2021-03-24 16:10:06,0001,Alice,330106
常见问题
无

