更新时间:2024-11-29 GMT+08:00

FlinkServer对接JDBC

操作场景

FlinkServer支持对接JDBC。本示例以安全模式FlinkServer、Kafka为例,介绍JDBC的MySQL作为Source表、Sink表以及维表的DDL定义,以及创建表时使用的WITH参数和代码示例,指导如何在FlinkServer作业管理页面对接JDBC。

Flink SQL与JDBC数据类型对应关系

Flink SQL数据类型

MySQL数据类型

Oracle数据类型

PostgreSQL数据类型

SQL Server数据类型

BOOLEAN

BOOLEAN

TINYINT(1)

-

BOOLEAN

BIT

TINYINT

TINYINT

-

-

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

-

SMALLINT

INT2

SMALLSERIAL

SERIAL2

SMALLINT

INT

INT

MEDIUMINT

SMALLINT UNSIGNED

-

INTEGER

SERIAL

INT

BIGINT

BIGINT

INT UNSIGNED

-

BIGINT

BIGSERIAL

BIGINT

FLOAT

FLOAT

BINARY_FLOAT

REAL

FLOAT4

REAL

DOUBLE

DOUBLE

DOUBLE PRECISION

BINARY_DOUBLE

FLOAT8

DOUBLE PRECISION

FLOAT

STRING

CHAR(n)

VARCHAR(n)

TEXT

CHAR(n)

VARCHAR(n)

CLOB

CHAR(n)

CHARACTER(n)

VARCHAR(n)

CHARACTER VARYING(n)

TEXT

CHAR(n)

NCHAR(n)

VARCHAR(n)

NVARCHAR(n)

TEXT

NTEXT

BYTES

BINARY

VARBINARY

BLOB

RAW(s)

BLOB

BYTEA

BINARY(n)

VARBINARY(n)

ARRAY

-

-

ARRAY

-

DATE

DATE

DATE

DATE

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)]

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME(0)

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

DATETIME

DATETIME2

DECIMAL(20, 0)

BIGINT UNSIGNED

-

-

-

DECIMAL(p, s)

NUMERIC(p, s)

DECIMAL(p, s)

SMALLINT

FLOAT(s)

DOUBLE PRECISION

REAL

NUMBER(p, s)

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

前提条件

FlinkServer所在集群:
  • 集群中已安装HDFS、Yarn、Kafka、ZooKeeper和Flink等服务。
  • 包含Kafka服务的客户端已安装,安装路径如:/opt/client
  • 创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flinkuser

JDBC作为Sink表(以MySQL为例)

  1. 在对应数据库如MySQL中创建用于接受数据的空表,如表“customer_t1”。
  2. 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  3. 参考新建作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。

    需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。

    CREATE TABLE MyUserTable(
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32)
    ) WITH(
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://MySQL服务器IP:MySQL服务器端口/mysql',
      'table-name' = 'customer_t1',
      'username' = 'MySQL数据库用户名',
      'password' = 'MySQL数据库用户名的密码'
    );
    CREATE TABLE KafkaSource (
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'customer_source',
      'properties.bootstrap.servers' = 'KafkaBroker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.security.protocol' = 'SASL_PLAINTEXT',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'  --FlinkServer所在集群为非安全模式去掉此参数
    );
    Insert into
      MyUserTable
    select
      *
    from
      KafkaSource;
    • Kafka Broker实例IP地址及端口号说明:
      • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
      • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
      • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
    • properties.group.id:Kafka的使用者组ID,Kafka作为source时必选。
    • properties.kerberos.domain.name:为“hadoop.系统域名”。可登录FusionInsight Manager界面,选择“系统 > 域和互信”中查看集群实际域名。

  4. 查看作业管理界面,作业状态为“运行中”。
  5. 参考管理Kafka主题中的消息,查看Topic并向Kafka中写入数据。

    ./kafka-topics.sh --list --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --command-config 客户端目录/Kafka/kafka/config/client.properties

    sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    例如本示例使用主题名称为customer_source:

    sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic customer_source --producer.config /opt/client/Kafka/kafka/config/producer.properties

    输入消息内容:

    3,zhangsan
    4,wangwu
    8,zhaosi

    输入完成后按回车发送消息。

  6. 登录MySQL客户端执行以下命令查看Sink表中是否接收到数据。

    Select * from customer_t1;

JDBC作为Source表(以MySQL为例)

  1. 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。

    需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“5000”,“模式”可使用默认值。

    CREATE TABLE MyUserTable(
      --MySQL作为source表
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32)
    ) WITH(
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://MySQL服务器IP:MySQL服务器端口/mysql',
      'table-name' = 'customer_t1',
      'username' = 'MySQL数据库用户名',
      'password' = 'MySQL数据库用户名的密码'
    );
    CREATE TABLE KafkaSink (
      -- Kafka作为sink表
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'customer_sink',
      'properties.bootstrap.servers' = 'KafkaBroker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.security.protocol' = 'SASL_PLAINTEXT',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'  --FlinkServer所在集群为非安全模式去掉此参数
    );
    Insert into
      KafkaSink
    select
      *
    from
      MyUserTable;
    • Kafka Broker实例IP地址及端口号说明:
      • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
      • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
      • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
    • properties.group.id:Kafka的使用者组ID,Kafka作为source时必选。
    • properties.kerberos.domain.name:为“hadoop.系统域名”。可登录FusionInsight Manager界面,选择“系统 > 域和互信”中查看集群实际域名。

  3. 查看作业管理界面,作业状态为“运行中”。
  4. 参考管理Kafka主题中的消息,执行以下命令查看Sink表中是否接收到数据,即查看Kafka topic是否正常写入数据。

    sh kafka-console-consumer.sh --topic customer_sink --bootstrap-server Kafka角色实例所在节点的IP地址:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/ consumer.properties

