更新时间:2024-09-27 GMT+08:00

Redis结果表

功能描述

DLI将Flink作业的输出数据输出到Redis中。Redis是一种支持Key-Value等多种数据结构的存储系统。可用于缓存、事件发布或订阅、高速队列等场景,提供字符串、哈希、列表、队列、集合结构直接存取,基于内存,可持久化。有关Redis的详细信息,请访问Redis官方网站https://redis.io/

前提条件

DLI要建立与Redis的增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • 如果未在创建Redis结果表的语句中定义Redis key的字段,则会使用生成的uuid作为key。
  • 如果需要指定Redis中的key,则需要在flink的Redis结果表中定义主键,该主键的值即为key。
  • Redis结果表如果定义主键,则不能够定义复合主键,即主键只能是一个字段,不能是多个字段。
  • schema-syntax取值约束:
    • 当schema-syntax为map或array时,非主键字段最多只能只有一个,且需要为相应的map或array类型。
    • 当schema-syntax为fields-scores时,非主键字段个数需要为偶数,且除主键字段外,每两个字段的第二个字段的类型需要为double,会将该字段的值视为前一个字段的score。其示例如下:
      CREATE TABLE redisSink (
        order_id string,
        order_channel string,
        order_time double,
        pay_amount STRING,
        real_pay double,
        pay_time string,
        user_id double,
        user_name string,
        area_id double,
        primary key (order_id) not enforced
      ) WITH (
        'connector' = 'redis',
        'host' = 'RedisIP',
        'password' = 'RedisPassword',
        'data-type' = 'sorted-set',
        'deploy-mode' = 'master-replica',
        'schema-syntax' = 'fields-scores'
      );
  • data-type取值约束:
    • 当data-type为string时,只能有一个非主键字段。
    • 当data-type为sorted-set,且schema-syntax为fields和array时,会使用default-score作为score。
    • 当data-type为sorted-set,且schema-syntax为map时,除主键字段外,只能有一个非主键字段,且需要为map类型,同时该字段的map的value需要为double类型,表示score,该字段的map的key表示redis的set中的值。
    • 当data-type为sorted-set,且schema-syntax为array-scores时,除主键字段外,只能有两个非主键字段,且这两个字段的类型需要为array.
      两个字段其中第一个字段类型是array表示Redis的set中的值,第二个字段类型为array<double>,表示相应索引的score。其示例如下:
      CREATE TABLE redisSink (
        order_id string,
        arrayField Array<String>,
        arrayScore array<double>,
        primary key (order_id) not enforced
      ) WITH (
        'connector' = 'redis',
        'host' = 'RedisIP',
        'password' = 'RedisPassword',
        'data-type' = 'sorted-set',
        "default-score" = '3',
        'deploy-mode' = 'master-replica',
        'schema-syntax' = 'array-scores'
      );

语法格式

1
2
3
4
5
6
7
8
9
create table dwsSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector' = 'redis',
  'host' = ''
);

参数说明

表1 参数说明

参数

是否必选

默认值

数据类型

说明

connector

String

connector类型,需配置为'redis'。

host

String

redis连接地址。

port

6379

Integer

redis连接端口。

password

String

redis认证密码。

namespace

String

redis key的namespace。

例如设置该值为"person",假设key为"jack"则redis中会是"person:jack"。

delimiter

:

String

redis的key和namespace之间的分隔符。

data-type

hash

String

redis的数据类型,有下列选项,与redis的数据类型相对应:

  • hash
  • list
  • set
  • sorted-set
  • string

data-type取值约束详见data-type取值约束说明。

schema-syntax

fields

String

redis的schema语义,包含以下值:

  • fields:适用于所有数据类型。fields类型是指可以设置多个字段,写入时会取每个字段的值。
  • fields-scores:适用于sorted set数据类型,表示对每个字段都设置一个字段作为其独立的score。
  • array:适用于list、set、sorted set数据类型
  • array-scores:适用于sorted set数据类型
  • map:适用于hash、sorted set数据类型。

schema-syntax取值约束详见schema-syntax取值约束说明。

