Improving RabbitMQ Performance

Updated on 2024-10-16 GMT+08:00

This topic introduces methods to achieve high RabbitMQ performance (considering throughput and reliability) by configuring the queue length, cluster load balancing, priority queues, and other parameters.

Using Short Queues

If a queue holds a large number of messages, memory comes under heavy pressure. To relieve the pressure, RabbitMQ pages messages out to disk. Paging is time-consuming, and restarting a cluster that holds a large number of messages is also slow because the index must be rebuilt on disk. If too many messages are paged out to disk, queues are blocked, which slows down queue processing and degrades the performance of RabbitMQ nodes.

To achieve high performance, keep queues as short as you can. Ideally, no messages accumulate in a queue.

For applications that frequently encounter message count surges or require high throughput, you are advised to limit the queue length. The queue length can be kept within the limit by discarding messages at the head of a queue.

The limit can be configured in a policy or a queue declaration argument.

  • Configuring a policy on the RabbitMQ management UI

  • Configuring a queue declaration argument
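    // Build the optional arguments that cap the queue length.
    HashMap<String, Object> map = new HashMap<>();
    // Set the maximum queue length to 10 messages.
    map.put("x-max-length", 10);
    // Set the overflow mode: reject new publishes so that the first 10 messages are retained.
    map.put("x-overflow", "reject-publish");
    // Create a non-durable, non-exclusive, non-auto-delete queue with these arguments.
    channel.queueDeclare(queueName, false, false, false, map);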

By default, when the queue length exceeds the limit, messages at the head of the queue (the oldest messages) are discarded or become dead letter messages. The queue can also be processed in other ways by specifying the overflow parameter:

  • If overflow is set to drop-head, the earliest messages at the head of the queue are discarded or made dead-letter, and the latest n messages are retained.
  • If overflow is set to reject-publish, the latest messages are discarded, and the earliest n messages are retained.
NOTE:
  • If both these methods are used to set the maximum queue length, the smaller limit is used.
  • Messages beyond the maximum queue length will be discarded.
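
The difference between the two overflow modes can be illustrated with a small in-memory model. This is a sketch only: BoundedQueueModel is a hypothetical class written for this illustration, it does not use the RabbitMQ client, and it ignores dead-lettering.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory model of a length-limited queue; not RabbitMQ client code. */
final class BoundedQueueModel {
    private final int maxLength;
    private final boolean rejectPublish; // true models reject-publish, false models drop-head
    private final Deque<Integer> messages = new ArrayDeque<>();

    BoundedQueueModel(int maxLength, boolean rejectPublish) {
        this.maxLength = maxLength;
        this.rejectPublish = rejectPublish;
    }

    /** Returns false only when the new message is refused. */
    boolean publish(int message) {
        if (messages.size() >= maxLength) {
            if (rejectPublish) {
                return false;      // reject-publish: the newest message is discarded
            }
            messages.pollFirst();  // drop-head: the oldest message is discarded
        }
        messages.addLast(message);
        return true;
    }

    /** Snapshot of the queue contents, oldest message first. */
    List<Integer> contents() {
        return new ArrayList<>(messages);
    }

    public static void main(String[] args) {
        BoundedQueueModel dropHead = new BoundedQueueModel(3, false);
        BoundedQueueModel reject = new BoundedQueueModel(3, true);
        for (int i = 1; i <= 5; i++) {
            dropHead.publish(i);
            reject.publish(i);
        }
        System.out.println("drop-head keeps      " + dropHead.contents()); // [3, 4, 5]
        System.out.println("reject-publish keeps " + reject.contents());   // [1, 2, 3]
    }
}
```

With a limit of 3 and five published messages, the drop-head model keeps the latest three and the reject-publish model keeps the earliest three, matching the two bullets above.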

Cluster Load Balancing

Queue performance depends on a single CPU core. When the message processing capability of a RabbitMQ node becomes a bottleneck, you can expand the cluster to improve throughput.

If multiple nodes are used, the cluster automatically distributes queues across the nodes. In addition to using a cluster, you can use the consistent hash exchange plug-in to optimize load balancing. This plug-in uses an exchange to balance messages between queues: it hashes each message's routing key and distributes the messages consistently and evenly across the queues bound to the exchange. When using this plug-in, ensure that consumers consume messages from all queues. The following are examples:

  • Route messages based on different routing keys.
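    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.concurrent.TimeoutException;

    public class ConsistentHashExchangeExample1 {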
      private static String CONSISTENT_HASH_EXCHANGE_TYPE = "x-consistent-hash";
    
      public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory cf = new ConnectionFactory();
        Connection conn = cf.newConnection();
        Channel ch = conn.createChannel();
    
        for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
          ch.queueDeclare(q, true, false, false, null);
          ch.queuePurge(q);
        }
    
        ch.exchangeDeclare("e1", CONSISTENT_HASH_EXCHANGE_TYPE, true, false, null);
    
        for (String q : Arrays.asList("q1", "q2")) {
          ch.queueBind(q, "e1", "1");
        }
    
        for (String q : Arrays.asList("q3", "q4")) {
          ch.queueBind(q, "e1", "2");
        }
    
        ch.confirmSelect();
    
        AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
        for (int i = 0; i < 100000; i++) {
          ch.basicPublish("e1", String.valueOf(i), bldr.build(), "".getBytes("UTF-8"));
        }
    
        ch.waitForConfirmsOrDie(10000);
    
        System.out.println("Done publishing!");
        System.out.println("Evaluating results...");
        // Wait for one stats emission interval (assumed to be the default of 5 seconds)
        // so that queue counters are up to date in the management UI.
        Thread.sleep(5000);
    
        System.out.println("Done.");
        conn.close();
      }
    }
  • Route messages based on headers.
    In this mode, the hash-header parameter must be specified for the exchange, and messages must contain headers. Otherwise, messages will be routed to the same queue.
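    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;

    public class ConsistentHashExchangeExample2 {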
      public static final String EXCHANGE = "e2";
      private static String EXCHANGE_TYPE = "x-consistent-hash";
    
      public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory cf = new ConnectionFactory();
        Connection conn = cf.newConnection();
        Channel ch = conn.createChannel();
    
        for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
          ch.queueDeclare(q, true, false, false, null);
          ch.queuePurge(q);
        }
    
        Map<String, Object> args = new HashMap<>();
        args.put("hash-header", "hash-on");
        ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);
    
        for (String q : Arrays.asList("q1", "q2")) {
          ch.queueBind(q, EXCHANGE, "1");
        }
    
        for (String q : Arrays.asList("q3", "q4")) {
          ch.queueBind(q, EXCHANGE, "2");
        }
    
        ch.confirmSelect();
    
    
        for (int i = 0; i < 100000; i++) {
          AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
          Map<String, Object> hdrs = new HashMap<>();
          hdrs.put("hash-on", String.valueOf(i));
          ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF-8"));
        }
    
        ch.waitForConfirmsOrDie(10000);
    
        System.out.println("Done publishing!");
        System.out.println("Evaluating results...");
        // Wait for one stats emission interval (assumed to be the default of 5 seconds)
        // so that queue counters are up to date in the management UI.
        Thread.sleep(5000);
    
        System.out.println("Done.");
        conn.close();
      }
    }
  • Route messages based on their properties, such as message_id, correlation_id, or timestamp.
    In this mode, the hash-property parameter is required to declare the exchange, and messages must contain the specified property. Otherwise, messages will be routed to the same queue.
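    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;

    public class ConsistentHashExchangeExample3 {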
      public static final String EXCHANGE = "e3";
      private static String EXCHANGE_TYPE = "x-consistent-hash";
    
      public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory cf = new ConnectionFactory();
        Connection conn = cf.newConnection();
        Channel ch = conn.createChannel();
    
        for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
          ch.queueDeclare(q, true, false, false, null);
          ch.queuePurge(q);
        }
    
        Map<String, Object> args = new HashMap<>();
        args.put("hash-property", "message_id");
        ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);
    
        for (String q : Arrays.asList("q1", "q2")) {
          ch.queueBind(q, EXCHANGE, "1");
        }
    
        for (String q : Arrays.asList("q3", "q4")) {
          ch.queueBind(q, EXCHANGE, "2");
        }
    
        ch.confirmSelect();
    
    
        for (int i = 0; i < 100000; i++) {
          AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
          ch.basicPublish(EXCHANGE, "", bldr.messageId(String.valueOf(i)).build(), "".getBytes("UTF-8"));
        }
    
        ch.waitForConfirmsOrDie(10000);
    
        System.out.println("Done publishing!");
        System.out.println("Evaluating results...");
        // wait for one stats emission interval so that queue counters
        // are up-to-date in the management UI
        Thread.sleep(5);
    
        System.out.println("Done.");
        conn.close();
      }
    }
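
To see why hashed routing is both consistent and weight-sensitive, the following self-contained sketch models the idea without a broker. It assumes that the binding keys "1" and "2" in the examples above act as weights (the number of hash buckets a queue owns). The class WeightedHashRoutingModel, the FNV-1a key hash, and the use of jump consistent hash are illustrative choices for this sketch, not a description of the plug-in's internals.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative model of weighted hash routing; not the plug-in's implementation. */
final class WeightedHashRoutingModel {
    // One entry per bucket; a queue bound with weight n owns n buckets.
    private final List<String> buckets = new ArrayList<>();

    void bind(String queue, int weight) {
        for (int i = 0; i < weight; i++) {
            buckets.add(queue);
        }
    }

    /** The same routing key always maps to the same queue. */
    String route(String routingKey) {
        return buckets.get(jumpHash(fnv1a64(routingKey), buckets.size()));
    }

    /** 64-bit FNV-1a hash, chosen here only for illustration. */
    static long fnv1a64(String s) {
        long h = 0xcbf29ce484222325L;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        return h;
    }

    /** Jump consistent hash: maps a 64-bit key to a bucket in [0, numBuckets). */
    static int jumpHash(long key, int numBuckets) {
        long b = -1;
        long j = 0;
        while (j < numBuckets) {
            b = j;
            key = key * 2862933555777941757L + 1;
            j = (long) ((b + 1) * ((double) (1L << 31) / (double) ((key >>> 33) + 1)));
        }
        return (int) b;
    }

    public static void main(String[] args) {
        WeightedHashRoutingModel model = new WeightedHashRoutingModel();
        model.bind("q1", 1);
        model.bind("q2", 1);
        model.bind("q3", 2);
        model.bind("q4", 2);

        // Count how many of 100000 routing keys land on each queue.
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < 100000; i++) {
            counts.merge(model.route(String.valueOf(i)), 1, Integer::sum);
        }
        System.out.println(counts);
    }
}
```

Under these assumptions, queues with weight 2 own twice as many buckets as queues with weight 1, so they should receive roughly twice as many messages. The exact counts depend on the hash function.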

Automatically Deleting Unused Queues

If a client connection fails, its queues may be left behind unused, and these residual queues affect instance performance. RabbitMQ provides the following methods to automatically delete a queue:

  • Set a TTL policy for the queue. For example, if TTL is set to 28 days, the queue will be deleted after staying idle for 28 days.
  • Use an auto-delete queue. When the last consumer exits or the channel or connection is closed (or when its TCP connection with the server is lost), the auto-delete queue is deleted.
  • Use an exclusive queue. This queue can be used only in the connection where it is created. When the connection is closed or disappears, the exclusive queue is deleted.

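To declare a queue that is both exclusive and auto-delete:

boolean durable = false;               // the queue does not need to survive a broker restart
boolean exclusive = true;              // usable only by the connection that declares it
boolean autoDelete = true;             // deleted when the last consumer exits
Map<String, Object> arguments = null;  // no optional queue arguments
channel.queueDeclare(QUEUENAME, durable, exclusive, autoDelete, arguments);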
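
Idle-queue expiry can also be requested per queue with the x-expires queue argument, which takes a value in milliseconds. The sketch below only builds the argument map. QueueExpiryArguments and idleExpiry are hypothetical names for this illustration, and the resulting map is assumed to be passed as the last parameter of channel.queueDeclare.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Builds queue arguments for idle-queue expiry; the helper name is illustrative. */
final class QueueExpiryArguments {
    /** Returns arguments asking the broker to delete the queue after it has been idle for the given number of days. */
    static Map<String, Object> idleExpiry(long days) {
        Map<String, Object> arguments = new HashMap<>();
        // x-expires is in milliseconds; 28 days does not fit in an int, so a long is used.
        arguments.put("x-expires", TimeUnit.DAYS.toMillis(days));
        return arguments;
    }

    public static void main(String[] args) {
        System.out.println(idleExpiry(28)); // {x-expires=2419200000}
    }
}
```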

Limiting the Number of Priority Queues

Each priority queue starts an Erlang process. If there are too many priority queues, performance will be affected. In most cases, you are advised to have no more than five priority queues.
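
A queue becomes a priority queue when it is declared with the x-max-priority argument. The sketch below assumes that the limit of five recommended above is applied to the number of priority levels per queue. That reading is an assumption, and PriorityQueueArguments is a hypothetical helper. The resulting map would be passed to channel.queueDeclare.

```java
import java.util.HashMap;
import java.util.Map;

/** Builds arguments for a priority queue while enforcing a small number of levels. */
final class PriorityQueueArguments {
    // Assumption: the recommended limit of five is read as priority levels per queue.
    static final int RECOMMENDED_MAX_LEVELS = 5;

    static Map<String, Object> withMaxPriority(int levels) {
        if (levels < 1 || levels > RECOMMENDED_MAX_LEVELS) {
            throw new IllegalArgumentException(
                "levels must be between 1 and " + RECOMMENDED_MAX_LEVELS + ", got " + levels);
        }
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-max-priority", levels);
        return arguments;
    }

    public static void main(String[] args) {
        System.out.println(withMaxPriority(5)); // {x-max-priority=5}
    }
}
```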

Connections and Channels

Each connection uses about 100 KB of memory (or more if TLS is used). Thousands of connections put a heavy load on RabbitMQ and, in extreme cases, cause out-of-memory errors. AMQP introduces the concept of channels, and each connection can carry multiple channels. Connections should be long-lived: the handshake for an AMQP connection is complex and requires at least seven TCP packets (or more if TLS is used). Channels are cheaper to open and close, but they should also be long-lived. For example, a producer thread should reuse the same channel instead of opening a new one for each message it publishes. The best practice is to reuse connections and multiplex a connection between threads by using channels.

The Spring AMQP thread pool is recommended. ConnectionFactory is defined by Spring AMQP and is responsible for creating connections.

Do Not Share Channels Between Threads

Most clients do not make channels thread-safe, so do not share channels between threads.
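
One way to combine channel reuse with the no-sharing rule is to give every thread its own long-lived channel. The sketch below shows the pattern with a generic holder and a stand-in string instead of a real channel. PerThreadResource is a hypothetical class. In real code, the factory would wrap connection.createChannel() on a shared connection and handle its checked IOException.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Lazily creates one resource per thread and reuses it on later calls. */
final class PerThreadResource<T> {
    private final ThreadLocal<T> local;

    PerThreadResource(Supplier<T> factory) {
        this.local = ThreadLocal.withInitial(factory);
    }

    /** Returns the calling thread's own instance, creating it on first use. */
    T get() {
        return local.get();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger opened = new AtomicInteger();
        // Stand-in for a channel factory; each call models opening one channel.
        PerThreadResource<String> channels =
            new PerThreadResource<>(() -> "channel-" + opened.incrementAndGet());

        // Each producer "publishes" three times but opens its channel only once.
        Runnable producer = () -> {
            for (int i = 0; i < 3; i++) {
                channels.get();
            }
        };
        Thread a = new Thread(producer);
        Thread b = new Thread(producer);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println("channels opened: " + opened.get()); // 2
    }
}
```

Two producer threads open two channels in total, no matter how many messages each publishes, and no channel is ever touched by more than one thread.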

Do Not Open and Close Connections or Channels Frequently

Frequently opening and closing connections or channels will lead to a large number of TCP packets being sent and received, resulting in higher latency.

Producers and Consumers Use Different Connections

Using separate connections for producers and consumers improves throughput. If a producer sends more messages than the server can process, RabbitMQ applies back pressure on the TCP connection. If messages are consumed on the same TCP connection, the server may not receive acknowledgments from the client, which degrades consumption performance. If consumption is too slow, the server becomes overloaded.

RabbitMQ Management Interface Performance Affected by Too Many Connections and Channels

RabbitMQ collects data of each connection and channel for analysis and display. If there are too many connections and channels, the performance of the RabbitMQ management interface will be affected.

Disabling Unused Plug-ins

Plug-ins may consume a large amount of CPU or memory. You are advised to disable unused plug-ins.
