Updated on 2023-07-17 GMT+08:00

RabbitMQ High Performance

This topic introduces methods to achieve high RabbitMQ performance (considering throughput and reliability) by configuring the queue length, cluster load balancing, priority queues, and other parameters.

Using Short Queues

If a queue has a large number of messages, memory is under heavy pressure. To relieve pressure, RabbitMQ pages out messages to the disk. This process usually takes a long time because it involves recreating the index and restarting a cluster that contains a large number of messages. If there are too many messages paged out to the disk, queues will be blocked, which slows down queue processing and adversely affects the performance of RabbitMQ nodes.

To achieve high performance, shorten queues as much as you can. You are advised to keep no messages stacked in a queue.

For applications that frequently encounter message count surges or require high throughput, you are advised to limit the queue length. The queue length can be kept within the limit by discarding messages at the head of a queue.

The limit can be configured in a policy or a queue declaration argument.

  • Configuring a policy

  • Configuring a queue declaration argument
    // Create a queue.
    HashMap<String, Object> map = new HashMap<>();
    // Set the maximum queue length.
    map.put("x-max-length",10 );
    // Set the queue overflow mode, retaining the first 10 messages.
    map.put("x-overflow","reject-publish" );
    channel.queueDeclare(queueName,false,false,false,map);

By default, when the queue length exceeds the limit, messages at the head of the queue (the oldest messages) are discarded or become dead letter messages. You can change this mode by setting overflow to different values. If overflow is set to drop-head, the oldest messages at the head of the queue are discarded or made dead-letter, and the latest n messages are retained. If overflow is set to reject-publish, the latest messages are discarded, and the oldest n messages are retained.

  • If both these methods are used to set the maximum queue length, the smaller limit is used.
  • Messages beyond the maximum queue length will be discarded.

Cluster Load Balancing

Queue performance depends a single CPU core. When the message processing capability of a RabbitMQ node reaches the bottleneck, you can expand the cluster to improve the throughput.

If multiple nodes are used, the cluster automatically distributes queues across the nodes. In addition to using a cluster, you can use the following two plug-ins to optimize load balancing:

Consistent hash exchange

This plug-in uses an exchange to balance messages between queues. Messages sent to the exchange are consistently and evenly distributed across multiple queues based on the messages' routing keys. This plug-in creates a hash for the routing keys and distributes the messages to queues bound with the exchange. When using this plug-in, ensure that consumers consume messages from all queues.

The following is an example:

  • Route messages based on different routing keys.
    public class ConsistentHashExchangeExample1 {
      private static String CONSISTENT_HASH_EXCHANGE_TYPE = "x-consistent-hash";
    
      public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory cf = new ConnectionFactory();
        Connection conn = cf.newConnection();
        Channel ch = conn.createChannel();
    
        for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
          ch.queueDeclare(q, true, false, false, null);
          ch.queuePurge(q);
        }
    
        ch.exchangeDeclare("e1", CONSISTENT_HASH_EXCHANGE_TYPE, true, false, null);
    
        for (String q : Arrays.asList("q1", "q2")) {
          ch.queueBind(q, "e1", "1");
        }
    
        for (String q : Arrays.asList("q3", "q4")) {
          ch.queueBind(q, "e1", "2");
        }
    
        ch.confirmSelect();
    
        AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
        for (int i = 0; i < 100000; i++) {
          ch.basicPublish("e1", String.valueOf(i), bldr.build(), "".getBytes("UTF-8"));
        }
    
        ch.waitForConfirmsOrDie(10000);
    
        System.out.println("Done publishing!");
        System.out.println("Evaluating results...");
        // wait for one stats emission interval so that queue counters
        // are up-to-date in the management UI
        Thread.sleep(5);
    
        System.out.println("Done.");
        conn.close();
      }
    }
  • Route messages based on different headers. In this mode, the hash-header parameter must be specified for the exchange, and messages must contain headers. Otherwise, messages will be routed to the same queue.
    public class ConsistentHashExchangeExample2 {
      public static final String EXCHANGE = "e2";
      private static String EXCHANGE_TYPE = "x-consistent-hash";
    
      public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory cf = new ConnectionFactory();
        Connection conn = cf.newConnection();
        Channel ch = conn.createChannel();
    
        for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
          ch.queueDeclare(q, true, false, false, null);
          ch.queuePurge(q);
        }
    
        Map<String, Object> args = new HashMap<>();
        args.put("hash-header", "hash-on");
        ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);
    
        for (String q : Arrays.asList("q1", "q2")) {
          ch.queueBind(q, EXCHANGE, "1");
        }
    
        for (String q : Arrays.asList("q3", "q4")) {
          ch.queueBind(q, EXCHANGE, "2");
        }
    
        ch.confirmSelect();
    
    
        for (int i = 0; i < 100000; i++) {
          AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
          Map<String, Object> hdrs = new HashMap<>();
          hdrs.put("hash-on", String.valueOf(i));
          ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF-8"));
        }
    
        ch.waitForConfirmsOrDie(10000);
    
        System.out.println("Done publishing!");
        System.out.println("Evaluating results...");
        // wait for one stats emission interval so that queue counters
        // are up-to-date in the management UI
        Thread.sleep(5);
    
        System.out.println("Done.");
        conn.close();
      }
    }
  • Route messages based on their properties, such as message_id, correlation_id, or timestamp. In this mode, the hash-property parameter is required to declare the exchange, and messages must contain the specified property. Otherwise, messages will be routed to the same queue.
    public class ConsistentHashExchangeExample3 {
      public static final String EXCHANGE = "e3";
      private static String EXCHANGE_TYPE = "x-consistent-hash";
    
      public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory cf = new ConnectionFactory();
        Connection conn = cf.newConnection();
        Channel ch = conn.createChannel();
    
        for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
          ch.queueDeclare(q, true, false, false, null);
          ch.queuePurge(q);
        }
    
        Map<String, Object> args = new HashMap<>();
        args.put("hash-property", "message_id");
        ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);
    
        for (String q : Arrays.asList("q1", "q2")) {
          ch.queueBind(q, EXCHANGE, "1");
        }
    
        for (String q : Arrays.asList("q3", "q4")) {
          ch.queueBind(q, EXCHANGE, "2");
        }
    
        ch.confirmSelect();
    
    
        for (int i = 0; i < 100000; i++) {
          AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
          ch.basicPublish(EXCHANGE, "", bldr.messageId(String.valueOf(i)).build(), "".getBytes("UTF-8"));
        }
    
        ch.waitForConfirmsOrDie(10000);
    
        System.out.println("Done publishing!");
        System.out.println("Evaluating results...");
        // wait for one stats emission interval so that queue counters
        // are up-to-date in the management UI
        Thread.sleep(5);
    
        System.out.println("Done.");
        conn.close();
      }
    }

