更新时间:2024-10-16 GMT+08:00
实现RabbitMQ节点重启后消费者自动重连
方案概述
RabbitMQ的amqp-client虽然自带重连机制,但是自带的重连机制只会重试一次,重连失败后就不再执行。这时如果消费者没有做额外的重试机制,那么这个消费者就彻底断开与服务端的连接,无法消费消息。
amqp-client在节点断连后,根据与通道建立的节点不同,产生不同的错误。
- 如果通道连接的是队列所在的节点,消费者就会收到一个shutdown信号。这时amqp-client的重连机制就会生效,尝试重新连接服务端。如果连接成功,这个通道就会继续连接消费。如果连接失败,就会执行channel.close方法,关闭这个通道。
- 如果通道连接的不是队列所在的节点,消费者不会触发关闭动作,而是由服务端发送的一个取消动作。这个动作对amqp-client来说并不是异常行为,所以日志上不会有明显的报错,但是连接最终还是会关闭。
amqp-client出现上面两种错误时,会分别回调handleShutdownSignal以及handleCancel方法。您可以通过重写这两种方法,在回调时执行重写的重连逻辑,就能在通道关闭后重新为消费者创建新的通道继续消费。
RabbitMQ节点重启后消费者自动重连示例代码
以下提供一个简单的Java代码示例,此示例能够解决上面的两种错误,实现消费者的持续消费。
package rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class RabbitConsumer { public static void main(String... args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 配置实例的连接地址和端口 factory.setHost("192.168.0.2"); factory.setPort(5672); // 配置实例连接的用户名和密码 factory.setUsername("name"); factory.setPassword("password"); Connection connection = factory.newConnection(); createNewConnection(connection); } // 重连处理 public static void createNewConnection(Connection connection) { try { Thread.sleep(1000); Channel channel = connection.createChannel(); channel.basicQos(64); channel.basicConsume("queue-01", false, new CustomConsumer(channel, connection)); } catch (Exception e) { // e.printStackTrace(); createNewConnection(connection); } } static class CustomConsumer implements Consumer { private final Channel _channel; private final Connection _connection; public CustomConsumer(Channel channel, Connection connection) { _channel = channel; _connection = connection; } @Override public void handleConsumeOk(String consumerTag) { } @Override public void handleCancelOk(String consumerTag) { } @Override public void handleCancel(String consumerTag) throws IOException { System.out.println("handleCancel"); System.out.println(consumerTag); createNewConnection(_connection); } @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { System.out.println("handleShutdownSignal"); System.out.println(consumerTag); System.out.println(sig.getReason()); createNewConnection(_connection); } @Override public void handleRecoverOk(String consumerTag) { } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); _channel.basicAck(envelope.getDeliveryTag(), false); } } }