文档首页/ 分布式消息服务RabbitMQ版/ 最佳实践/ 实现网络异常时RabbitMQ客户端自动恢复
更新时间:2024-11-11 GMT+08:00

实现网络异常时RabbitMQ客户端自动恢复

方案概述

由于服务端重启、网络抖动等原因造成客户端网络连接断开时,将导致客户端无法正常生产和消费消息。

通过在客户端侧设置重连机制,使客户端在网络连接断开时自动恢复连接,降低网络故障对业务的影响。以下场景会触发网络自动恢复:

  • 在连接的I/O循环中抛出未处理的异常
  • 检测到Socket读取超时
  • 检测到服务端心跳丢失
  • 4.0.0及以上版本的Java客户端默认支持网络自动恢复,无需设置。
  • 如果应用程序使用Connection.Close方法关闭连接,则不会启用或触发网络自动恢复。

网络异常时RabbitMQ客户端重试连接示例代码

客户端和服务端的初始连接失败,不会触发自动恢复,可在客户端编写对应的应用程序代码,通过重试连接来解决初始连接失败的问题。

以下示例演示了使用Java客户端通过重试连接解决初始连接失败的问题。

ConnectionFactory factory = new ConnectionFactory();
// 对于4.0.0版本之前的RabbitMQ Java客户端,开启自动恢复功能
factory.setAutomaticRecoveryEnabled(true);

// 配置连接设置
try {
  Connection conn = factory.newConnection();
} catch (java.net.ConnectException e) {
  Thread.sleep(5000);
  // apply retry logic
}

客户端使用建议

  • 生产者端

    网络异常可能导致消息路由失效,消息丢失。建议在生产者客户端中将Mandatory参数设置为“true”,以便在Exchange根据路由规则无法将消息路由到符合条件的Queue时,调用Basic.Return方法将消息返回给生产者。

    以下示例演示在Java客户端开启Mandatory:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setAutomaticRecoveryEnabled(true);
    Connection conn = factory.newConnection();
    channel = connection.createChannel();
    channel.confirmSelect();
    channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
        // 处理返回值
    });
    // 第三个参数为Mandatory
    channel.basicPublish("testExchange", "key", true, null, "test".getBytes());
  • 消费者端

    网络异常可能导致消费者接收到重复的消息,建议在消费者客户端使用幂等。

    以下示例演示在Java客户端设置幂等:

    Set<Long> messageStore = new HashSet<Long>();
    channel.basicConsume("queue", autoAck, "a-consumer-tag",
            new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)
                        throws IOException
                {
                    long deliveryTag = envelope.getDeliveryTag();
                    if (messageStore.contains(deliveryTag)) {
    		//幂等处理
                        channel.basicAck(deliveryTag, true);
                    } else {
                        try {
                            // handle message logic
    	      // 处理消息
                            handleMessage(envelope);
    			//正常情况ack消息
                            channel.basicAck(deliveryTag, true);
                            messageStore.add(deliveryTag);
                        } catch (Exception e) {
                            channel.basicNack(deliveryTag, true, true);
                        }
                    }
                }
            });