更新时间:2024-10-16 GMT+08:00

提高RabbitMQ性能

本章节基于吞吐量和可靠性两个指标,指导您通过设置队列长度、集群负载均衡、优先队列数量等参数,实现RabbitMQ的高性能。

使用较小的队列长度

队列中存在大量消息时,会给内存使用带来沉重的负担。为了释放内存,RabbitMQ会将消息刷新到磁盘。刷盘需要重建索引,重启包含大量消息的集群,导致这个过程非常耗时。当刷盘的消息过多时,会阻塞队列处理消息,从而降低队列速度,影响RabbitMQ节点的性能。

要获得最佳性能,应尽可能缩短队列。建议始终保持队列消息堆积的数量在0左右

对于经常受到消息峰值影响的应用程序,和对吞吐量要求较高的应用程序,建议在队列上设置最大长度。这样可以通过丢弃队列头部的消息来保持队列长度,队列长度永远不会大于最大长度设置。

最大长度可以通过Policy设置,也可以通过在队列声明时使用对应参数设置。

  • 在RabbitMQ WebUI的Policy中设置。

  • 在队列声明时使用对应参数设置。
    //创建队列
    HashMap<String, Object> map = new HashMap<>();
    //设置队列最大长度
    map.put("x-max-length",10 );
    //设置队列溢出方式保留前10
    map.put("x-overflow","reject-publish" );
    channel.queueDeclare(queueName,false,false,false,map);

当队列长度超过设置的最大长度时,RabbitMQ的默认做法是将队列头部的信息(队列中最老的消息)丢弃或变成死信。可以通过设置不同的overflow值来改变这种方式,具体如下:

  • 如果overflow值设置为drop-head,表示从队列前面丢弃或dead-letter消息,保存后n条消息。
  • 如果overflow值设置为reject-publish,表示最近发布的消息将被丢弃,即保存前n条消息。
  • 如果同时使用以上两种方式设置队列的最大长度,两者中较小的值将被使用。
  • 超过队列最大长度的消息会被丢弃,请谨慎使用。

使用集群的负载均衡

队列的性能受单个CPU内核控制,当一个RabbitMQ节点处理消息的能力达到瓶颈时,可以通过集群进行扩展,从而达到提升吞吐量的目的。

使用多个节点,集群会自动将队列均衡的创建在各个节点上。除了使用集群模式,您还可以使用Consistent hash exchange插件优化负载均衡。该插件使用交换器来平衡队列之间的消息。根据消息的路由键,发送到交换器的消息一致且均匀地分布在多个队列中。该插件创建路由键的散列,并将消息传播到与该交换器具有绑定关系的队列中。使用此插件时,需要确保消费者从所有队列中消费。使用示例如下:

  • 使用不同的路由键来路由消息。
    public class ConsistentHashExchangeExample1 {
      private static String CONSISTENT_HASH_EXCHANGE_TYPE = "x-consistent-hash";
    
      public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory cf = new ConnectionFactory();
        Connection conn = cf.newConnection();
        Channel ch = conn.createChannel();
    
        for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
          ch.queueDeclare(q, true, false, false, null);
          ch.queuePurge(q);
        }
    
        ch.exchangeDeclare("e1", CONSISTENT_HASH_EXCHANGE_TYPE, true, false, null);
    
        for (String q : Arrays.asList("q1", "q2")) {
          ch.queueBind(q, "e1", "1");
        }
    
        for (String q : Arrays.asList("q3", "q4")) {
          ch.queueBind(q, "e1", "2");
        }
    
        ch.confirmSelect();
    
        AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
        for (int i = 0; i < 100000; i++) {
          ch.basicPublish("e1", String.valueOf(i), bldr.build(), "".getBytes("UTF-8"));
        }
    
        ch.waitForConfirmsOrDie(10000);
    
        System.out.println("Done publishing!");
        System.out.println("Evaluating results...");
        // wait for one stats emission interval so that queue counters
        // are up-to-date in the management UI
        Thread.sleep(5);
    
        System.out.println("Done.");
        conn.close();
      }
    }
  • 通过不同的header来路由消息。
    该方式需要为交换器提供“hash-header”参数设置,且消息必须带有header,否则会被路由到相同的队列。
    public class ConsistentHashExchangeExample2 {
      public static final String EXCHANGE = "e2";
      private static String EXCHANGE_TYPE = "x-consistent-hash";
    
      public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory cf = new ConnectionFactory();
        Connection conn = cf.newConnection();
        Channel ch = conn.createChannel();
    
        for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
          ch.queueDeclare(q, true, false, false, null);
          ch.queuePurge(q);
        }
    
        Map<String, Object> args = new HashMap<>();
        args.put("hash-header", "hash-on");
        ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);
    
        for (String q : Arrays.asList("q1", "q2")) {
          ch.queueBind(q, EXCHANGE, "1");
        }
    
        for (String q : Arrays.asList("q3", "q4")) {
          ch.queueBind(q, EXCHANGE, "2");
        }
    
        ch.confirmSelect();
    
    
        for (int i = 0; i < 100000; i++) {
          AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
          Map<String, Object> hdrs = new HashMap<>();
          hdrs.put("hash-on", String.valueOf(i));
          ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF-8"));
        }
    
        ch.waitForConfirmsOrDie(10000);
    
        System.out.println("Done publishing!");
        System.out.println("Evaluating results...");
        // wait for one stats emission interval so that queue counters
        // are up-to-date in the management UI
        Thread.sleep(5);
    
        System.out.println("Done.");
        conn.close();
      }
    }
  • 使用消息属性来路由消息,例如message_id、correlation_id或timestamp属性。
    该方式需要使用“hash-property”参数来声明交换器,且消息必须带有所选择的消息属性,否则会被路由到相同的队列。
    public class ConsistentHashExchangeExample3 {
      public static final String EXCHANGE = "e3";
      private static String EXCHANGE_TYPE = "x-consistent-hash";
    
      public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory cf = new ConnectionFactory();
        Connection conn = cf.newConnection();
        Channel ch = conn.createChannel();
    
        for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
          ch.queueDeclare(q, true, false, false, null);
          ch.queuePurge(q);
        }
    
        Map<String, Object> args = new HashMap<>();
        args.put("hash-property", "message_id");
        ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);
    
        for (String q : Arrays.asList("q1", "q2")) {
          ch.queueBind(q, EXCHANGE, "1");
        }
    
        for (String q : Arrays.asList("q3", "q4")) {
          ch.queueBind(q, EXCHANGE, "2");
        }
    
        ch.confirmSelect();
    
    
        for (int i = 0; i < 100000; i++) {
          AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
          ch.basicPublish(EXCHANGE, "", bldr.messageId(String.valueOf(i)).build(), "".getBytes("UTF-8"));
        }
    
        ch.waitForConfirmsOrDie(10000);
    
        System.out.println("Done publishing!");
        System.out.println("Evaluating results...");
        // wait for one stats emission interval so that queue counters
        // are up-to-date in the management UI
        Thread.sleep(5);
    
        System.out.println("Done.");
        conn.close();
      }
    }