JDBC作为维表(以MySQL为例)

kafkaSource作为事实表,“customer_t2”作为维度表,结果写入kafkaSink。

  1. 在MySQL客户端创建维度表“customer_t2”,建表语句示例如下:

    CREATE TABLE customer_t2(
    c_customer_sk INTEGER PRIMARY KEY,
    c_customer_age INTEGER,
    c_customer_address VARCHAR(32)
    );
     
    INSERT INTO customer_t2 VALUES(1,18,'city a');
    INSERT INTO customer_t2 VALUES(2,14,'city b');
    INSERT INTO customer_t2 VALUES(3,16,'city c');
    INSERT INTO customer_t2 VALUES(4,24,'city d');
    INSERT INTO customer_t2 VALUES(5,32,'city e');
    INSERT INTO customer_t2 VALUES(6,27,'city f');
    INSERT INTO customer_t2 VALUES(7,41,'city a');
    INSERT INTO customer_t2 VALUES(8,35,'city h');
    INSERT INTO customer_t2 VALUES(9,16,'city j');

  2. 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  3. 参考新建作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。

    需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“5000”,“模式”可使用默认值。

    CREATE TABLE KafkaSource (
      -- Kafka作为source表
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32),
      proctime as proctime()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'customer_source',
      'properties.bootstrap.servers' = 'KafkaBroker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.security.protocol' = 'SASL_PLAINTEXT',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'  --FlinkServer所在集群为非安全模式去掉此参数
    );
    CREATE TABLE KafkaSink (
      -- Kafka作为sink表
      c_customer_sk INTEGER,
      c_customer_name VARCHAR(32),
      c_customer_age INTEGER,
      c_customer_address VARCHAR(32)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'customer_sink',
      'properties.bootstrap.servers' = 'KafkaBroker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.security.protocol' = 'SASL_PLAINTEXT',  --FlinkServer所在集群为非安全模式去掉此参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'  --FlinkServer所在集群为非安全模式去掉此参数
    );
    CREATE TABLE MyUserTable (
      -- MySQL作为维表
      c_customer_sk INTEGER PRIMARY KEY NOT ENFORCED,
      c_customer_age INTEGER,
      c_customer_address VARCHAR(32)
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://MySQL服务器IP:MySQL服务器端口/mysql',
      'table-name' = 'customer_t2',
      'username' = 'MySQL数据库用户名',
      'password' = 'MySQL数据库用户名的密码'
    );
    INSERT INTO
      KafkaSink
    SELECT
      t.c_customer_sk,
      t.c_customer_name,
      d.c_customer_age,
      d.c_customer_address
    FROM
      KafkaSource as t
      JOIN MyUserTable FOR SYSTEM_TIME AS OF t.proctime as d ON t.c_customer_sk = d.c_customer_sk;
    • Kafka Broker实例IP地址及端口号说明:
      • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
      • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
      • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
    • properties.group.id:Kafka的使用者组ID,Kafka作为source时必选。
    • properties.kerberos.domain.name:为“hadoop.系统域名”。可登录FusionInsight Manager界面,选择“系统 > 域和互信”中查看集群实际域名。

  4. 参考管理Kafka主题中的消息,执行以下命令查看Sink表中是否接收到数据,即5执行完成后查看Kafka topic是否正常写入数据。

    sh kafka-console-consumer.sh --topic customer_sink --bootstrap-server Kafka角色实例所在节点的IP地址:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/ consumer.properties

  5. 参考管理Kafka主题中的消息,查看Topic并向Kafka中写入数据,输入完成后可在4中的窗口查看执行结果。

    ./kafka-topics.sh --list Kafka的Broker实例业务IP:Kafka端口号--command-config 客户端目录/Kafka/kafka/config/client.properties

    sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    例如本示例使用主题名称为customer_source:

    sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic customer_source --producer.config /opt/client/Kafka/kafka/config/producer.properties

    输入消息内容:

    3,zhangsan
    5,zhaosi
    1,xiaoming
    2,liuyang
    7,liubei
    10,guanyu
    20,zhaoyun

    输入完成后按回车发送消息,4中的kafka-console-consumer窗口打印结果如下:

    3,zhangsan,16,city c
    5,zhaosi,32,city e
    1,xiaoming,18,city a
    2,liuyang,14,city b
    7,liubei,41,city a