配置RabbitMQ消息确认机制
RabbitMQ的消息确认机制分为生产者确认和消费者确认。在使用RabbitMQ时,生产者确认和消费者确认对于确保数据可靠性至关重要。如果连接失败,传输中的消息可能会丢失,需要重新传输。消息确认机制可以让服务端和客户端知道何时重新传输消息。客户端可以在收到消息时确认消息,也可以在客户端完全处理完消息后确认。
生产者确认会影响性能,如果需要很高的吞吐量,应禁用生产者确认。注意,不使用生产者确认会导致可靠性下降。
更多关于消息确认机制的说明,请参考Consumer Acknowledgements and Publisher Confirms。
生产者确认
生产者确认,即服务端在收到来自生产者的消息时进行确认。
以下示例演示在Java客户端配置生产者确认:
try { channel.confirmSelect() ; //将信道置为publisher confirm模式 //之后正常发送消息 channel.basicPublish("exchange", "routingKey" , null , "publisher confirm test" .getBytes()); if (!channel.waitForConfirms()) { System.out.println( "send message failed " ) ; // do something else.... } } catch (InterruptedException e) { e.printStackTrace() ; }
调用channel .waitForConfirms方法之后,会等待服务端确认,这是一种同步等待的方式,会对性能产生影响。如果生产者要满足at least once,就必须使用同步等待方式。
消费者确认
消费者确认是指服务端通过确认消息是否成功被消费者接收,来判断是否删除队列中的此消息。
消费者确认对数据可靠性十分重要,接收重要消息的消费应用程序在未处理完消息前不应确认消息,以便消费者有足够的时间处理消息,无需担心消息处理过程中由于消费者进程异常(如工作程序崩溃、重启等)导致消息丢失。
消费者确认在客户端上配置,通过配置basicConsume方法启用确认。在channel中启用消费者确认适用于大多数场景。
以下示例演示在Java客户端配置消费者确认(使用Channel#basicAck设置basic.ack为肯定):
// this example assumes an existing channel instance boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge a single delivery, the message will // be discarded channel.basicAck(deliveryTag, false); } });
未确认的消息缓存在内存中,如果未确认的消息过多,会导致内存使用率过高,此时可以在客户端配置预取值来限制消费者预取的消息数量,具体方法请参见配置RabbitMQ消息预取值。