更新时间:2022-08-12 GMT+08:00

DCS输出流

功能描述

DLI将Flink作业的输出数据输出到分布式缓存服务(DCS)的Redis中。Redis是一种支持Key-Value等多种数据结构的存储系统。可用于缓存、事件发布或订阅、高速队列等场景,提供字符串、哈希、列表、队列、集合结构直接存取,基于内存,可持久化。有关Redis的详细信息,请访问Redis官方网站https://redis.io/

分布式缓存服务(DCS)为DLI提供兼容Redis的即开即用、安全可靠、弹性扩容、便捷管理的在线分布式缓存能力,满足用户高并发及快速数据访问的业务诉求。

DCS的更多信息,请参见《分布式缓存服务用户指南》。

前提条件

  • 请务必确保您的账户下已在分布式缓存服务(DCS)里创建了Redis类型的缓存实例。

    如何创建Redis类型的缓存实例,请参考《分布式缓存服务用户指南》中“申请Redis缓存实例”章节。

  • 该场景作业需要运行在DLI的独享队列上,因此要与DCS集群建立跨源连接,且用户可以根据实际所需设置相应安全组规则。

    如何建立增强型跨源连接,请参考《数据湖探索用户指南》“增强型跨源连接”章节。

    如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。

  • 用户通过VPC对等访问DCS实例时,除了满足VPC对等网跨VPC访问的约束之外,还存在如下约束:
    • 当创建DCS实例时使用了172.16.0.0/12~24网段时,DLI队列不能在192.168.1.0/24、192.168.2.0/24、192.168.3.0/24网段。
    • 当创建DCS实例时使用了192.168.0.0/16~24网段时,DLI队列不能在172.31.1.0/24、172.31.2.0/24、172.31.3.0/24网段。
    • 当创建DCS实例时使用了10.0.0.0/8~24网段时,DLI队列不能在172.31.1.0/24、172.31.2.0/24、172.31.3.0/24网段。

语法格式

1
2
3
4
5
6
7
8
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "dcs_redis",
    region = "",
    cluster_address = "",
    password = "",
    value_type= "",key_value= ""
  );

关键字

表1 关键字说明

参数

是否必选

说明

type

输出通道类型,dcs_redis表示输出到分布式缓存服务的Redis存储系统中。

region

数据所在的DCS所在区域。

cluster_address

Redis实例连接地址。

password

Redis实例连接密码,当设置为免密访问时,省略该配置项。

value_type

该参数可配置为如下选项或选项的组合:

  • 支持指定插入数据类型,包括:string, list, hash, set, zset;
  • 支持设置key的过期时间,包括expire, pexpire, expireAt, pexpireAt;
  • 支持删除key命令,包括del, hdel;

当需要使用多个命令时,用“;”分隔。

key_value

设置具体的key和value,key_value对必须与value_type所指定的类型数相对应,用“;”分隔,且key和value均支持参数化,动态列名采用${列名}表示。

注意事项

  • 当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。
  • 字符":", ",", ";", "$", "{", "}"已被征用为特殊分隔符,暂时没有提供转义功能,禁止在key和value中作为普通字符使用,否则会影响解析,导致程序异常。

示例

将流qualified_cars的数据输出到DCS服务的Redis类型的缓存实例中。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
CREATE SINK STREAM qualified_cars (
  car_id STRING, 
  car_owner STRING, 
  car_age INT, 
  average_speed DOUBLE, 
  total_miles DOUBLE
)
  WITH (
    type = "dcs_redis",
    cluster_address = "192.168.0.34:6379",
    password = "xxxxxxxx",
    value_type = "string; list; hash; set; zset",
    key_value = "${car_id}_str: ${car_owner}; name_list: ${car_owner}; ${car_id}_hash: {name:${car_owner}, age: ${car_age}}; name_set:   ${car_owner}; math_zset: {${car_owner}:${average_speed}}"
  );