创建FlinkServer作业对接JDBC
本章节适用于MRS 3.3.1及之后的版本。
操作场景
FlinkServer支持对接JDBC。本示例以安全模式FlinkServer、Kafka为例,介绍JDBC的MySQL作为Source表、Sink表以及维表的DDL定义,以及创建表时使用的WITH参数和代码示例,指导如何在FlinkServer作业管理页面对接JDBC。
FlinkServer界面回显FlinkSQL时,SQL中的“password”字段将显示为空,在回显状态下需要将密码信息补齐后再提交作业。
Flink SQL与JDBC数据类型对应关系
Flink SQL数据类型 |
MySQL数据类型 |
Oracle数据类型 |
PostgreSQL数据类型 |
SQL Server数据类型 |
---|---|---|---|---|
BOOLEAN |
BOOLEAN TINYINT(1) |
- |
BOOLEAN |
BIT |
TINYINT |
TINYINT |
- |
- |
TINYINT |
SMALLINT |
SMALLINT TINYINT UNSIGNED |
- |
SMALLINT INT2 SMALLSERIAL SERIAL2 |
SMALLINT |
INT |
INT MEDIUMINT SMALLINT UNSIGNED |
- |
INTEGER SERIAL |
INT |
BIGINT |
BIGINT INT UNSIGNED |
- |
BIGINT BIGSERIAL |
BIGINT |
FLOAT |
FLOAT |
BINARY_FLOAT |
REAL FLOAT4 |
REAL |
DOUBLE |
DOUBLE DOUBLE PRECISION |
BINARY_DOUBLE |
FLOAT8 DOUBLE PRECISION |
FLOAT |
STRING |
CHAR(n) VARCHAR(n) TEXT |
CHAR(n) VARCHAR(n) CLOB |
CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT |
CHAR(n) NCHAR(n) VARCHAR(n) NVARCHAR(n) TEXT NTEXT |
BYTES |
BINARY VARBINARY BLOB |
RAW(s) BLOB |
BYTEA |
BINARY(n) VARBINARY(n) |
ARRAY |
- |
- |
ARRAY |
- |
DATE |
DATE |
DATE |
DATE |
DATE |
TIME [(p)] [WITHOUT TIMEZONE] |
TIME [(p)] |
DATE |
TIME [(p)] [WITHOUT TIMEZONE] |
TIME(0) |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
DATETIME DATETIME2 |
DECIMAL(20, 0) |
BIGINT UNSIGNED |
- |
- |
- |
DECIMAL(p, s) |
NUMERIC(p, s) DECIMAL(p, s) |
SMALLINT FLOAT(s) DOUBLE PRECISION REAL NUMBER(p, s) |
NUMERIC(p, s) DECIMAL(p, s) |
DECIMAL(p, s) |
前提条件
- 集群中已安装HDFS、Yarn、Kafka、ZooKeeper和Flink等服务。
- 包含Kafka服务的客户端已安装,安装路径如:/opt/client。
- 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flinkuser。
JDBC作为Sink表(以MySQL为例)
- 在对应数据库如MySQL中创建用于接受数据的空表,如表“customer_t1”。
- 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考如何创建FlinkServer作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。
需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
CREATE TABLE MyUserTable( c_customer_sk INTEGER, c_customer_name VARCHAR(32) ) WITH( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://MySQL服务器IP:MySQL服务器端口/mysql', 'table-name' = 'customer_t1', 'username' = 'MySQL数据库用户名', 'password' = 'MySQL数据库用户名的密码' ); CREATE TABLE KafkaSource ( c_customer_sk INTEGER, c_customer_name VARCHAR(32) ) WITH ( 'connector' = 'kafka', 'topic' = 'customer_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', --FlinkServer所在集群为非安全模式去掉此参数 'properties.security.protocol' = 'SASL_PLAINTEXT', --FlinkServer所在集群为非安全模式去掉此参数 'properties.kerberos.domain.name' = 'hadoop.系统域名' --FlinkServer所在集群为非安全模式去掉此参数 ); Insert into MyUserTable select * from KafkaSource;
- Kafka Broker实例IP地址及端口号说明:
- 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
- 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
- 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:
登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- properties.group.id:Kafka的使用者组ID,Kafka作为source时必选。
- properties.kerberos.domain.name:为“hadoop.系统域名”。可登录FusionInsight Manager界面,选择“系统 > 域和互信”中查看集群实际域名。
- Kafka Broker实例IP地址及端口号说明:
- 查看作业管理界面,作业状态为“运行中”。
- 参考管理Kafka Topic中的消息,查看Topic并向Kafka中写入数据。
./kafka-topics.sh --list --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --command-config 客户端目录/Kafka/kafka/config/client.properties
sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
例如本示例使用主题名称为customer_source:
sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic customer_source --producer.config /opt/client/Kafka/kafka/config/producer.properties
输入消息内容:
3,zhangsan 4,wangwu 8,zhaosi
输入完成后按回车发送消息。
- 登录MySQL客户端执行以下命令查看Sink表中是否接收到数据。
Select * from customer_t1;
JDBC作为Source表(以MySQL为例)
- 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考如何创建FlinkServer作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。
需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“5000”,“模式”可使用默认值。
CREATE TABLE MyUserTable( --MySQL作为source表 c_customer_sk INTEGER, c_customer_name VARCHAR(32) ) WITH( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://MySQL服务器IP:MySQL服务器端口/mysql', 'table-name' = 'customer_t1', 'username' = 'MySQL数据库用户名', 'password' = 'MySQL数据库用户名的密码' ); CREATE TABLE KafkaSink ( -- Kafka作为sink表 c_customer_sk INTEGER, c_customer_name VARCHAR(32) ) WITH ( 'connector' = 'kafka', 'topic' = 'customer_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', --FlinkServer所在集群为非安全模式去掉此参数 'properties.security.protocol' = 'SASL_PLAINTEXT', --FlinkServer所在集群为非安全模式去掉此参数 'properties.kerberos.domain.name' = 'hadoop.系统域名' --FlinkServer所在集群为非安全模式去掉此参数 ); Insert into KafkaSink select * from MyUserTable;
- Kafka Broker实例IP地址及端口号说明:
- 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
- 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
- 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:
登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- properties.group.id:Kafka的使用者组ID,Kafka作为source时必选。
- properties.kerberos.domain.name:为“hadoop.系统域名”。可登录FusionInsight Manager界面,选择“系统 > 域和互信”中查看集群实际域名。
- Kafka Broker实例IP地址及端口号说明:
- 查看作业管理界面,作业状态为“运行中”。
- 参考管理Kafka Topic中的消息,执行以下命令查看Sink表中是否接收到数据,即查看Kafka topic是否正常写入数据。
sh kafka-console-consumer.sh --topic customer_sink --bootstrap-server Kafka角色实例所在节点的IP地址:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/ consumer.properties
JDBC作为维表(以MySQL为例)
kafkaSource作为事实表,“customer_t2”作为维度表,结果写入kafkaSink。
- 在MySQL客户端创建维度表“customer_t2”,建表语句示例如下:
CREATE TABLE customer_t2( c_customer_sk INTEGER PRIMARY KEY, c_customer_age INTEGER, c_customer_address VARCHAR(32) ); INSERT INTO customer_t2 VALUES(1,18,'city a'); INSERT INTO customer_t2 VALUES(2,14,'city b'); INSERT INTO customer_t2 VALUES(3,16,'city c'); INSERT INTO customer_t2 VALUES(4,24,'city d'); INSERT INTO customer_t2 VALUES(5,32,'city e'); INSERT INTO customer_t2 VALUES(6,27,'city f'); INSERT INTO customer_t2 VALUES(7,41,'city a'); INSERT INTO customer_t2 VALUES(8,35,'city h'); INSERT INTO customer_t2 VALUES(9,16,'city j');
- 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考如何创建FlinkServer作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。
需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“5000”,“模式”可使用默认值。
CREATE TABLE KafkaSource ( -- Kafka作为source表 c_customer_sk INTEGER, c_customer_name VARCHAR(32), proctime as proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'customer_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', --FlinkServer所在集群为非安全模式去掉此参数 'properties.security.protocol' = 'SASL_PLAINTEXT', --FlinkServer所在集群为非安全模式去掉此参数 'properties.kerberos.domain.name' = 'hadoop.系统域名' --FlinkServer所在集群为非安全模式去掉此参数 ); CREATE TABLE KafkaSink ( -- Kafka作为sink表 c_customer_sk INTEGER, c_customer_name VARCHAR(32), c_customer_age INTEGER, c_customer_address VARCHAR(32) ) WITH ( 'connector' = 'kafka', 'topic' = 'customer_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', --FlinkServer所在集群为非安全模式去掉此参数 'properties.security.protocol' = 'SASL_PLAINTEXT', --FlinkServer所在集群为非安全模式去掉此参数 'properties.kerberos.domain.name' = 'hadoop.系统域名' --FlinkServer所在集群为非安全模式去掉此参数 ); CREATE TABLE MyUserTable ( -- MySQL作为维表 c_customer_sk INTEGER PRIMARY KEY NOT ENFORCED, c_customer_age INTEGER, c_customer_address VARCHAR(32) ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://MySQL服务器IP:MySQL服务器端口/mysql', 'table-name' = 'customer_t2', 'username' = 'MySQL数据库用户名', 'password' = 'MySQL数据库用户名的密码' ); INSERT INTO KafkaSink SELECT t.c_customer_sk, t.c_customer_name, d.c_customer_age, d.c_customer_address FROM KafkaSource as t JOIN MyUserTable FOR SYSTEM_TIME AS OF t.proctime as d ON t.c_customer_sk = d.c_customer_sk;
- Kafka Broker实例IP地址及端口号说明:
- 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
- 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
- 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:
登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- properties.group.id:Kafka的使用者组ID,Kafka作为source时必选。
- properties.kerberos.domain.name:为“hadoop.系统域名”。可登录FusionInsight Manager界面,选择“系统 > 域和互信”中查看集群实际域名。
- Kafka Broker实例IP地址及端口号说明:
- 参考管理Kafka Topic中的消息,执行以下命令查看Sink表中是否接收到数据,即5执行完成后查看Kafka topic是否正常写入数据。
sh kafka-console-consumer.sh --topic customer_sink --bootstrap-server Kafka角色实例所在节点的IP地址:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/ consumer.properties
- 参考管理Kafka Topic中的消息,查看Topic并向Kafka中写入数据,输入完成后可在4中的窗口查看执行结果。
./kafka-topics.sh --list Kafka的Broker实例业务IP:Kafka端口号--command-config 客户端目录/Kafka/kafka/config/client.properties
sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
例如本示例使用主题名称为customer_source:
sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic customer_source --producer.config /opt/client/Kafka/kafka/config/producer.properties
输入消息内容:
3,zhangsan 5,zhaosi 1,xiaoming 2,liuyang 7,liubei 10,guanyu 20,zhaoyun
输入完成后按回车发送消息,4中的kafka-console-consumer窗口打印结果如下:
3,zhangsan,16,city c 5,zhaosi,32,city e 1,xiaoming,18,city a 2,liuyang,14,city b 7,liubei,41,city a