自动删除不再使用的队列

客户端可能连接失败导致队列被残留,大量的残留队列会影响实例的性能。RabbitMQ提供三种自动删除队列的方法:

  • 在队列中设置TTL策略:例如TTL策略设置为28天,当持续28天队列未被使用时,此队列将被删除。
  • 使用auto-delete队列:当最后一个消费者退出或通道/连接关闭(或与服务器的TCP连接丢失)时,auto-delete队列会被删除。
  • 使用exclusive queue:只能在创建exclusive queue的连接中使用,当此连接关闭或消失时,exclusive queue会被删除。

设置auto-delete队列和exclusive queue的方法如下:

boolean exclusive = true;
boolean autoDelete = true;
channel.queueDeclare(QUEUENAME, durable, exclusive, autoDelete, arguments);

限制使用优先队列的数量

每个优先队列会启动一个Erlang进程,过多的优先队列会影响性能。在大多数情况下,建议使用不超过5个优先队列。

连接和通道

每个连接使用大约100 KB的内存(如果使用TLS会更多),成千上万的连接会导致RabbitMQ负载很高,极端情况下,会导致内存溢出。AMQP协议引入了通道的概念,一个连接中可以有多个通道。连接是长期存在的,AMQP连接的握手过程比较复杂,至少需要7个TCP数据包(如果使用TLS会更多)。相对连接来说,打开和关闭通道会更简单,但是建议通道也设置为长期存在的。例如,应该为每个生产者线程重用相同的通道,不要在每次生产时都打开通道。最佳实践是重用连接并将线程之间的连接与通道多路复用。

推荐使用Spring AMQP线程池:ConnectionFactory是Spring AMQP定义的连接工厂,负责创建连接。

不要在线程之间共享通道

大多数客户端并未实现通道的线程安全,所以不要在线程之间共享通道。

不要频繁打开和关闭连接或通道

频繁打开和关闭连接或通道会发送和接收大量的TCP包,从而导致更高的延迟,确保不要频繁打开和关闭连接或通道。

生产者和消费者使用不同的连接

生产者和消费者使用不同的连接以实现高吞吐量。当生产者发送太多消息给服务端处理时,RabbitMQ会将压力传递到TCP连接上。如果在同一个TCP连接上消费,服务端可能不会收到来自客户端的消息确认,从而影响消费性能。若消费速度过低,服务端将不堪重负。

大量的连接和通道可能会影响RabbitMQ管理接口的性能

RabbitMQ会收集每个连接和通道的数据进行分析和显示,大量连接和通道会影响RabbitMQ管理接口的性能。

禁用未使用的插件

插件可能会消耗大量CPU或占用大量内存,建议禁用未使用的插件。