How Does Message Accumulation Affect Services? What Can I Do?
How Does Message Accumulation Affect Services?
Excessive message accumulation in a queue may cause memory or disk alarms. As a result, all connections will be blocked, other queues cannot be used, and the overall service quality deteriorates.
Causes of Message Accumulation
- Messages are published much faster than they are retrieved. For example, consumers process messages slowly in a certain period. It may take only 3 seconds to send a message, but 1 minute to retrieve the message. If 20 messages are sent per minute, and only one message is processed by consumers, a large number of messages will be stacked in the queue.
- Consumers are abnormal and cannot retrieve messages, while publishers keep sending messages.
- Consumers are normal, but their subscriptions to queues are abnormal.
- Consumers and their subscriptions to queues are normal, but the code logic of consumers is time-consuming, which reduces the consumption capability and results in a situation similar to 1.
Solutions to Message Accumulation
- If messages are published much faster than they are retrieved, solve the problem in the following ways:
- Add consumers to accelerate message retrieval.
- Use publisher confirms and monitor the publishing rate and duration on the publishing end. When the duration increases significantly, apply flow control.
- If consumers are abnormal, check whether the consumer logic is correct and optimize the program.
- Check whether consumers' subscriptions to queues are normal.
- If the code logic of consumers is time-consuming, set expiration time on messages using either of the following methods:
- When creating messages, set the message expiration time by using the expiration parameter.
- Set the value of expiration in properties. The unit is ms.
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") .build(); String message = "hello rabbitmq"; channel.basicPublish(exchange, routingKey, properties, message.getBytes(StandardCharsets.UTF_8));
- Set the value of expiration on the management UI. The unit is ms.
Log in to the management UI. On the Exchanges tab page, click an exchange name to view its details. In the Publish message area, set expiration, as shown in the following figure.
- Set the value of expiration in properties. The unit is ms.
- Set the queue expiration time by using the x-message-ttl parameter. The expiration time starts when messages enter the queue. When the expiration time elapses, the messages are automatically deleted.
- Set the value of x-message-ttl in the client code. The unit is ms.
Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-message-ttl", 10000); channel.queueDeclare(queueName, true, false, false, arguments);
- Set the value of x-message-ttl when creating a queue on the management UI. The unit is ms.
Log in to the management UI. On the Exchanges tab page, create a queue and set the value of x-message-ttl, as shown in the following figure.
- Set the value of x-message-ttl in the client code. The unit is ms.
- When creating messages, set the message expiration time by using the expiration parameter.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.