更新时间:2024-12-31 GMT+08:00

配置Redis Pipeline

方案概述

分布式缓存服务Redis支持原生Redis的Pipeline(管道传输)机制,通过Pipeline机制可以将多个命令同时发给Redis服务端,减少网络延迟,提高性能。

通常在非Pipeline的模式下,Redis客户端(Client)向Redis发送一个命令后,会等待服务端(Server)返回结果,然后再发送下一个命令,以此类推。但在Pipeline模式下,客户端发送一个命令后无需等待服务端返回执行结果,会继续发送其他命令。在全部命令发送完毕后,客户端关闭请求,开始接收响应,把收到的执行结果与之前发送的命令按顺序进行匹配。

图1 非Pipeline模式与Pipeline模式的网络通信示意图

在Pipeline模式的具体实现中,大部分Redis客户端采用批量处理的方式,即一次发送多个命令,在接收完所有命令执行结果后再返回给上层业务。通过Pipeline模式可降低网络往返时延(Round-trip time,简称RTT),减少read()和write()的系统调用和进程切换次数,从而提升程序的执行效率与性能。

因此,在需要执行Redis批量操作,且用户无需立即获得每个操作结果的场景下,可以使用Pipeline作为优化性能的批处理工具。

  • 使用Pipeline时客户端将独占与服务器端的连接,此期间将不能进行其他“非Pipeline”的操作,直至Pipeline被关闭。如果需要同时执行其他操作,可以为Pipeline操作单独建立一个连接,将其与常规非Pipeline操作分开。
  • 关于Pipeline的更多介绍,请参见Redis pipeline

约束与限制

  • Pipeline不能保证原子性。

    Pipeline模式只是将客户端发送命令的方式改为批量发送命令,而服务端在批量处理命令的数据流时,仍然是解析出多个单命令并按顺序执行,各个命令相互独立,即服务端仍有可能在该过程中执行其他客户端的命令。如需保证原子性,请使用事务或Lua脚本。

  • 若Pipeline执行过程中发生错误,不支持回滚。
  • Pipeline没有事务的特性,如待执行的命令前后存在依赖关系,请勿使用Pipeline。

    如果某些客户端(例如redis-py)在实现Pipeline时使用事务命令MULTI、EXEC进行伪装,请您在使用过程中关注Pipeline与事务的区别,否则可能会产生报错,关于事务的限制请参见Redis transactions

  • 由于服务端以及部分客户端存在缓存区限制,建议单次Pipeline不要使用过多的命令。
  • 由于Redis集群架构本身具有一定限制,例如不支持在单个命令中访问跨Slot的Key、当访问到不属于本节点的数据时会产生-MOVED错误等,请在集群架构中使用Pipeline时,确保Pipeline内部的命令符合集群架构的可执行条件,具体限制请参见实例受限使用命令

性能对比

如下代码将演示使用Pipeline与不使用Pipeline的性能对比。

public static void main(String[] args) {
    // 设置Redis实例连接地址和端口
    Jedis jedis = new Jedis("127.0.0.1", 6379);
 
    // 连续执行多次命令操作
    final int COUNT=5000;
    String key = "key";
    
    // 1 ---不使用pipeline操作---
    jedis.del(key);   // 初始化key
    long t1 = System.currentTimeMillis();
    for (int i = 0; i < COUNT; i++) {
        // 发送一个请求,并接收一个响应(Send Request and  Receive Response)
        jedis.incr(key);
    }
    long t2 = System.currentTimeMillis();
    System.out.println("不使用Pipeline > value为:"+jedis.get(key)+" > 操作用时:" + (t2 - t1) + "ms");

    // 2 ----使用pipeline操作---
    jedis.del(key);   // 初始化key
    Pipeline p1 = jedis.pipelined();
    long t3 = System.currentTimeMillis();
    for (int i = 0; i < COUNT; i++) {
        // 发出请求 Send Request
        p1.incr(key);
    }
    // 接收响应 Receive Response
    p1.sync();
    long t4 = System.currentTimeMillis();
 
    System.out.println("使用Pipeline > value为:"+jedis.get(key)+" > 操作用时:" + (t4 - t3)+ "ms");
    jedis.close();}

在输入了正确的DCS Redis实例访问地址和密码之后,运行以上Java程序,输出结果示例如下,结果显示使用pipeline的性能更快。

不使用Pipeline > value为:5000 > 操作用时:1204ms
使用Pipeline > value为:5000 > 操作用时:9ms

响应数据(Response)的处理方式

在Jedis中使用Pipeline时,对于响应数据(Response)的处理有两种方式,详情请参见以下代码示例。

public static void main(String[] args) {
    // 设置Redis实例连接地址和端口
    Jedis jedis = new Jedis("127.0.0.1", 6379);
    String key = "key";
    jedis.del(key);  // 初始化

    // -------- 方法1 --------
    Pipeline p1 = jedis.pipelined();
    System.out.println("-----方法1-----");
    for (int i = 0; i < 5; i++) {
        p1.incr(key);
        System.out.println("Pipeline发送请求");
    }
    // 发送请求完成,开始接收响应
    System.out.println("发送请求完成,开始接收响应");
    List<Object> responses = p1.syncAndReturnAll();
    if (responses == null || responses.isEmpty()) {
        jedis.close();
        throw new RuntimeException("Pipeline error: 没有接收到响应");
    }
    for (Object resp : responses) {
        System.out.println("Pipeline接收响应Response: " + resp.toString());
    }
    System.out.println();

    // -------- 方法2 --------
    System.out.println("-----方法2-----");
    jedis.del(key);   // 初始化
    Pipeline p2 = jedis.pipelined();

    // 需要先声明Response
    Response<Long> r1 = p2.incr(key);
    System.out.println("Pipeline发送请求");
    Response<Long> r2 = p2.incr(key);
    System.out.println("Pipeline发送请求");
    Response<Long> r3 = p2.incr(key);
    System.out.println("Pipeline发送请求");
    Response<Long> r4 = p2.incr(key);
    System.out.println("Pipeline发送请求");
    Response<Long> r5 = p2.incr(key);
    System.out.println("Pipeline发送请求");
    try {
        r1.get();   // 此时还未开始接收响应,所以此操作会出错
    } catch (Exception e) {
        System.out.println(" <<< Pipeline error:还未开始接收响应  >>> ");
    }
    // 发送请求完成,开始接收响应
    System.out.println("发送请求完成,开始接收响应");
    p2.sync();
    System.out.println("Pipeline接收响应Response: " + r1.get());
    System.out.println("Pipeline接收响应Response: " + r2.get());
    System.out.println("Pipeline接收响应Response: " + r3.get());
    System.out.println("Pipeline接收响应Response: " + r4.get());
    System.out.println("Pipeline接收响应Response: " + r5.get());
    jedis.close();}

在输入了正确的Redis实例访问地址和密码之后,运行以上Java程序,输出结果如下:

-----方法1-----
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
发送请求完成,开始接收响应
Pipeline接收响应Response: 1
Pipeline接收响应Response: 2
Pipeline接收响应Response: 3
Pipeline接收响应Response: 4
Pipeline接收响应Response: 5
-----方法2-----
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
 <<< Pipeline error:还未开始接收响应  >>> 
发送请求完成,开始接收响应
Pipeline接收响应Response: 1
Pipeline接收响应Response: 2
Pipeline接收响应Response: 3
Pipeline接收响应Response: 4
Pipeline接收响应Response: 5