deploy-mode

standalone

String

redis集群的部署模式,支持standalone、master-replica、cluster,默认standalone。

该值可参考redis集群的实例类型介绍。

retry-count

5

Integer

连接redis集群的尝试次数。

connection-timeout-millis

10000

Integer

尝试连接redis集群时的最大超时时间。

commands-timeout-millis

2000

Integer

等待操作完成响应的最大时间。

rebalancing-timeout-millis

15000

Integer

redis集群失败时的休眠时间。

default-score

0

Double

当data-type设置为“sorted-set”数据类型的默认score。

ignore-retraction

false

Boolean

是否忽略retract消息。

skip-null-values

true

Boolean

是否跳过null。如果为false,则设置为字符串"null"。

ignore-retractions

false

Boolean

连接器应忽略更新插入/撤回流模式下的收回消息。

key-column

String

Redis 表schema的key

sink.delivery-guarantee

at-least-once

String

  • exactly-once:

    记录只传送一次,在故障转移方案下也是如此。如果要生成完整的exactly-once管道,需要源和接收器支持exactly-once,并且已正确配置。

  • at-least-once:

    确保传递记录,但可能会多次传递同一记录。通常,这种比exactly-once模式更快。

  • none:

    记录将尽最大努力交付。这通常是处理记录的最快方法,但可能会发生记录丢失或重复的情况。

sink.parallelism

int

定义接收器的自定义并行度。默认情况下,如果未定义此选项,则规划器将通过考虑全局配置来单独派生每个语句的并行度。

key-ttl-mode

no-ttl

String

key-ttl-mode是开启Redis sink TTL的功能参数,key-ttl-mode的限制为:no-ttl、expire-msec、expire-at-date、expire-at-timestamp。

  • no-ttl:不设置过期时间。
  • expire-msec:设置key多久过期,参数为long类型字符串,单位为毫秒。
  • expire-at-date:设置key到某个时间点过期,参数为UTC时间。
  • expire-at-timestamp:设置key到某个时间点过期,参数为时间戳。

key-ttl

String

key-ttl是key-ttl-mode的补充参数,有以下几种参数值:

  • 当key-ttl-mode取值为no-ttl时,不需要配置此参数。
  • 当key-ttl-mode取值为expire-msec时,需要配置为可以解析成Long型的字符串。例如5000,表示5000ms后key过期。
  • 当key-ttl-mode取值为expire-at-date时,需要配置为Date类型字符串,例如2011-12-03T10:15:30,表示到期时间为北京时间2011-12-03 18:15:30。
  • 当key-ttl-mode取值为expire-at-timestamp时,需要配置为timestamp类型字符串,单位为毫秒。例如1679385600000,表示到期时间为2023-03-21 16:00:00。

示例

该示例是从Kafka数据源中读取数据,并写入Redis到结果表中,其具体步骤如下:

  1. 参考增强型跨源连接,根据Redis所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
  2. 设置Redis的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据redis的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
    如下脚本中的加粗参数请根据实际环境修改。
    CREATE TABLE orders (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'kafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    --如下redisSink表data-type为默认值hash,schema-syntax定义为fields,将order_id定义为主键,即将该字段的值作为redis的key
    CREATE TABLE redisSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string,
      primary key (order_id) not enforced
    ) WITH (
      'connector' = 'redis',
      'host' = '<yourRedis>',
      'password' = '<yourPassword>',
      'deploy-mode' = 'master-replica',
      'schema-syntax' = 'fields'
    );
    
    insert into redisSink select * from orders;
  4. 连接Kafka集群,向Kafka中插入如下测试数据:
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  5. 在Redis中分别执行以下命令,查看运行结果:
    • 获取key为"202103241606060001"的结果。

      执行命令:

      HGETALL 202103241606060001
      运行结果:
       1) "user_id"
       2) "0001"
       3) "user_name"
       4) "Alice"
       5) "pay_amount"
       6) "200.0"
       7) "real_pay"
       8) "180.0"
       9) "order_time"
      10) "2021-03-24 16:06:06"
      11) "area_id"
      12) "330106"
      13) "order_channel"
      14) "appShop"
      15) "pay_time"
      16) "2021-03-24 16:10:06"
    • 获取key为"202103241000000001"的结果。

      执行命令:

      HGETALL 202103241000000001
      运行结果:
       1) "user_id"
       2) "0001"
       3) "user_name"
       4) "Alice"
       5) "pay_amount"
       6) "100.0"
       7) "real_pay"
       8) "100.0"
       9) "order_time"
      10) "2021-03-24 10:00:00"
      11) "area_id"
      12) "330106"
      13) "order_channel"
      14) "webShop"
      15) "pay_time"
      16) "2021-03-24 10:02:03"

