FlinkServer对接Redis
操作场景
本章节介绍Redis作为sink表或者维表的DDL定义,以及创建表时使用的WITH参数和代码示例,并指导如何在FlinkServer作业管理页面操作。
本示例以安全模式Kafka为例。
前提条件
- 集群中已安装HDFS、Yarn、Redis和Flink服务。
- 包含Redis服务的客户端已安装,例如安装路径为:/opt/client
- 参考创建FlinkServer角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。
操作步骤
场景一:Redis作为sink表。
- 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考新建作业,新建Flink SQL流作业,在作业开发界面进行作业开发,配置完成后启动作业。
需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
CREATE TABLE kafka_source ( account varchar(10), costs int, ts AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'user_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); CREATE table redis_sink( account varchar, costs int, PRIMARY KEY(account) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'deploy-mode'='cluster', 'need-kerberos-auth' = 'true', 'service-kerberos-name' = 'redis/hadoop.系统域名', 'login-context-name' = 'Client', 'host' = '10.10.10.169', 'port' = '22400', 'isSSLMode' = 'true', 'data-type' = 'string', 'namespace' = 'redis_table_2', 'sink.batch.max-size' = '-1',--是否开启批写Redis并设置批写数量,'-1'表示不开启,若开启则需同步开启CheckPoint 'sink.flush-buffer.timeout' = '1000'--开启批写Redis后可按照指定时间将队列里面的数据刷新到Redis,单位:ms ); INSERT INTO redis_sink SELECT account, SUM(costs) FROM kafka_source GROUP BY TUMBLE(ts, INTERVAL '90' SECOND), --为了快速看到计算结果 account;
- Kafka Broker实例IP地址及端口号说明:
- 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
- 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
- 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:
登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- host、port:分别为Redis集群的其中一个实例IP(业务平面)和端口号。
Redis实例的端口计算方式为:22400+该实例的ID-1。
实例ID可以通过在FusionInsight Manager中选择“集群 > 待操作集群的名称 > 服务 > Redis > Redis管理”,单击Redis集群名称查看。
例如Redis集群内角色R1对应的Redis实例的端口为22400+1-1=22400。
- namespace:用于拼接Redis数据库的键,格式为“namespace的值:account的值”。如发送数据的account的值为“A1”,namespace的值为“redis_table_2”,那么此数据在Redis数据库中的键为redis_table_2:A1。
- sink.batch.max-size:
- sink.flush-buffer.timeout:开启批写Redis后,可按照指定时间将队列里面的数据刷新到Redis。单位:ms。
- Kafka Broker实例IP地址及端口号说明:
- 查看作业管理界面,作业状态为“运行中”。
- 参考管理Kafka主题中的消息,执行以下命令查看Sink表中是否接收到数据,即5执行完成后查看Kafka topic是否正常写入数据。
sh kafka-console-consumer.sh --topic 主题名称 --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties
- 参考管理Kafka主题中的消息,查看Topic并向Kafka中写入数据,输入完成后可在4中的窗口查看执行结果。
./kafka-topics.sh --list --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
例如本示例使用主题名称为user_source:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic user_source --producer.config /opt/client/Kafka/kafka/config/producer.properties
输入消息内容:{"account": "A1","costs":"11"} {"account": "A1","costs":"22"} {"account": "A2","costs":"33"} {"account": "A3","costs":"44"}
输入完成后按回车发送消息。
- 执行以下命令登录Redis客户端查询结果,以查询“redis_table_2:A1”为例。
redis-cli -c -h Redis集群中其中一个实例业务IP -p Redis端口号
get redis_table_2:A1
场景二:Redis作为维表。
- 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考新建作业,新建Flink SQL流作业,在作业开发界面进行作业开发,配置完成后启动作业。
需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
CREATE TABLE KafkaSource ( -- Kafka作为source表 `user_id` VARCHAR, `user_name` VARCHAR, `age` double, proctime as proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'user_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); CREATE TABLE KafkaSink ( -- Kafka作为sink表 `user_id` VARCHAR, `user_name` VARCHAR, `age` double, `phone_number` VARCHAR, `address` VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'user_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); CREATE TABLE RedisTable ( -- Redis作为维表 user_name VARCHAR, score double, phone_number VARCHAR, address VARCHAR ) WITH ( 'connector' = 'redis', 'deploy-mode'='cluster', 'need-kerberos-auth' = 'true', 'service-kerberos-name' = 'redis/hadoop.系统域名', 'login-context-name' = 'Client', 'zset-score-column' = 'score', 'host' = '10.10.10.169', 'port' = '22400', 'isSSLMode' = 'true', 'key-ttl-mode' = 'no-ttl', 'data-type' = 'sorted-set', 'namespace' = 'redis_zset', 'zset-delimiter' = ',', 'key-column' = 'user_name', 'schema-syntax' = 'concatenate-fields' ); INSERT INTO KafkaSink SELECT t.user_id, t.user_name, t.age, d.phone_number, d.address FROM KafkaSource as t JOIN RedisTable FOR SYSTEM_TIME AS OF t.proctime as d ON t.user_name = d.user_name; -- 必须加上FOR SYSTEM_TIME AS OF t.proctime,表示JOIN维表当前时刻所看到的每条数据
- 执行以下命令向Redis维表中写入测试数据。
cd /opt/client/Redis/bin
./redis-cli -h 10.10.10.11 -p 22400 -c
输入消息内容:
ZADD redis_zset:zhangsan 80 153xxxx1111,city1 ZADD redis_zset:lisi 70 153xxxx2222,city2 ZADD redis_zset:wangwu 90 153xxxx3333,city3
若Redis启用通道加密,使用命令:./redis-cli -h 10.10.10.11 -p 22400 --tls -c
登录Manager,选择“集群 > 服务 > Redis > 配置 > 全部配置”,搜索“REDIS_SSL_ON”,将参数“值”设置为“true”,Redis启用SSL通道加密。通道加密在数据传输中对数据进行加密保护,会损耗性能,Redis中无重要或敏感信息不建议开启。
- 生产数据,写入Kafka Source表中。
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
输入消息内容:1,zhangsan,20 2,lisi,25 3,wangwu,28
- 执行以下命令查看Sink表中是否接收到数据,即查看Kafka topic是否正常写入数据。
sh kafka-console-consumer.sh --topic 主题名称 --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --consumer.config 客户端目录/Kafka/kafka/config/consumer.properties
结果如下:
1,zhangsan,20,153xxxx1111,city1 2,lisi,25,153xxxx2222,city2 3,wangwu,28,153xxxx3333,city3
WITH主要参数说明
配置项 |
是否必选 |
类型 |
描述 |
---|---|---|---|
zSetScoreColumn |
可选 |
String |
Redis作为维表时,ZSet格式score字段对应的列名 |
hashKeyColumn |
可选 |
String |
Hash格式,Hash字段对应的列名 |
host |
必选 |
String |
Redis集群连接IP,为Redis集群的实例IP(业务平面) |
port |
必选 |
String |
端口为对应的Redis实例的端口 Redis实例的端口计算方式为:22400+该实例的ID-1 实例ID可以通过在FusionInsight Manager中选择“集群 > 待操作集群的名称 > 服务 > Redis > Redis管理”,单击Redis集群名称查看 例如Redis集群内角色R1对应的Redis实例的端口为22400+1-1=22400 |
separator |
可选 |
String |
Redis作为维表时,value中的字段分割符,示例:“(,)”、“(\u200b)” |
key-ttl-mode |
可选 |
String |
Redis数据过期策略:
|
key-ttl |
可选 |
String |
配置“key-ttl-mode”参数为非“no-ttl”时需设置该值,该值不需要带单位。 |
isSSLMode |
可选 |
String |
是否开启SSL模式
|
keyPrefix |
可选 |
String |
Redis key的前缀 |