更新时间:2024-07-27 GMT+08:00

JDBC维表

创建JDBC表用于与输入流连接。

前提条件

请务必确保您的账户下已创建了相应实例。

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。

    跨源认证简介及操作方法请参考跨源认证简介

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
CREATE TABLE  table_id (
  attr_name attr_type 
  (',' attr_name attr_type)* 
)
  WITH (
  'connector' = 'jdbc',
  'url' = '',
  'table-name' = '',
  'driver' = '',
  'username' = '',
  'password' = ''
);

参数说明

表1 参数说明

参数

是否必选

说明

connector

数据源类型,固定为:jdbc。

url

数据库的URL。

table-name

读取数据库中的数据所在的表名。

driver

连接数据库所需要的驱动。若未配置,则会自动通过URL提取。

username

数据库认证用户名,需要和'password'一起配置。

password

数据库认证密码,需要和'username'一起配置。

scan.partition.column

用于对输入进行分区的列名。

与scan.partition.lower-bound、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。

scan.partition.lower-bound

第一个分区的最小值。

与scan.partition.column、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在

scan.partition.upper-bound

最后一个分区的最大值。

与scan.partition.column、scan.partition.lower-bound、scan.partition.num必须同时存在或者同时不存在

scan.partition.num

分区的个数。

与scan.partition.column、scan.partition.upper-bound、scan.partition.upper-bound必须同时存在或者同时不存在。

scan.fetch-size

每次从数据库拉取数据的行数。默认值为0,表示忽略该提示。

lookup.cache.max-rows

维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。-1表示不使用缓存。

lookup.cache.ttl

维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。

lookup.max-retries

维表配置,数据拉取最大重试次数,默认为3。

pwd_auth_name

DLI侧创建的Password类型的跨源认证名称。

数据类型映射

表2 数据类型映射

MySQL类型

PostgreSQL类型

Flink SQL类型

TINYINT

-

TINYINT

SMALLINT

TINYINT UNSIGNED

SMALLINT

INT2

SMALLSERIAL

SERIAL2

SMALLINT

INT

MEDIUMINT

SMALLINT UNSIGNED

INTEGER

SERIAL

INT

BIGINT

INT UNSIGNED

BIGINT

BIGSERIAL

BIGINT

BIGINT UNSIGNED

-

DECIMAL(20, 0)

BIGINT

BIGINT

BIGINT

FLOAT

REAL

FLOAT4

FLOAT

DOUBLE

DOUBLE PRECISION

FLOAT8

DOUBLE PRECISION

DOUBLE

NUMERIC(p, s)

DECIMAL(p, s)

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

TINYINT(1)

BOOLEAN

BOOLEAN

DATE

DATE

DATE

TIME [(p)]

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

VARCHAR(n)

TEXT

CHAR(n)

CHARACTER(n)

VARCHAR(n)

CHARACTER

VARYING(n)

TEXT

STRING

BINARY

VARBINARY

BLOB

BYTEA

BYTES

-

ARRAY

ARRAY

示例

从Kafka源表中读取数据,将JDBC表作为维表,并将二者生成的表信息写入Kafka结果表中,其具体步骤如下:

  1. 参考增强型跨源连接,在DLI上根据MySQL和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。
  2. 设置MySQL和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据MySQL和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 连接MySQL数据库实例,在flink数据库中创建相应的表,作为维表,表名为area_info,SQL语句如下:
    CREATE TABLE `flink`.`area_info` (
    	`area_id` VARCHAR(32) NOT NULL,
    	`area_province_name` VARCHAR(32) NOT NULL,
    	`area_city_name` VARCHAR(32) NOT NULL,
    	`area_county_name` VARCHAR(32) NOT NULL,
    	`area_street_name` VARCHAR(32) NOT NULL,
    	`region_name` VARCHAR(32) NOT NULL,
    	PRIMARY KEY (`area_id`)
    )	ENGINE = InnoDB
    	DEFAULT CHARACTER SET = utf8mb4
    	COLLATE = utf8mb4_general_ci;
  4. 连接MySQL数据库实例,向JDBC维表area_info中插入测试数据,其语句如下:
    insert into flink.area_info
      (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) 
      values
      ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'),
      ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'),
      ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'),  ('330110', 'a1', 'b1', 'c4', 'd4', 'e1');
  5. 创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka为数据源,JDBC作为维表,数据写入到Kafka结果表。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    CREATE TABLE orders (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string,
      proctime as Proctime()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaSourceTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'jdbc-order',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    --创建地址维表
    create table area_info (
        area_id string, 
        area_province_name string,
        area_city_name string,
        area_county_name string, 
        area_street_name string, 
        region_name string 
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://JDBCAddress:JDBCPort/flink',--其中url中的flink表示MySQL中area_info表所在的数据库名
      'table-name' = 'area_info',
      'username' = 'JDBCUserName',
      'password' = 'JDBCPassWord'
    );
    
    --根据地址维表生成详细的包含地址的订单信息宽表
    create table order_detail(
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string,
        area_province_name string,
        area_city_name string,
        area_county_name string,
        area_street_name string,
        region_name string
    ) with (
      'connector' = 'kafka',
      'topic' = 'KafkaSinkTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'format' = 'json'
    );
    
    insert into order_detail
        select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name,
               area.area_id, area.area_province_name, area.area_city_name, area.area_county_name,
               area.area_street_name, area.region_name  from orders 
               left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;
    
  6. 连接Kafka集群,向Kafka的source topic中插入如下测试数据:
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    
    {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
  7. 连接Kafka集群,在Kafka的sink topic读取数据,结果参考如下:
    {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"}
    
    {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"}
    
    {"order_id":"202103251505050001","order_channel":"qqShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}

常见问题

无。