更新时间:2024-11-11 GMT+08:00
实现网络异常时RabbitMQ客户端自动恢复
方案概述
由于服务端重启、网络抖动等原因造成客户端网络连接断开时,将导致客户端无法正常生产和消费消息。
通过在客户端侧设置重连机制,使客户端在网络连接断开时自动恢复连接,降低网络故障对业务的影响。以下场景会触发网络自动恢复:
- 在连接的I/O循环中抛出未处理的异常
- 检测到Socket读取超时
- 检测到服务端心跳丢失
- 4.0.0及以上版本的Java客户端默认支持网络自动恢复,无需设置。
- 如果应用程序使用Connection.Close方法关闭连接,则不会启用或触发网络自动恢复。
网络异常时RabbitMQ客户端重试连接示例代码
客户端和服务端的初始连接失败,不会触发自动恢复,可在客户端编写对应的应用程序代码,通过重试连接来解决初始连接失败的问题。
以下示例演示了使用Java客户端通过重试连接解决初始连接失败的问题。
ConnectionFactory factory = new ConnectionFactory(); // 对于4.0.0版本之前的RabbitMQ Java客户端,开启自动恢复功能 factory.setAutomaticRecoveryEnabled(true); // 配置连接设置 try { Connection conn = factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
客户端使用建议
- 生产者端
网络异常可能导致消息路由失效,消息丢失。建议在生产者客户端中将Mandatory参数设置为“true”,以便在Exchange根据路由规则无法将消息路由到符合条件的Queue时,调用Basic.Return方法将消息返回给生产者。
以下示例演示在Java客户端开启Mandatory:
ConnectionFactory factory = new ConnectionFactory(); factory.setAutomaticRecoveryEnabled(true); Connection conn = factory.newConnection(); channel = connection.createChannel(); channel.confirmSelect(); channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> { // 处理返回值 }); // 第三个参数为Mandatory channel.basicPublish("testExchange", "key", true, null, "test".getBytes());
- 消费者端
网络异常可能导致消费者接收到重复的消息,建议在消费者客户端使用幂等。
以下示例演示在Java客户端设置幂等:
Set<Long> messageStore = new HashSet<Long>(); channel.basicConsume("queue", autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); if (messageStore.contains(deliveryTag)) { //幂等处理 channel.basicAck(deliveryTag, true); } else { try { // handle message logic // 处理消息 handleMessage(envelope); //正常情况ack消息 channel.basicAck(deliveryTag, true); messageStore.add(deliveryTag); } catch (Exception e) { channel.basicNack(deliveryTag, true, true); } } } });