Automatic Consumer Reconnection After a RabbitMQ Node Restart
Overview
amqp-client of RabbitMQ has a built-in reconnection mechanism with only one retry. If the reconnection fails, there will be no further retries which means that the connection is lost, and the consumer will no longer be able to consume messages, unless the consumer has an additional retry mechanism.
After amqp-client is disconnected from a node, different errors are generated depending on the node that the channel is connected to.
- If the channel is connected to the node where the queue is located, the consumer receives a shutdown signal. Then, the amqp-client reconnection mechanism takes effect and the consumer attempts to reconnect to the server. If the connection is successful, the channel continues to be connected for consumption. If the connection fails, the channel.close method is used to close the channel.
- If the channel is not connected to the node where the queue is located, consumer closure is not triggered. Instead, the server sends a cancel notification. This is not an exception for amqp-client, so no obvious error is reported in the log. However, the connection will be closed eventually.
When these two errors occur, amqp-client calls back the handleShutdownSignal and handleCancel methods. You can rewrite these methods to execute the rewritten reconnection logic during the callback. In this way, a new channel can be created for the consumer to continue consumption after a previous channel is closed.
Sample Code of Automatic Consumer Reconnection After a RabbitMQ Node Restart
The following is a Java code example which can solve the preceding two errors for continuous consumption.
package rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class RabbitConsumer { public static void main(String... args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // Configure the connection address and port of the instance. factory.setHost("192.168.0.2"); factory.setPort(5672); // Configure the username and password for instance connection. factory.setUsername("name"); factory.setPassword("password"); Connection connection = factory.newConnection(); createNewConnection(connection); } // Reconnection public static void createNewConnection(Connection connection) { try { Thread.sleep(1000); Channel channel = connection.createChannel(); channel.basicQos(64); channel.basicConsume("queue-01", false, new CustomConsumer(channel, connection)); } catch (Exception e) { // e.printStackTrace(); createNewConnection(connection); } } static class CustomConsumer implements Consumer { private final Channel _channel; private final Connection _connection; public CustomConsumer(Channel channel, Connection connection) { _channel = channel; _connection = connection; } @Override public void handleConsumeOk(String consumerTag) { } @Override public void handleCancelOk(String consumerTag) { } @Override public void handleCancel(String consumerTag) throws IOException { System.out.println("handleCancel"); System.out.println(consumerTag); createNewConnection(_connection); } @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { System.out.println("handleShutdownSignal"); System.out.println(consumerTag); System.out.println(sig.getReason()); createNewConnection(_connection); } @Override public void handleRecoverOk(String consumerTag) { } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); _channel.basicAck(envelope.getDeliveryTag(), false); } } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.