配置RabbitMQ延时消息
延时消息(Delayed Message)指的是发送后不会立即被消费者接收,而是延迟指定时间后才进入队列供消费者消费的消息。
约束与限制
RabbitMQ 3.x.x版本不支持延时交换机功能,仅AMQP-0-9-1版本支持。
配置延时消息
您可以通过以下任意一种方式实现延时消息。
- 设置延时交换机:发送至延时交换机的消息默认携带相同的延时时间。
- 设置消息延时时间:为每条消息设置不同大小的延时时间,可实现不同消息的差异化延时。
- 设置消息过期时间和死信队列:消息过期后会自动路由至死信队列。
发送至延时交换机的消息默认携带相同的延时时间。
以下示例演示在Java客户端配置延时交换机:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("exchange", "x-delayed-message", true, false, args);
为每条消息设置不同大小的延时时间,可实现不同消息的差异化延时。
以下示例演示在Java客户端设置消息延时时间:
final HashMap<String, Object> headers = new HashMap<>();
headers.put("x-delay", 10000);
final AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().headers(headers);
// publish a batch of messages to the main exchange
for (int i = 0; i < messageCount; i++) {
channel.basicPublish(exchangeName, routingKey, properties.build(),
("x-delayed-message-" + i).getBytes());
}
消息过期后会自动路由至死信队列。
以下示例演示在Java客户端设置消息过期时间和死信队列:
channel.queueDeclare("dlq", false, false, false, null);
channel.exchangeDeclare("dlx", BuiltinExchangeType.DIRECT);
channel.queueBind("dlq", "dlx", "rt");
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "dlx");
arguments.put("x-dead-letter-routing-key", "rt");
arguments.put("x-message-ttl", 5000);
channel.exchangeDeclare("normal", BuiltinExchangeType.DIRECT);
channel.queueDeclare("normal", true, false, false, arguments);
channel.queueBind("normal", "normal", "key");
channel.basicPublish("normal", "key", null, "ttl".getBytes());