更新时间:2024-10-29 GMT+08:00

配置RabbitMQ仲裁队列

仲裁队列(Quorum Queues)提供队列复制的能力,保障数据的高可用和安全性。使用仲裁队列可以在RabbitMQ节点间进行队列数据的复制,在一个节点宕机时,队列依旧可以正常运行。

仲裁队列适用于队列长时间存在,对队列容错和数据安全要求高,对延迟和队列特性要求相对低的场景。在可能出现消息大量堆积的场景,不推荐使用仲裁队列,因为仲裁队列的写入会造成成倍的磁盘占用。

仲裁队列的消息会优先保存在内存中,使用仲裁队列时,建议定义队列最大长度和最大内存占用,在消息堆积超过阈值时从内存转移到磁盘,以免造成内存高水位。

更多关于仲裁队列的说明,请参考Quorum Queues

仅RabbitMQ 3.8.35版本支持仲裁队列。

仲裁队列与镜像队列的差异

仲裁队列是RabbitMQ 3.8版本引入的队列类型,它与镜像队列拥有类似的功能,为RabbitMQ提供高可用的队列。镜像队列有一些设计上的缺陷,这也是RabbitMQ提供仲裁队列的原因。

镜像队列主要的缺陷在于消息同步的性能低。

  • 镜像队列包含一个主队列和多个从队列,当生产者向主队列发送一条消息,主队列会将消息同步给从队列,所有的从队列都保存消息后,主队列才会向生产者发送确认。
  • RabbitMQ使用集群部署时,如果其中一个节点故障下线,待它消除故障重新上线后,它保存的所有从队列的数据都会丢失。此时运维人员需要选择是否同步主队列的数据到从队列中,如果不同步数据,会增加消息丢失的风险。如果同步数据,同步时队列是阻塞的,无法对其进行操作。当队列中存在大量堆积消息时,同步会导致队列几分钟、几小时或者更长时间不可用。

仲裁队列解决了镜像队列的性能和同步问题。

  • 仲裁队列的算法是基于Raft共识算法的一个变种,提供更好的消息吞吐量。仲裁队列包含一个主副本和多个从副本,当生产者向主副本发送一条消息,主副本会将消息同步给从副本,超过半数的副本保存消息后,主副本才会向生产者发送确认。这意味着少部分比较慢的从副本不会影响整个队列的性能。同样地,主副本的选举也需要超过半数的副本同意,这会避免出现网络分区时,队列存在2个主副本。由此可见,仲裁队列相对于可用性更看重一致性。
  • RabbitMQ使用集群部署时,如果其中一个节点故障下线,待它消除故障重新上线后,它保存的数据不会丢失,主副本会直接从从副本中断的地方开始复制消息。复制的过程是非阻塞的,整个队列不会因为新的副本加入而受到影响。

仲裁队列相比镜像队列,缺少了一些特性,如表1所示,且消耗更多的内存和磁盘。

表1 仲裁队列与镜像队列特性差异

特性

镜像队列

仲裁队列

非持久化队列

支持

不支持

排他队列

支持

不支持

每条消息的持久化

每条消息

永远

队列重平衡

自动

手动

消息超时时间

支持

不支持

队列超时时间

支持

支持

队列长度限制

支持

支持(除x-overflow: reject-publish-dlx)

惰性队列

支持

限制队列长度后支持

消息优先级

支持

不支持

消费优先级

支持

支持

死信交换器

支持

支持

动态Policy

支持

支持

毒药消息(让消费者无限循环消费)处理

不支持

支持

全局消息预取(Qos)

支持

不支持

配置仲裁队列

在声明队列时,将队列的“x-queue-type”参数设置为“quorum”。此参数只能在声明队列时设置,不能通过Policy设置。

仲裁队列默认的复制因子是5。

  • 以下示例演示在Java客户端设置仲裁队列。
    ConnectionFactory factory = newConnectionFactory();
    factory.setRequestedHeartbeat(30);
    factory.setHost(HOST);
    factory.setPort(PORT);
    factory.setUsername(USERNAME);
    factory.setPassword(PASSWORD);
    
    finalConnection connection = factory.newConnection();
    finalChannel channel = connection.createChannel();
    // 创建队列参数Map
    Map<String, Object> arguments = newHashMap<>();
    arguments.put("x-queue-type", "quorum");
    // 声明仲裁队列
    channel.queueDeclare("test-quorum-queue", true, false, false, arguments);
  • 以下示例演示在RabbitMQ WebUI页面设置仲裁队列。
    图1 设置仲裁队列

    设置完成后,在“Queues”页面查看队列类型是否为“quorum”,如图2所示。“Node”中的“+2”表示该队列有2个副本,蓝色表示这两个副本消息同步已经完成,如果为红色则表示部分消息还未同步。

    图2 查看队列类型

    在“Queues”页面,单击队列名称,进入队列详情页。查看当前仲裁队列主副本所在节点和在线副本所在节点。

    图3 队列详情页

配置仲裁队列的长度

通过配置Policy或者队列属性的方式可以限制仲裁队列的长度和在内存中保存的长度。

  • x-max-length:仲裁队列最大消息数。如果超过则丢弃消息,或者发送到死信交换器。
  • x-max-length-bytes:仲裁队列最大总消息大小(字节数)。如果超过则丢弃消息,或者发送到死信交换器。
  • x-max-in-memory-length:限制仲裁队列的内存中最大消息数量。
  • x-max-in-memory-bytes:限制仲裁队列的内存中的最大总消息大小(字节数)。

以下举例说明通过配置Policy或者队列属性的方式限制内存中保存的仲裁队列长度。

  • 通过Policy方式配置,推荐使用此方式。
    在Policy中通过设置x-max-in-memory-bytes参数,限制仲裁队列的长度。
    图4 使用Policy设置x-max-in-memory-bytes
  • 通过队列属性方式配置。
    新增队列时,在队列属性设置x-max-in-memory-length参数,限制仲裁队列的长度。
    图5 使用队列属性设置x-max-in-memory-length