Updated on 2024-11-11 GMT+08:00

Automatic Recovery of a RabbitMQ Client from Network Exceptions

Overview

A server restart or network jitter can interrupt the connection between a client and the server, which prevents the client from producing or consuming messages.

If the client has a connection retry mechanism, it can restore its network connection automatically. Automatic network recovery is triggered in the following scenarios:

  • An exception is thrown in a connection's I/O loop.
  • A socket read times out.
  • A server heartbeat is missed.

Note:

  • Java clients of version 4.0.0 or later enable automatic network recovery by default.
  • If an application closes a connection by calling the Connection.Close method, automatic network recovery is not triggered for that connection.

Sample Code for Connection Retry on a RabbitMQ Client During Network Exceptions

Automatic recovery is not triggered if the initial connection between the client and server fails. To handle this case, add retry logic for the initial connection to the client application code.

The following example shows how a Java client can handle an initial connection failure by retrying the connection.

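ConnectionFactory factory = new ConnectionFactory();
// Automatic recovery is enabled by default in RabbitMQ Java client 4.0.0 and later.
// For earlier versions, enable it explicitly.
factory.setAutomaticRecoveryEnabled(true);

// Configure connection settings (host, port, credentials, and so on) here.

// Automatic recovery does not cover the initial connection, so retry it manually.
Connection conn = null;
while (conn == null) {
  try {
    conn = factory.newConnection();
  } catch (java.net.ConnectException e) {
    // The server is unreachable. Wait 5 seconds, then try again.
    Thread.sleep(5000);
  }
}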
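
A fixed 5-second wait retries forever at the same pace. The following is a minimal sketch, not a definitive implementation, of the same retry idea with a bounded number of attempts and a capped exponential backoff. The class name `ConnectRetry`, its method names, and its parameters are hypothetical and are not part of the RabbitMQ client library. The sketch uses only the Java standard library and accepts any `Callable`, so it is independent of the client version.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: retries an action with capped exponential backoff. */
final class ConnectRetry {
    private ConnectRetry() {}

    /** Delay before the next try after the given failed attempt (1-based), capped at maxMs. */
    static long backoffMillis(int attempt, long baseMs, long maxMs) {
        long delay = baseMs;
        // Double the delay once per previous attempt until the cap is reached.
        for (int i = 1; i < attempt && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    /** Runs the action up to maxAttempts times, retrying on I/O errors and timeouts only. */
    static <T> T withBackoff(Callable<T> action, int maxAttempts, long baseMs, long maxMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (IOException | TimeoutException e) {
                // java.net.ConnectException is an IOException, so it is retried here.
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis(attempt, baseMs, maxMs));
                }
            }
        }
        // Every attempt failed: rethrow the most recent failure.
        throw last;
    }
}
```

Under these assumptions, the initial connection could be wrapped as `ConnectRetry.withBackoff(factory::newConnection, 5, 1000, 30000)`, which would wait 1, 2, 4, and 8 seconds between five attempts before giving up.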

Client Suggestions

  • Producer

    Set the Mandatory parameter to true on the producer client to avoid message losses caused by network exceptions. With this setting, a message that cannot be routed to a matching queue is returned to the producer through the Basic.Return method instead of being silently discarded.

    The following sample enables Mandatory on a Java client:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setAutomaticRecoveryEnabled(true);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // Enable publisher confirms on this channel.
    channel.confirmSelect();
    channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
        // Handle the returned (unroutable) message, for example by logging or republishing it.
    });
    // The third parameter is Mandatory.
    channel.basicPublish("testExchange", "key", true, null, "test".getBytes());

  • Consumer

    Make message processing idempotent on consumer clients so that messages redelivered after a network exception are not processed twice.

    The following sample implements idempotent processing on a Java client:

    // Use manual acknowledgements so that a message is acknowledged only after it is processed.
    boolean autoAck = false;
    Set<Long> messageStore = new HashSet<Long>();
    channel.basicConsume("queue", autoAck, "a-consumer-tag",
            new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)
                        throws IOException
                {
                    long deliveryTag = envelope.getDeliveryTag();
                    if (messageStore.contains(deliveryTag)) {
                        // Already processed: acknowledge without processing again.
                        channel.basicAck(deliveryTag, true);
                    } else {
                        try {
                            // Process the message.
                            handleMessage(envelope);
                            // Acknowledge the message after successful processing.
                            channel.basicAck(deliveryTag, true);
                            messageStore.add(deliveryTag);
                        } catch (Exception e) {
                            // Processing failed: reject and requeue the message.
                            channel.basicNack(deliveryTag, true, true);
                        }
                    }
                }
            });
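
One caveat when tracking processed messages on the consumer: delivery tags are scoped to a channel, and a redelivered message arrives with a new delivery tag, so a deduplication key usually has to come from the message itself. The following is a minimal sketch, not a definitive implementation, of a bounded in-memory store keyed on an application-level message ID. The class name `ProcessedMessageStore`, its methods, and the assumption that producers set a unique message ID on every message are hypothetical and are not part of the sample above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical bounded store of message IDs that have already been processed. */
final class ProcessedMessageStore {
    private final Map<String, Boolean> seen;

    ProcessedMessageStore(final int capacity) {
        // Insertion order is kept, so the oldest ID is evicted once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an ID is seen and false for a duplicate. */
    synchronized boolean markIfNew(String messageId) {
        return seen.putIfAbsent(messageId, Boolean.TRUE) == null;
    }

    /** Forgets an ID, for example when processing failed and the message is requeued. */
    synchronized void forget(String messageId) {
        seen.remove(messageId);
    }
}
```

In `handleDelivery`, a consumer could call `markIfNew(properties.getMessageId())` and process the message only when it returns true, assuming the producer sets the message ID (for example with `AMQP.BasicProperties.Builder.messageId`). An in-memory store like this one does not survive a consumer restart, so a shared or durable store would be needed where that matters.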