更新时间:2024-10-29 GMT+08:00

配置RabbitMQ消息确认机制

RabbitMQ的消息确认机制分为生产者确认和消费者确认。在使用RabbitMQ时,生产者确认和消费者确认对于确保数据可靠性至关重要。如果连接失败,传输中的消息可能会丢失,需要重新传输。消息确认机制可以让服务端和客户端知道何时重新传输消息。客户端可以在收到消息时确认消息,也可以在客户端完全处理完消息后确认。

生产者确认会影响性能,如果需要很高的吞吐量,应禁用生产者确认。注意,不使用生产者确认会导致可靠性下降。

更多关于消息确认机制的说明,请参考Consumer Acknowledgements and Publisher Confirms

生产者确认

生产者确认,即服务端在收到来自生产者的消息时进行确认。

以下示例演示在Java客户端配置生产者确认:

try {
	channel.confirmSelect() ; //将信道置为publisher confirm模式
	//之后正常发送消息
	channel.basicPublish("exchange", "routingKey" , null , "publisher confirm test" .getBytes());
	if (!channel.waitForConfirms()) {
		System.out.println( "send message failed " ) ;
		// do something else....
	}
} catch (InterruptedException e) {
		e.printStackTrace() ;
}

调用channel .waitForConfirms方法之后,会等待服务端确认,这是一种同步等待的方式,会对性能产生影响。如果生产者要满足at least once,就必须使用同步等待方式。

消费者确认

消费者确认是指服务端通过确认消息是否成功被消费者接收,来判断是否删除队列中的此消息。

消费者确认对数据可靠性十分重要,接收重要消息的消费应用程序在未处理完消息前不应确认消息,以便消费者有足够的时间处理消息,无需担心消息处理过程中由于消费者进程异常(如工作程序崩溃、重启等)导致消息丢失。

消费者确认在客户端上配置,通过配置basicConsume方法启用确认。在channel中启用消费者确认适用于大多数场景。

以下示例演示在Java客户端配置消费者确认(使用Channel#basicAck设置basic.ack为肯定):

// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties, byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // positively acknowledge a single delivery, the message will
             // be discarded
             channel.basicAck(deliveryTag, false);
         }
     });

未确认的消息缓存在内存中,如果未确认的消息过多,会导致内存使用率过高,此时可以在客户端配置预取值来限制消费者预取的消息数量,具体方法请参见配置RabbitMQ消息预取值