更新时间:2024-10-29 GMT+08:00

配置RabbitMQ消息预取值

设置预取值可以限制未被确认的消息个数,一旦消费者中未被确认的消息数量达到设置的预取值,服务端将不再向此消费者发送消息,除非至少有一个未被确认的消息被确认。设置预取值本质上是一种对消费者进行流控的方法。

设置预取值时,需要考虑多种因素:

  • 预取值设置太小可能会损害性能,RabbitMQ会一直在等待获得发送消息的权限。
  • 预取值设置太大可能会导致从队列中取出大量消息传递给一个消费者,而使其他消费者处于空闲状态。另外还需要考虑消费者的配置,消费者在处理消息时会将所有消息保存在内存中,太大的预取值会对消费者的性能产生负面影响,甚至可能会导致消费者崩溃。

更多关于预取值的说明,请参考Consumer Prefetch

预取值设置建议

  • 如果您只有一个或很少几个消费者在处理消息,建议一次预取多条消息,尽量让客户端保持忙碌。如果您的处理时间和网络状态稳定,则只需将总往返时间除以每条消息在客户端的处理时间即可获得估计的预取值。
  • 在消费者多且处理时间短的情况下,建议使用较低的预取值。过低的预取值会使消费者闲置,因为消费者在处理完消息后需要等待下一批的消息到达。过高的值可能会使单个消费者忙碌,其他消费者处于空闲状态。
  • 在消费者多且处理时间很长的情况下,建议您将预取值设置为1,以便消息在所有消费者间均匀分布。

如果客户端配置的消息确认机制为自动确认,则设置的预取值无效,已确认的消息会从队列中删除。

设置预取值

以下示例演示在Java客户端为单个消费者设置预取值为10。

ConnectionFactory factory = new ConnectionFactory();

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

//设置预取值为10。
channel.basicQos(10, false);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("my_queue", false, consumer);

在Java客户端中,global的默认值为false,因此以上示例可以简单地写为channel.basicQos(10)

global取值的含义如下:

  • false:分别作用于通道上的每个新消费者。
  • true:在通道上的所有消费者之间所共享。