Redis结果表
功能描述
DLI将Flink作业的输出数据输出到Redis中。Redis是一种支持Key-Value等多种数据结构的存储系统。可用于缓存、事件发布或订阅、高速队列等场景,提供字符串、哈希、列表、队列、集合结构直接存取,基于内存,可持久化。有关Redis的详细信息,请访问Redis官方网站https://redis.io/。
前提条件
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
- 如果未在创建Redis结果表的语句中定义Redis key的字段,则会使用生成的uuid作为key。
- 如果需要指定Redis中的key,则需要在flink的Redis结果表中定义主键,该主键的值即为key。
- Redis结果表如果定义主键,则不能够定义复合主键,即主键只能是一个字段,不能是多个字段。
- schema-syntax取值约束:
- 当schema-syntax为map或array时,非主键字段最多只能只有一个,且需要为相应的map或array类型。
- 当schema-syntax为fields-scores时,非主键字段个数需要为偶数,且除主键字段外,每两个字段的第二个字段的类型需要为double,会将该字段的值视为前一个字段的score。其示例如下:
CREATE TABLE redisSink ( order_id string, order_channel string, order_time double, pay_amount STRING, real_pay double, pay_time string, user_id double, user_name string, area_id double, primary key (order_id) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'sorted-set', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'fields-scores' );
- data-type取值约束:
- 当data-type为string时,只能有一个非主键字段。
- 当data-type为sorted-set,且schema-syntax为fields和array时,会使用default-score作为score。
- 当data-type为sorted-set,且schema-syntax为map时,除主键字段外,只能有一个非主键字段,且需要为map类型,同时该字段的map的value需要为double类型,表示score,该字段的map的key表示redis的set中的值。
- 当data-type为sorted-set,且schema-syntax为array-scores时,除主键字段外,只能有两个非主键字段,且这两个字段的类型需要为array.
两个字段其中第一个字段类型是array表示Redis的set中的值,第二个字段类型为array<double>,表示相应索引的score。其示例如下:
CREATE TABLE redisSink ( order_id string, arrayField Array<String>, arrayScore array<double>, primary key (order_id) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'sorted-set', "default-score" = '3', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'array-scores' );
语法格式
1 2 3 4 5 6 7 8 9 |
create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'redis', 'host' = '' ); |
参数说明
参数 |
是否必选 |
默认值 |
数据类型 |
说明 |
---|---|---|---|---|
connector |
是 |
无 |
String |
connector类型,需配置为'redis'。 |
host |
是 |
无 |
String |
redis连接地址。 |
port |
否 |
6379 |
Integer |
redis连接端口。 |
password |
否 |
无 |
String |
redis认证密码。 |
namespace |
否 |
无 |
String |
redis key的namespace。 例如设置该值为"person",假设key为"jack"则redis中会是"person:jack"。 |
delimiter |
否 |
: |
String |
redis的key和namespace之间的分隔符。 |
data-type |
否 |
hash |
String |
redis的数据类型,有下列选项,与redis的数据类型相对应:
data-type取值约束详见data-type取值约束说明。 |
schema-syntax |
否 |
fields |
String |
redis的schema语义,包含以下值:
schema-syntax取值约束详见schema-syntax取值约束说明。 |
deploy-mode |
否 |
standalone |
String |
redis集群的部署模式,支持standalone、master-replica、cluster,默认standalone。 该值可参考redis集群的实例类型介绍。 |
retry-count |
否 |
5 |
Integer |
连接redis集群的尝试次数。 |
connection-timeout-millis |
否 |
10000 |
Integer |
尝试连接redis集群时的最大超时时间。 |
commands-timeout-millis |
否 |
2000 |
Integer |
等待操作完成响应的最大时间。 |
rebalancing-timeout-millis |
否 |
15000 |
Integer |
redis集群失败时的休眠时间。 |
default-score |
否 |
0 |
Double |
当data-type设置为“sorted-set”数据类型的默认score。 |
ignore-retraction |
否 |
false |
Boolean |
是否忽略retract消息。 |
skip-null-values |
否 |
true |
Boolean |
是否跳过null。如果为false,则设置为字符串"null"。 |
ignore-retractions |
否 |
false |
Boolean |
连接器应忽略更新插入/撤回流模式下的收回消息。 |
key-column |
否 |
无 |
String |
Redis 表schema的key |
sink.delivery-guarantee |
否 |
at-least-once |
String |
|
sink.parallelism |
否 |
无 |
int |
定义接收器的自定义并行度。默认情况下,如果未定义此选项,则规划器将通过考虑全局配置来单独派生每个语句的并行度。 |
key-ttl-mode |
否 |
no-ttl |
String |
key-ttl-mode是开启Redis sink TTL的功能参数,key-ttl-mode的限制为:no-ttl、expire-msec、expire-at-date、expire-at-timestamp。
|
key-ttl |
否 |
无 |
String |
key-ttl是key-ttl-mode的补充参数,有以下几种参数值:
|
示例
该示例是从Kafka数据源中读取数据,并写入Redis到结果表中,其具体步骤如下:
- 参考增强型跨源连接,根据Redis所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置Redis的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据redis的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
- 创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --如下redisSink表data-type为默认值hash,schema-syntax定义为fields,将order_id定义为主键,即将该字段的值作为redis的key CREATE TABLE redisSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, primary key (order_id) not enforced ) WITH ( 'connector' = 'redis', 'host' = '<yourRedis>', 'password' = '<yourPassword>', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'fields' ); insert into redisSink select * from orders;
- 连接Kafka集群,向Kafka中插入如下测试数据:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- 在Redis中分别执行以下命令,查看运行结果:
- 获取key为"202103241606060001"的结果。
HGETALL 202103241606060001
运行结果:1) "user_id" 2) "0001" 3) "user_name" 4) "Alice" 5) "pay_amount" 6) "200.0" 7) "real_pay" 8) "180.0" 9) "order_time" 10) "2021-03-24 16:06:06" 11) "area_id" 12) "330106" 13) "order_channel" 14) "appShop" 15) "pay_time" 16) "2021-03-24 16:10:06"
- 获取key为"202103241000000001"的结果。
HGETALL 202103241000000001
运行结果:1) "user_id" 2) "0001" 3) "user_name" 4) "Alice" 5) "pay_amount" 6) "100.0" 7) "real_pay" 8) "100.0" 9) "order_time" 10) "2021-03-24 10:00:00" 11) "area_id" 12) "330106" 13) "order_channel" 14) "webShop" 15) "pay_time" 16) "2021-03-24 10:02:03"
- 获取key为"202103241606060001"的结果。
常见问题
- Q:当data-type为set时,最终结果数据相比输入数据个数少了是什么原因?
- Q:如果Flink作业的日志中有如下报错信息,应该怎么解决?
org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 40 to line 1, column 105: Parameters must be of the same type
A:则考虑使用了array类型,但是array中各个字段的类型不统一,需要保持Redis中array中各个字段的类型统一。
- Q:如果Flink作业的日志中有如下报错信息,应该怎么解决?
org.apache.flink.addons.redis.core.exception.RedisConnectorException: Wrong Redis schema for 'map' syntax: There should be a key (possibly) and 1 MAP non-key column.
A:schema-syantax为map时,在flink中的建表语句只能有一个非主键的列,且该列类型需要为map。
- Q:如果Flink作业的日志中有如下报错信息,应该怎么解决?
org.apache.flink.addons.redis.core.exception.RedisConnectorException: Wrong Redis schema for 'array' syntax: There should be a key (possibly) and 1 ARRAY non-key column.
A:schema-syantax为array时,在flink中的建表语句只能有一个非主键的列,且该列类型需要为array。
- Q:data-type已经设置了类型,那么schema-syntax的作用是什么?
A:schema-syntax实际是对特殊类型的处理,如对map和array类型的处理。
- 对于fields,会对每个字段的值进行处理;对于array和map则会将该字段中的每个元素进行处理。当是fields时,会将该map或array类型的字段值直接作为一个redis中的一个value。
- 而当是array或者map时,会将array中的每个值作为redis中的一个value,会将map中该字段的value作为redis中的value。array-scores用于sorted-set的data-type,表示使用两个array字段,第一个字段为set中的值,第二个字段表示相应值所对应的score。fields-scores用于sorted-set的data-type,表示从定义的字段中获取score,该类型表示除主键外的奇数字段表示set中的值,该字段的下一个字段表示该字段的score,因此该字段的下一个字段需要为double类型。
- Q:当data-type为hash时,schema-syntax为fields和map的区别是什么?
A:当使用fields时,会将flink中的字段名作为redis的hash数据类型的field,该字段对应的值作为redis的hash数据类型的value。而当使用map时,会将flink中该字段值的key作为redis的hash数据类型的field,该字段值的value作为redis hash数据类型的value。其具体示例如下:
- 对于fields:
- 创建的Flink作业运行脚本如下:
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE redisSink ( order_id string, maptest Map<string, String>, primary key (order_id) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'fields' ); insert into redisSink select order_id, Map[user_id, area_id] from orders;
- 连接Kafka集群,向Kafka的topic插入如下测试数据:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- 在Redis中,查看其结果如下:
1) "maptest" 2) "{0001=330106}"
- 创建的Flink作业运行脚本如下:
- 对于map:
- 对于map而言,创建的Flink作业运行脚本如下:
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE redisSink ( order_id string, maptest Map<string, String>, primary key (order_id) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'map' ); insert into redisSink select order_id, Map[user_id, area_id] from orders;
- 连接Kafka集群,向Kafka的topic插入如下测试数据:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- 在Redis中,查看其结果如下:
1) "0001" 2) "330106"
- 对于map而言,创建的Flink作业运行脚本如下:
- 对于fields:
- Q:当data-type为list时,schema-syntax为fields和array的区别是什么?
A:fields和array的不同不会导致结果不同。只是在flink建表语句中不同,fields可以是多个字段,而array需要该字段为array类型,且array中的数据类型必须相同,因此fields会更加灵活。
- 对于fields:
- 对于fields而言,创建的Flink作业运行脚本如下:
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE redisSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, primary key (order_id) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'list', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'fields' ); insert into redisSink select * from orders;
- 连接Kafka集群,向Kafka的topic插入如下测试数据:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- 使用以下命令查看其结果如下:
LRANGE 202103241000000001 0 8
查询命令执行结果:1) "webShop" 2) "2021-03-24 10:00:00" 3) "100.0" 4) "100.0" 5) "2021-03-24 10:02:03" 6) "0001" 7) "Alice" 8) "330106"
- 对于fields而言,创建的Flink作业运行脚本如下:
- 对于array:
- 对于array而言,创建的Flink作业运行脚本如下:
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE redisSink ( order_id string, arraytest Array<String>, primary key (order_id) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'list', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'array' ); insert into redisSink select order_id, array[order_channel,order_time,pay_time,user_id,user_name,area_id] from orders;
- 连接Kafka集群,向Kafka的topic插入如下测试数据:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- 在Redis中,查看其结果如下(与fields结果不同是因为这里array类型,在flink中的sink建表语句中没有加入double类型的数据,因此少了两个值,并不是由于fields与array不同导致):
1) "webShop" 2) "2021-03-24 10:00:00" 3) "2021-03-24 10:02:03" 4) "0001" 5) "Alice" 6) "330106"
- 对于array而言,创建的Flink作业运行脚本如下:
- 对于fields: