更新时间:2024-07-27 GMT+08:00

JDBC结果表

功能描述

DLI通过JDBC结果表将Flink作业的输出数据输出到关系型数据库中。

前提条件

  • DLI要与实例建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。

    跨源认证简介及操作方法请参考跨源认证简介

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 如果JDBC结果表定义了主键,则连接器以upsert模式运行,否则,连接器以Append模式运行。
    • upsert模式:Flink会根据主键插入新行或更新现有行,Flink可以通过这种方式保证幂等性。为保证输出结果符合预期,建议为表定义主键。
    • Append模式:Flink 会将所有记录解释为INSERT消息,如果底层数据库发生主键或唯一约束违规,INSERT操作可能会失败。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
create table jdbcSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector' = 'jdbc',
  'url' = '',
  'table-name' = '',
  'driver' = '',
  'username' = '',
  'password' = ''
);

参数说明

参数

是否必选

默认值

类型

说明

connector

String

指定要使用的连接器,这里应该是'jdbc'。

url

String

数据库的URL。

table-name

String

读取数据库中的数据所在的表名。

driver

String

连接数据库所需要的驱动。若未配置,则会自动通过URL提取。

username

String

数据库认证用户名,需要和'password'一起配置。

password

String

数据库认证密码,需要和'username'一起配置。

sink.buffer-flush.max-rows

100

Integer

每次写入请求缓存的最大行数。

它能提升写入数据的性能,但是也可能增加延迟。

设置为 "0" 关闭此选项。

sink.buffer-flush.interval

1s

Duration

刷新缓存的间隔,在这段时间内以异步线程刷新数据。

它能提升写入数据的性能,但是也可能增加延迟。

设置为 "0" 关闭此选项。

注意:"sink.buffer-flush.max-rows" 设置为 "0",并设置刷新缓存间隔,则以完整的异步处理方式刷新缓存。

格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。

sink.max-retries

3

Integer

将记录写入数据库失败时的最大重试次数。

pwd_auth_name

String

DLI侧创建的Password类型的跨源认证名称。

使用跨源认证则无需在作业中配置账号和密码。

数据类型映射

表1 数据类型映射

MySQL类型

PostgreSQL类型

Flink SQL类型

TINYINT

-

TINYINT

SMALLINT

TINYINT UNSIGNED

SMALLINT

INT2

SMALLSERIAL

SERIAL2

SMALLINT

INT

MEDIUMINT

SMALLINT UNSIGNED

INTEGER

SERIAL

INT

BIGINT

INT UNSIGNED

BIGINT

BIGSERIAL

BIGINT

BIGINT UNSIGNED

-

DECIMAL(20, 0)

BIGINT

BIGINT

BIGINT

FLOAT

REAL

FLOAT4

FLOAT

DOUBLE

DOUBLE PRECISION

FLOAT8

DOUBLE PRECISION

DOUBLE

NUMERIC(p, s)

DECIMAL(p, s)

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

TINYINT(1)

BOOLEAN

BOOLEAN

DATE

DATE

DATE

TIME [(p)]

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

VARCHAR(n)

TEXT

CHAR(n)

CHARACTER(n)

VARCHAR(n)

CHARACTER

VARYING(n)

TEXT

STRING

BINARY

VARBINARY

BLOB

BYTEA

BYTES

-

ARRAY

ARRAY

示例

使用Kafka发送数据,通过JDBC结果表将Kafka数据再输出到MySQL数据库中。

  1. 参考增强型跨源连接,在DLI上根据MySQL和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。
  2. 设置MySQL和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据MySQL和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 登录MySQL,并使用下述命令在flink库下创建orders表。
    CREATE TABLE `flink`.`orders` (
    	`order_id` VARCHAR(32) NOT NULL,
    	`order_channel` VARCHAR(32) NULL,
    	`order_time` VARCHAR(32) NULL,
    	`pay_amount` DOUBLE UNSIGNED NOT NULL,
    	`real_pay` DOUBLE UNSIGNED NULL,
    	`pay_time` VARCHAR(32) NULL,
    	`user_id` VARCHAR(32) NULL,
    	`user_name` VARCHAR(32) NULL,
    	`area_id` VARCHAR(32) NULL,
    	PRIMARY KEY (`order_id`)
    )	ENGINE = InnoDB
    	DEFAULT CHARACTER SET = utf8mb4
    	COLLATE = utf8mb4_general_ci;
  4. 创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE jdbcSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://MySQLAddress:MySQLPort/flink',--其中url中的flink表示MySQL中orders表所在的数据库名
      'table-name' = 'orders',
      'username' = 'MySQLUsername',
      'password' = 'MySQLPassword',
      'sink.buffer-flush.max-rows' = '1'
    );
    
    insert into jdbcSink select * from kafkaSource;
  5. 连接Kafka集群,向Kafka相应的topic中发送如下测试数据:
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  6. 查看表中数据,在MySQL中执行sql查询语句。
    select * from orders;
    其结果参考如下(注意,以下数据为从MySQL中复制的结果,并不是MySQL中的数据样式)。
    202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106
    202103241606060001,appShop,2021-03-24 16:06:06,200.0,180.0,2021-03-24 16:10:06,0001,Alice,330106

常见问题