JDBC源表
功能描述
JDBC连接器是Flink内置的Connector,用于从数据库读取相应的数据。
前提条件
- 要与实例建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
- Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。
    跨源认证简介及操作方法请参考跨源认证简介。 
注意事项
创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
语法格式
create table jbdcSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
  (',' watermark for rowtime_column_name as watermark-strategy_expression)
) with (
  'connector' = 'jdbc',
  'url' = '',
  'table-name' = '',
  'username' = '',
  'password' = ''
);
 参数说明
| 参数 | 是否必选 | 默认值 | 类型 | 说明 | 
|---|---|---|---|---|
| connector | 是 | 无 | String | 指定要使用的连接器,当前固定为'jdbc'。 | 
| url | 是 | 无 | String | 数据库的URL。 | 
| table-name | 是 | 无 | String | 读取数据库中的数据所在的表名。 | 
| driver | 否 | 无 | String | 连接数据库所需要的驱动。若未配置,则会自动通过URL提取。 | 
| username | 否 | 无 | String | 数据库认证用户名,需要和'password'一起配置。 | 
| password | 否 | 无 | String | 数据库认证密码,需要和'username'一起配置。 | 
| scan.partition.column | 否 | 无 | String | 用于对输入进行分区的列名。分区扫描参数,具体请参考分区扫描功能介绍。 | 
| scan.partition.num | 否 | 无 | Integer | 分区的个数。分区扫描参数,具体请参考分区扫描功能介绍。 | 
| scan.partition.lower-bound | 否 | 无 | Integer | 第一个分区的最小值。分区扫描参数,具体请参考分区扫描功能介绍。 | 
| scan.partition.upper-bound | 否 | 无 | Integer | 最后一个分区的最大值。分区扫描参数,具体请参考分区扫描功能介绍。 | 
| scan.fetch-size | 否 | 0 | Integer | 每次从数据库拉取数据的行数。若指定为0,则会忽略sql hint。 | 
| scan.auto-commit | 否 | true | Boolean | 是否设置自动提交,以确定事务中的每个statement是否自动提交 | 
| pwd_auth_name | 否 | 无 | String | DLI侧创建的Password类型的跨源认证名称。用户若配置该配置项则不用在SQL中配置账号和密码。 | 
分区扫描功能介绍
为了加速Source任务实例中的数据读取,Flink为JDBC表提供了分区扫描功能。以下参数定义了从多个任务并行读取时如何对表进行分区。
- scan.partition.column:用于对输入进行分区的列名,该列的数据类型必须是数字,日期或时间戳。
- scan.partition.num: 分区数。
- scan.partition.lower-bound:第一个分区的最小值。
- scan.partition.upper-bound:最后一个分区的最大值。
 
 
   - 建表时以上扫描分区参数必须同时存在或者同时不存在。
- scan.partition.lower-bound和scan.partition.upper-bound参数仅用于决定分区步长,而不是用于过滤表中的行,表中的所有行都会被分区并返回。
数据类型映射
| MySQL类型 | PostgreSQL类型 | Flink SQL类型 | 
|---|---|---|
| TINYINT | - | TINYINT | 
| SMALLINT TINYINT UNSIGNED | SMALLINT INT2 SMALLSERIAL SERIAL2 | SMALLINT | 
| INT MEDIUMINT SMALLINT UNSIGNED | INTEGER SERIAL | INT | 
| BIGINT INT UNSIGNED | BIGINT BIGSERIAL | BIGINT | 
| BIGINT UNSIGNED | - | DECIMAL(20, 0) | 
| BIGINT | BIGINT | BIGINT | 
| FLOAT | REAL FLOAT4 | FLOAT | 
| DOUBLE DOUBLE PRECISION | FLOAT8 DOUBLE PRECISION | DOUBLE | 
| NUMERIC(p, s) DECIMAL(p, s) | NUMERIC(p, s) DECIMAL(p, s) | DECIMAL(p, s) | 
| BOOLEAN TINYINT(1) | BOOLEAN | BOOLEAN | 
| DATE | DATE | DATE | 
| TIME [(p)] | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] | 
| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | 
| CHAR(n) VARCHAR(n) TEXT | CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT | STRING | 
| BINARY VARBINARY BLOB | BYTEA | BYTES | 
| - | ARRAY | ARRAY | 
示例
使用JDBC作为数据源,Print作为sink,从RDS MySQL数据库中读取数据,并写入到Print中。
- 参考增强型跨源连接,根据RDS MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置RDS MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根RDS的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
- 登录RDS MySQL,并使用下述命令在flink库下创建orders表,并插入数据。创建数据库的操作可以参考创建RDS数据库。
    在flink数据库库下创建orders表: CREATE TABLE `flink`.`orders` ( `order_id` VARCHAR(32) NOT NULL, `order_channel` VARCHAR(32) NULL, `order_time` VARCHAR(32) NULL, `pay_amount` DOUBLE UNSIGNED NOT NULL, `real_pay` DOUBLE UNSIGNED NULL, `pay_time` VARCHAR(32) NULL, `user_id` VARCHAR(32) NULL, `user_name` VARCHAR(32) NULL, `area_id` VARCHAR(32) NULL, PRIMARY KEY (`order_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci; 插入表数据:insert into orders( order_id, order_channel, order_time, pay_amount, real_pay, pay_time, user_id, user_name, area_id) values ('202103241000000001', 'webShop', '2021-03-24 10:00:00', '100.00', '100.00', '2021-03-24 10:02:03', '0001', 'Alice', '330106'), ('202103251202020001', 'miniAppShop', '2021-03-25 12:02:02', '60.00', '60.00', '2021-03-25 12:03:00', '0002', 'Bob', '330110');
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。CREATE TABLE jdbcSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://MySQLAddress:MySQLPort/flink',--flink为RDS MySQL创建的数据库名 'table-name' = 'orders', 'username' = 'MySQLUsername', 'password' = 'MySQLPassword' ); CREATE TABLE printSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into printSink select * from jdbcSource; 
- 按照如下方式查看taskmanager.out文件中的数据结果:
    - 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
- 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。
 数据结果参考如下: +I(202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106) +I(202103251202020001,miniAppShop,2021-03-25 12:02:02,60.0,60.0,2021-03-25 12:03:00,0002,Bob,330110) 
常见问题
无
 
  