RabbitMQ sharding

This plug-in automatically partitions queues. Once you define an exchange as sharded, supporting queues are automatically created on each cluster node to share messages. This plug-in provides a centralized location for sending messages and implements load balancing by adding queues to other nodes in the cluster. When using this plug-in, ensure that consumers consume messages from all queues.

Do as follows to configure the RabbitMQ sharding plug-in:

  1. Create an x-modulus-hash exchange.

  2. Add a policy to the exchange.

  3. View the exchange details to check whether the configuration is successful.

Automatically Deleting Unused Queues

The client may fail to be connected, resulting in residual queues that affect instance performance. RabbitMQ provides the following methods to automatically delete a queue:

  • Set a TTL policy for the queue. For example, if TTL is set to 28 days, the queue will be deleted after staying idle for 28 days.
  • Use an auto-delete queue. When the last consumer exits or the channel or connection is closed (or when its TCP connection with the server is lost), the auto-delete queue is deleted.
  • Use an exclusive queue. This queue can be used only in the connection where it is created. When the connection is closed or disappears, the exclusive queue is deleted.

Configuration:

boolean exclusive = true;
boolean autoDelete = true;
channel.queueDeclare(QUEUENAME, durable, exclusive, autoDelete, arguments);

Limiting the Number of Priority Queues

Each priority queue starts an Erlang process. If there are too many priority queues, performance will be affected. In most cases, you are advised to have no more than five priority queues.

Connections and Channels

Each connection uses about 100 KB memory (or more if TLS is used). Thousands of connections cause high RabbitMQ load and even out-of-memory in extreme cases. The AMQP protocol introduces the concept of channels. Each connection can have multiple channels. Connections exist for a long time. The handshake process for an AMQP connection is complex and requires at least seven TCP data packets (or more if TLS is used). By contrast, it is easier to open and close a channel, and it is recommended that channels exist for a long time. For example, the same channel should be reused for a producer thread, and should not be opened for each production. The best practice is to reuse connections and multiplex a connection between threads with channels.

The Spring AMQP thread pool is recommended. ConnectionFactory is defined by Spring AMQP and is responsible for creating connections.

Do Not Share Channels Between Threads

Most clients do not implement thread safety security on channels, so do not share channels between threads.

Do Not Open and Close Connections or Channels Frequently

Frequently opening and closing connections or channels will lead to a large number of TCP packets being sent and received, resulting in higher latency.

Producers and Consumers Use Different Connections

This improves throughput. If a producer sends too many messages to the server for processing, RabbitMQ transfers the pressure to the TCP connection. If messages are consumed on the same TCP connection, the server may not receive acknowledgments from the client, affecting the consumption performance. If consumption is too slow, the server will be overloaded.

RabbitMQ Management Interface Performance Affected by Too Many Connections and Channels

RabbitMQ collects data of each connection and channel for analysis and display. If there are too many connections and channels, the performance of the RabbitMQ management interface will be affected.

Disabling Unused Plug-ins

Plug-ins may consume a large number of CPU or memory resources. You are advised to disable unused plug-ins.