更新时间:2022-12-07 GMT+08:00
DCS输出流
功能描述
DLI将Flink作业的输出数据输出到分布式缓存服务(DCS)的Redis中。Redis是一种支持Key-Value等多种数据结构的存储系统。可用于缓存、事件发布或订阅、高速队列等场景,提供字符串、哈希、列表、队列、集合结构直接存取,基于内存,可持久化。有关Redis的详细信息,请访问Redis官方网站https://redis.io/。
分布式缓存服务(DCS)为DLI提供兼容Redis的即开即用、安全可靠、弹性扩容、便捷管理的在线分布式缓存能力,满足用户高并发及快速数据访问的业务诉求。
前提条件
- 请务必确保您的账户下已在分布式缓存服务(DCS)里创建了Redis类型的缓存实例。
- 该场景作业需要运行在DLI的独享队列上,因此要与DCS集群建立跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 用户通过VPC对等访问DCS实例时,除了满足VPC对等网跨VPC访问的约束之外,还存在如下约束:
- 当创建DCS实例时使用了172.16.0.0/12~24网段时,DLI队列不能在192.168.1.0/24、192.168.2.0/24、192.168.3.0/24网段。
- 当创建DCS实例时使用了192.168.0.0/16~24网段时,DLI队列不能在172.31.1.0/24、172.31.2.0/24、172.31.3.0/24网段。
- 当创建DCS实例时使用了10.0.0.0/8~24网段时,DLI队列不能在172.31.1.0/24、172.31.2.0/24、172.31.3.0/24网段。
语法格式
1 2 3 4 5 6 7 8 |
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dcs_redis", region = "", cluster_address = "", password = "", value_type= "",key_value= "" ); |
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
输出通道类型,dcs_redis表示输出到分布式缓存服务的Redis存储系统中。 |
region |
是 |
数据所在的DCS所在区域。 |
cluster_address |
是 |
Redis实例连接地址。 |
password |
否 |
Redis实例连接密码,当设置为免密访问时,省略该配置项。 |
value_type |
是 |
该参数可配置为如下选项或选项的组合:
当需要使用多个命令时,用“;”分隔。 |
key_value |
是 |
设置具体的key和value,key_value对必须与value_type所指定的类型数相对应,用“;”分隔,且key和value均支持参数化,动态列名采用${列名}表示。 |
注意事项
- 当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。
- 字符":", ",", ";", "$", "{", "}"已被征用为特殊分隔符,暂时没有提供转义功能,禁止在key和value中作为普通字符使用,否则会影响解析,导致程序异常。
示例
将流qualified_cars的数据输出到DCS服务的Redis类型的缓存实例中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed DOUBLE, total_miles DOUBLE ) WITH ( type = "dcs_redis", cluster_address = "192.168.0.34:6379", password = "xxxxxxxx", value_type = "string; list; hash; set; zset", key_value = "${car_id}_str: ${car_owner}; name_list: ${car_owner}; ${car_id}_hash: {name:${car_owner}, age: ${car_age}}; name_set: ${car_owner}; math_zset: {${car_owner}:${average_speed}}" ); |
父主题: 创建输出流