提高RabbitMQ性能
本章节基于吞吐量和可靠性两个指标,指导您通过设置队列长度、集群负载均衡、优先队列数量等参数,实现RabbitMQ的高性能。
使用较小的队列长度
队列中存在大量消息时,会给内存使用带来沉重的负担。为了释放内存,RabbitMQ会将消息刷新到磁盘。刷盘需要重建索引,重启包含大量消息的集群,导致这个过程非常耗时。当刷盘的消息过多时,会阻塞队列处理消息,从而降低队列速度,影响RabbitMQ节点的性能。
要获得最佳性能,应尽可能缩短队列。建议始终保持队列消息堆积的数量在0左右。
对于经常受到消息峰值影响的应用程序,和对吞吐量要求较高的应用程序,建议在队列上设置最大长度。这样可以通过丢弃队列头部的消息来保持队列长度,队列长度永远不会大于最大长度设置。
最大长度可以通过Policy设置,也可以通过在队列声明时使用对应参数设置。
- 在RabbitMQ WebUI的Policy中设置。
- 在队列声明时使用对应参数设置。
//创建队列 HashMap<String, Object> map = new HashMap<>(); //设置队列最大长度 map.put("x-max-length",10 ); //设置队列溢出方式保留前10 map.put("x-overflow","reject-publish" ); channel.queueDeclare(queueName,false,false,false,map);
当队列长度超过设置的最大长度时,RabbitMQ的默认做法是将队列头部的信息(队列中最老的消息)丢弃或变成死信。可以通过设置不同的overflow值来改变这种方式,具体如下:
- 如果overflow值设置为drop-head,表示从队列前面丢弃或dead-letter消息,保存后n条消息。
- 如果overflow值设置为reject-publish,表示最近发布的消息将被丢弃,即保存前n条消息。
- 如果同时使用以上两种方式设置队列的最大长度,两者中较小的值将被使用。
- 超过队列最大长度的消息会被丢弃,请谨慎使用。
使用集群的负载均衡
队列的性能受单个CPU内核控制,当一个RabbitMQ节点处理消息的能力达到瓶颈时,可以通过集群进行扩展,从而达到提升吞吐量的目的。
使用多个节点,集群会自动将队列均衡的创建在各个节点上。除了使用集群模式,您还可以使用Consistent hash exchange插件优化负载均衡。该插件使用交换器来平衡队列之间的消息。根据消息的路由键,发送到交换器的消息一致且均匀地分布在多个队列中。该插件创建路由键的散列,并将消息传播到与该交换器具有绑定关系的队列中。使用此插件时,需要确保消费者从所有队列中消费。使用示例如下:
- 使用不同的路由键来路由消息。
public class ConsistentHashExchangeExample1 { private static String CONSISTENT_HASH_EXCHANGE_TYPE = "x-consistent-hash"; public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException { ConnectionFactory cf = new ConnectionFactory(); Connection conn = cf.newConnection(); Channel ch = conn.createChannel(); for (String q : Arrays.asList("q1", "q2", "q3", "q4")) { ch.queueDeclare(q, true, false, false, null); ch.queuePurge(q); } ch.exchangeDeclare("e1", CONSISTENT_HASH_EXCHANGE_TYPE, true, false, null); for (String q : Arrays.asList("q1", "q2")) { ch.queueBind(q, "e1", "1"); } for (String q : Arrays.asList("q3", "q4")) { ch.queueBind(q, "e1", "2"); } ch.confirmSelect(); AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder(); for (int i = 0; i < 100000; i++) { ch.basicPublish("e1", String.valueOf(i), bldr.build(), "".getBytes("UTF-8")); } ch.waitForConfirmsOrDie(10000); System.out.println("Done publishing!"); System.out.println("Evaluating results..."); // wait for one stats emission interval so that queue counters // are up-to-date in the management UI Thread.sleep(5); System.out.println("Done."); conn.close(); } }
- 通过不同的header来路由消息。
该方式需要为交换器提供“hash-header”参数设置,且消息必须带有header,否则会被路由到相同的队列。
public class ConsistentHashExchangeExample2 { public static final String EXCHANGE = "e2"; private static String EXCHANGE_TYPE = "x-consistent-hash"; public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException { ConnectionFactory cf = new ConnectionFactory(); Connection conn = cf.newConnection(); Channel ch = conn.createChannel(); for (String q : Arrays.asList("q1", "q2", "q3", "q4")) { ch.queueDeclare(q, true, false, false, null); ch.queuePurge(q); } Map<String, Object> args = new HashMap<>(); args.put("hash-header", "hash-on"); ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args); for (String q : Arrays.asList("q1", "q2")) { ch.queueBind(q, EXCHANGE, "1"); } for (String q : Arrays.asList("q3", "q4")) { ch.queueBind(q, EXCHANGE, "2"); } ch.confirmSelect(); for (int i = 0; i < 100000; i++) { AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder(); Map<String, Object> hdrs = new HashMap<>(); hdrs.put("hash-on", String.valueOf(i)); ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF-8")); } ch.waitForConfirmsOrDie(10000); System.out.println("Done publishing!"); System.out.println("Evaluating results..."); // wait for one stats emission interval so that queue counters // are up-to-date in the management UI Thread.sleep(5); System.out.println("Done."); conn.close(); } }
- 使用消息属性来路由消息,例如message_id、correlation_id或timestamp属性。
该方式需要使用“hash-property”参数来声明交换器,且消息必须带有所选择的消息属性,否则会被路由到相同的队列。
public class ConsistentHashExchangeExample3 { public static final String EXCHANGE = "e3"; private static String EXCHANGE_TYPE = "x-consistent-hash"; public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException { ConnectionFactory cf = new ConnectionFactory(); Connection conn = cf.newConnection(); Channel ch = conn.createChannel(); for (String q : Arrays.asList("q1", "q2", "q3", "q4")) { ch.queueDeclare(q, true, false, false, null); ch.queuePurge(q); } Map<String, Object> args = new HashMap<>(); args.put("hash-property", "message_id"); ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args); for (String q : Arrays.asList("q1", "q2")) { ch.queueBind(q, EXCHANGE, "1"); } for (String q : Arrays.asList("q3", "q4")) { ch.queueBind(q, EXCHANGE, "2"); } ch.confirmSelect(); for (int i = 0; i < 100000; i++) { AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder(); ch.basicPublish(EXCHANGE, "", bldr.messageId(String.valueOf(i)).build(), "".getBytes("UTF-8")); } ch.waitForConfirmsOrDie(10000); System.out.println("Done publishing!"); System.out.println("Evaluating results..."); // wait for one stats emission interval so that queue counters // are up-to-date in the management UI Thread.sleep(5); System.out.println("Done."); conn.close(); } }
自动删除不再使用的队列
客户端可能连接失败导致队列被残留,大量的残留队列会影响实例的性能。RabbitMQ提供三种自动删除队列的方法:
- 在队列中设置TTL策略:例如TTL策略设置为28天,当持续28天队列未被使用时,此队列将被删除。
- 使用auto-delete队列:当最后一个消费者退出或通道/连接关闭(或与服务器的TCP连接丢失)时,auto-delete队列会被删除。
- 使用exclusive queue:只能在创建exclusive queue的连接中使用,当此连接关闭或消失时,exclusive queue会被删除。
设置auto-delete队列和exclusive queue的方法如下:
boolean exclusive = true; boolean autoDelete = true; channel.queueDeclare(QUEUENAME, durable, exclusive, autoDelete, arguments);
限制使用优先队列的数量
每个优先队列会启动一个Erlang进程,过多的优先队列会影响性能。在大多数情况下,建议使用不超过5个优先队列。
连接和通道
每个连接使用大约100 KB的内存(如果使用TLS会更多),成千上万的连接会导致RabbitMQ负载很高,极端情况下,会导致内存溢出。AMQP协议引入了通道的概念,一个连接中可以有多个通道。连接是长期存在的,AMQP连接的握手过程比较复杂,至少需要7个TCP数据包(如果使用TLS会更多)。相对连接来说,打开和关闭通道会更简单,但是建议通道也设置为长期存在的。例如,应该为每个生产者线程重用相同的通道,不要在每次生产时都打开通道。最佳实践是重用连接并将线程之间的连接与通道多路复用。
推荐使用Spring AMQP线程池:ConnectionFactory是Spring AMQP定义的连接工厂,负责创建连接。
不要在线程之间共享通道
大多数客户端并未实现通道的线程安全,所以不要在线程之间共享通道。
不要频繁打开和关闭连接或通道
频繁打开和关闭连接或通道会发送和接收大量的TCP包,从而导致更高的延迟,确保不要频繁打开和关闭连接或通道。
生产者和消费者使用不同的连接
生产者和消费者使用不同的连接以实现高吞吐量。当生产者发送太多消息给服务端处理时,RabbitMQ会将压力传递到TCP连接上。如果在同一个TCP连接上消费,服务端可能不会收到来自客户端的消息确认,从而影响消费性能。若消费速度过低,服务端将不堪重负。
大量的连接和通道可能会影响RabbitMQ管理接口的性能
RabbitMQ会收集每个连接和通道的数据进行分析和显示,大量连接和通道会影响RabbitMQ管理接口的性能。
禁用未使用的插件
插件可能会消耗大量CPU或占用大量内存,建议禁用未使用的插件。