分布式消息服务RabbitMQ版
分布式消息服务RabbitMQ版
- 最新动态
- 功能总览
- 服务公告
- 产品介绍
- 计费说明
- 快速入门
- 用户指南
- 最佳实践
- 开发指南
- API参考
- SDK参考
-
常见问题
-
实例问题
- RabbitMQ使用的版本是多少?
- RabbitMQ实例SSL连接的协议版本号是多少?
- 创建实例时为什么无法查看子网和安全组等信息?
- RabbitMQ集群实例如何均衡分发请求到每个虚拟机?
- RabbitMQ实例集群内部的队列是否有冗余备份?
- RabbitMQ实例是否支持持久化,如何定时备份数据?
- RabbitMQ实例开启SSL后,证书怎么获取?
- RabbitMQ实例的SSL开关是否支持修改?
- RabbitMQ实例是否支持扩容?
- RabbitMQ支持双向认证吗?
- RabbitMQ支持升级CPU和内存吗?
- 如何关闭RabbitMQ的WebUI?
- 实例是否支持修改可用区?
- 如何获取region id?
- 为什么不能选择2个可用区?
- 单机RabbitMQ实例如何变更为集群实例?
- RabbitMQ实例创建后,能修改VPC和子网吗?
- 连接问题
- 消息问题
- 监控告警问题
-
实例问题
- 视频帮助
- 文档下载
- 通用参考
本文导读
链接复制成功!
实现网络异常时RabbitMQ客户端自动恢复
方案概述
由于服务端重启、网络抖动等原因造成客户端网络连接断开时,将导致客户端无法正常生产和消费消息。
通过在客户端侧设置重连机制,使客户端在网络连接断开时自动恢复连接,降低网络故障对业务的影响。以下场景会触发网络自动恢复:
- 在连接的I/O循环中抛出未处理的异常
- 检测到Socket读取超时
- 检测到服务端心跳丢失
- 4.0.0及以上版本的Java客户端默认支持网络自动恢复,无需设置。
- 如果应用程序使用Connection.Close方法关闭连接,则不会启用或触发网络自动恢复。
网络异常时RabbitMQ客户端重试连接示例代码
客户端和服务端的初始连接失败,不会触发自动恢复,可在客户端编写对应的应用程序代码,通过重试连接来解决初始连接失败的问题。
以下示例演示了使用Java客户端通过重试连接解决初始连接失败的问题。
ConnectionFactory factory = new ConnectionFactory(); // 对于4.0.0版本之前的RabbitMQ Java客户端,开启自动恢复功能 factory.setAutomaticRecoveryEnabled(true); // 配置连接设置 try { Connection conn = factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
客户端使用建议
- 生产者端
网络异常可能导致消息路由失效,消息丢失。建议在生产者客户端中将Mandatory参数设置为“true”,以便在Exchange根据路由规则无法将消息路由到符合条件的Queue时,调用Basic.Return方法将消息返回给生产者。
以下示例演示在Java客户端开启Mandatory:
ConnectionFactory factory = new ConnectionFactory(); factory.setAutomaticRecoveryEnabled(true); Connection conn = factory.newConnection(); channel = connection.createChannel(); channel.confirmSelect(); channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> { // 处理返回值 }); // 第三个参数为Mandatory channel.basicPublish("testExchange", "key", true, null, "test".getBytes());
- 消费者端
网络异常可能导致消费者接收到重复的消息,建议在消费者客户端使用幂等。
以下示例演示在Java客户端设置幂等:
Set<Long> messageStore = new HashSet<Long>(); channel.basicConsume("queue", autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); if (messageStore.contains(deliveryTag)) { //幂等处理 channel.basicAck(deliveryTag, true); } else { try { // handle message logic // 处理消息 handleMessage(envelope); //正常情况ack消息 channel.basicAck(deliveryTag, true); messageStore.add(deliveryTag); } catch (Exception e) { channel.basicNack(deliveryTag, true, true); } } } });