更新时间:2024-09-27 GMT+08:00

JDBC结果表

功能描述

DLI将Flink作业的输出数据输出到关系型数据库中。

前提条件

  • 要与实例建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
  • 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
create table jdbcSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector.type' = 'jdbc',
  'connector.url' = '',
  'connector.table' = '',
  'connector.driver' = '',
  'connector.username' = '',
  'connector.password' = ''
);

参数说明

表1 参数说明

参数

是否必选

说明

connector.type

数据源类型,‘jdbc’表示使用JDBC connector,必须为jdbc

connector.url

数据库的URL

connector.table

读取数据库中的数据所在的表名

connector.driver

连接数据库所需要的驱动。若未配置,则会自动通过URL提取

connector.username

访问数据库所需要的账号

connector.password

访问数据库所需要的密码

connector.write.flush.max-rows

写数据时,刷新数据的最大行数。默认值为5000

connector.write.flush.interval

刷新数据的时间间隔,单位可以为ms、milli、millisecond/s、sec、second/min、minute等。不填写则默认不根据时间刷新

connector.write.max-retries

写数据失败时的最大尝试次数。默认值为3

connector.write.exclude-update-columns

默认值为空(默认忽略primary key字段),表示更新主键值相同的数据时,忽略指定字段的更新

注意事项

示例

将流jdbcSink的数据输出到MySQL数据库中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
create table jdbcSink(
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_speed INT
)
with (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/xx',
  'connector.table' = 'jdbc_table_name',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'xxx',
  'connector.password' = 'xxxxxx'
);