常见问题

  • Q:当data-type为set时,最终结果数据相比输入数据个数少了是什么原因?

    A:这是因为输入数据中有重复数据,导致在Redis的set中会进行排重,因此个数变少了。

  • Q:如果Flink作业的日志中有如下报错信息,应该怎么解决?
    org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 40 to line 1, column 105: Parameters must be of the same type

    A:则考虑使用了array类型,但是array中各个字段的类型不统一,需要保持Redis中array中各个字段的类型统一。

  • Q:如果Flink作业的日志中有如下报错信息,应该怎么解决?
    org.apache.flink.addons.redis.core.exception.RedisConnectorException: Wrong Redis schema for 'map' syntax: There should be a key (possibly) and 1 MAP non-key column.

    A:schema-syantax为map时,在flink中的建表语句只能有一个非主键的列,且该列类型需要为map。

  • Q:如果Flink作业的日志中有如下报错信息,应该怎么解决?
    org.apache.flink.addons.redis.core.exception.RedisConnectorException: Wrong Redis schema for 'array' syntax: There should be a key (possibly) and 1 ARRAY non-key column.

    A:schema-syantax为array时,在flink中的建表语句只能有一个非主键的列,且该列类型需要为array。

  • Q:data-type已经设置了类型,那么schema-syntax的作用是什么?

    A:schema-syntax实际是对特殊类型的处理,如对map和array类型的处理。

    • 对于fields,会对每个字段的值进行处理;对于array和map则会将该字段中的每个元素进行处理。当是fields时,会将该map或array类型的字段值直接作为一个redis中的一个value。
    • 而当是array或者map时,会将array中的每个值作为redis中的一个value,会将map中该字段的value作为redis中的value。array-scores用于sorted-set的data-type,表示使用两个array字段,第一个字段为set中的值,第二个字段表示相应值所对应的score。fields-scores用于sorted-set的data-type,表示从定义的字段中获取score,该类型表示除主键外的奇数字段表示set中的值,该字段的下一个字段表示该字段的score,因此该字段的下一个字段需要为double类型。
  • Q:当data-type为hash时,schema-syntax为fields和map的区别是什么?

    A:当使用fields时,会将flink中的字段名作为redis的hash数据类型的field,该字段对应的值作为redis的hash数据类型的value。而当使用map时,会将flink中该字段值的key作为redis的hash数据类型的field,该字段值的value作为redis hash数据类型的value。其具体示例如下:

    • 对于fields:
      1. 创建的Flink作业运行脚本如下:
        CREATE TABLE orders (
          order_id string,
          order_channel string,
          order_time string,
          pay_amount double,
          real_pay double,
          pay_time string,
          user_id string,
          user_name string,
          area_id string
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'kafkaTopic',
          'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
          'properties.group.id' = 'GroupId',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        );
        
        CREATE TABLE redisSink (
          order_id string,
          maptest Map<string, String>,
          primary key (order_id) not enforced
        ) WITH (
          'connector' = 'redis',
          'host' = 'RedisIP',
          'password' = 'RedisPassword',
          'deploy-mode' = 'master-replica',
          'schema-syntax' = 'fields'
        );
        
        insert into redisSink select order_id, Map[user_id, area_id] from orders;
      2. 连接Kafka集群,向Kafka的topic插入如下测试数据:
        {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
      3. 在Redis中,查看其结果如下:
        1) "maptest"
        2) "{0001=330106}"
    • 对于map:
      1. 对于map而言,创建的Flink作业运行脚本如下:
        CREATE TABLE orders (
          order_id string,
          order_channel string,
          order_time string,
          pay_amount double,
          real_pay double,
          pay_time string,
          user_id string,
          user_name string,
          area_id string
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'kafkaTopic',
          'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
          'properties.group.id' = 'GroupId',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        );
        
        CREATE TABLE redisSink (
          order_id string,
          maptest Map<string, String>,
          primary key (order_id) not enforced
        ) WITH (
          'connector' = 'redis',
          'host' = 'RedisIP',
          'password' = 'RedisPassword',
          'deploy-mode' = 'master-replica',
          'schema-syntax' = 'map'
        );
        
        insert into redisSink select order_id, Map[user_id, area_id] from orders;
      2. 连接Kafka集群,向Kafka的topic插入如下测试数据:
        {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
      3. 在Redis中,查看其结果如下:
        1) "0001"
        2) "330106"
  • Q:当data-type为list时,schema-syntax为fields和array的区别是什么?

    A:fields和array的不同不会导致结果不同。只是在flink建表语句中不同,fields可以是多个字段,而array需要该字段为array类型,且array中的数据类型必须相同,因此fields会更加灵活。

    • 对于fields:
      1. 对于fields而言,创建的Flink作业运行脚本如下:
        CREATE TABLE orders (
          order_id string,
          order_channel string,
          order_time string,
          pay_amount double,
          real_pay double,
          pay_time string,
          user_id string,
          user_name string,
          area_id string
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'kafkaTopic',
          'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
          'properties.group.id' = 'GroupId',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        );
        
        CREATE TABLE redisSink (
          order_id string,
          order_channel string,
          order_time string,
          pay_amount double,
          real_pay double,
          pay_time string,
          user_id string,
          user_name string,
          area_id string, 
          primary key (order_id) not enforced
        ) WITH (
          'connector' = 'redis',
          'host' = 'RedisIP',
          'password' = 'RedisPassword',
          'data-type' = 'list',
          'deploy-mode' = 'master-replica',
          'schema-syntax' = 'fields'
        );
        
        insert into redisSink select * from orders;
      2. 连接Kafka集群,向Kafka的topic插入如下测试数据:
        {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
      3. 使用以下命令查看其结果如下:

        Redis执行以下命令:

        LRANGE 202103241000000001 0 8
        查询命令执行结果:
        1) "webShop"
        2) "2021-03-24 10:00:00"
        3) "100.0"
        4) "100.0"
        5) "2021-03-24 10:02:03"
        6) "0001"
        7) "Alice"
        8) "330106"
    • 对于array:
      1. 对于array而言,创建的Flink作业运行脚本如下:
        CREATE TABLE orders (
          order_id string,
          order_channel string,
          order_time string,
          pay_amount double,
          real_pay double,
          pay_time string,
          user_id string,
          user_name string,
          area_id string
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'kafkaTopic',
          'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
          'properties.group.id' = 'GroupId',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        );
        
        CREATE TABLE redisSink (
          order_id string,
          arraytest Array<String>,
          primary key (order_id) not enforced
        ) WITH (
          'connector' = 'redis',
          'host' = 'RedisIP',
          'password' = 'RedisPassword',
          'data-type' = 'list',
          'deploy-mode' = 'master-replica',
          'schema-syntax' = 'array'
        );
        
        insert into redisSink select order_id, array[order_channel,order_time,pay_time,user_id,user_name,area_id] from orders;
      2. 连接Kafka集群,向Kafka的topic插入如下测试数据:
        {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
      3. 在Redis中,查看其结果如下(与fields结果不同是因为这里array类型,在flink中的sink建表语句中没有加入double类型的数据,因此少了两个值,并不是由于fields与array不同导致):
        1) "webShop"
        2) "2021-03-24 10:00:00"
        3) "2021-03-24 10:02:03"
        4) "0001"
        5) "Alice"
        6) "330106"