更新时间:2024-09-30 GMT+08:00

配置表

Flink开源特性增强:配置表

在某些场景下,用户存在固定的配置表,存储了基础信息;当平台接收流数据并处理时,需要与配置表进行匹配操作。由于配置表可能较大,考虑使用Redis存储,Redis是一个高性能的key-value数据库,流数据查询时延较低。

具体流程如下:

图1 流程图

Redis存储数据

Redis并不是简单的key-value存储,实际上它是一个数据结构服务器,支持不同类型的值。支持数据类型存储如下:

  • 二进制安全的字符串。
  • Lists: 按插入顺序排序的字符串元素的集合。基本上就是链表(linked lists)
  • Sets: 不重复且无序的字符串元素的集合。
  • Sorted sets:每个字符串元素都关联到一个叫score浮动数值(floating number value)。里面的元素是通过score进行排序,它是可以检索的一系列元素。
  • Hashes:由field和关联的value组成的map,field和value都是字符串。
  • Bit arrays: 通过特殊的命令,用户可以将String值当作一系列bits处理。例如用户可以设置和清除单独的bits,统计出所有设为1的bits的数量,或找到第一个被设为1或0的bit等等。
  • HyperLogLogs: 这是被用于估计一个set中元素数量的概率性的数据结构。

为满足最大5亿条数据配置表的存储并及时响应查询,使用Redis集群存储配置表,并使用流的异步IO作消息查询,提高数据处理的吞吐量。

  • Redis集群:在集群环境上的各个节点上部署Redis,并将数据分散存储在各个节点上,提升了存储容量,目前MRS中已有Redis组件。
  • 异步IO:处理流数据,最大化数据处理的吞吐量,提高处理效率。

涉及Redis主要有两部分,Redis安装部署以及配置表数据导入:

  1. Redis安装。

    MRS已经有Redis组件,在集群安装时可以勾选安装。

  2. 配置表导入Redis。

    用户可以按照配置表的特征选取主键或者关键某几列作为key值,当需要存储的配置表的属性较多时,建议以Hashes的数据形式存储。

    MRS的Redis组件提供了Jedis客户端对数据进行插入查询,可以参考Redis组件样例代码。

Redis数据类型详细信息请参见官网:https://redis.io/topics/data-types-intro

Flink异步IO

当与外部系统进行交互时,如外部的数据库,访问等待时间过长导致数据处理效率低。异步IO实现了不需要等待请求返回就可以同时发送其他请求,以此提高数据吞吐量。

异步IO的API实现需要注意三点:

  • AsyncFunction函数实现了数据处理的异步处理,需要重写asyncInvoke方法。
  • 回调函数获取算子的结果,并且通过AsyncCollector收集起来。
    图2 Async.I/O的比较
  • 超时时间和最大容量设置。

    超时时间定义了一个异步请求失败的最大时间。最大容量设置是指同时可以存在多少个异步请求,过多导致资源消耗加大;过小导致并行数小,吞吐量不能提高;建议针对数据源特点进行合理适配。