更新时间:2024-03-12 GMT+08:00

Redis结果表

功能描述

DLI将Flink作业的输出数据输出到Redis中。Redis是一种支持Key-Value等多种数据结构的存储系统。可用于缓存、事件发布或订阅、高速队列等场景,提供字符串、哈希、列表、队列、集合结构直接存取,基于内存,可持久化。有关Redis的详细信息,请访问Redis官方网站https://redis.io/

前提条件

要建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
create table dwsSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
'connector.type' = 'redis',
  'connector.host' = '',
  'connector.port' = '',
  'connector.password' = '',
  'connector.table-name' = '',
  'connector.key-column' = ''
);

参数说明

表1 参数说明

参数

是否必选

说明

connector.type

connector类型,对于redis,需配置为'redis'。

connector.host

redis连接地址。

connector.port

redis连接端口。

connector.password

redis认证密码。

connector.deploy-mode

redis部署模式,支持standalone/cluster,默认standalone

connector.table-name

table存储模式下必配,redis中存储表名。在table存储模式下,数据将以hash类型存储到redis,其中key为:${table-name}:${ext-key},field名为列名。

说明:

table存储模式:将connector.table-name、connector.key-column作为redis的key。redis的hash类型,每个key对应一个hashmap,hashmap的hashkey为源表的字段名,hashvalue为源表的字段值。

connector.key-column

table存储模式下可配置,将该字段值作为redis中的ext-key,未配置时,ext-key为生成的uuid

connector.write-schema

table存储模式下可配置,是否将当前schema写入到redis,默认为false

connector.data-type

数据存储类型,用户自定义存储模式必配。支持:string, list, hash, set类型。其中string/list以及sets中schema字段数必须为2,hash字段数必须为3

connector.ignore-retraction

是否忽略retraction消息,默认为false

注意事项

参数“connector.table-name”与“connector.data-type”必须配置其中一个。

示例

  • 配置“connector.table-name”参数时的table存储模式示例。
    table模式采用hash类型存储数据,与基本hash类型将表的三个字段分别作为key、hash_key、hash_value不同,table模式下的key值可以通过“connector.table-name”和“connector.key-column”两个参数设置,将表中的所有字段名作为hash_key,字段值作为hash_value写入到hash中。
    create table redisSink(
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_speed INT
    ) with (
      'connector.type' = 'redis',
      'connector.host' = 'xx.xx.xx.xx',
      'connector.port' = '6379',
      'connector.password' = 'xx',
      'connector.table-name'='car_info',
      'connector.key-column'='car_id'
    );
    
    insert into redisSink
      (car_id,car_owner,car_brand,car_speed)
      VALUES
      ("A1234","OwnA","A1234",30);
  • 以下示例演示“connector.data-type”为string, list, hash, set类型时的建表语句。
    • “connector.data-type”为string类型
      表为2列:第一列为key,第二列为value。
      create table redisSink(
        attr1 STRING,
        attr2 STRING
      ) with (
        'connector.type' = 'redis',
        'connector.host' = 'xx.xx.xx.xx',
        'connector.port' = '6379',
        'connector.password' = 'xx',
        'connector.data-type' = 'string'
      );
      
      insert into redisSink
        (attr1,attr2)
        VALUES
        ("car_id","A1234");
    • “connector.data-type”为list类型
      表为2列:第一列为key,第二列为value。
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      create table redisSink(
        attr1 STRING,
        attr2 STRING
      ) with (
        'connector.type' = 'redis',
        'connector.host' = 'xx.xx.xx.xx',
        'connector.port' = '6379',
        'connector.password' = 'xx',
        'connector.data-type' = 'list'
      );
      
      insert into redisSink
        (attr1,attr2)
        VALUES
        ("car_id","A1234");
      
    • “connector.data-type”为set类型
      表为2列:第一列为key,第二列为value。
      create table redisSink(
        attr1 STRING,
        attr2 STRING
      ) with (
        'connector.type' = 'redis',
        'connector.host' = 'xx.xx.xx.xx',
        'connector.port' = '6379',
        'connector.password' = 'xx',
        'connector.data-type' = 'set'
      );
      
      insert into redisSink
        (attr1,attr2)
        VALUES
        ("car_id","A1234");
    • “connector.data-type”为hash类型
      表为3列:第一列为key,第二列为hash_key,第三列为hash_value。
      create table redisSink(
        attr1 STRING,
        attr2 STRING,
        attr3 STRING
      ) with (
        'connector.type' = 'redis',
        'connector.host' = 'xx.xx.xx.xx',
        'connector.port' = '6379',
        'connector.password' = 'xx',
        'connector.data-type' = 'hash'
      );
      
      insert into redisSink
        (attr1,attr2,attr3)
        VALUES
        ("car_info","car_id","A1234");