配置RabbitMQ延时消息
延时消息(Delayed Message)指的是发送后不会立即被消费者接收,而是延迟指定时间后才进入队列供消费者消费的消息。
约束与限制
RabbitMQ 3.x.x版本不支持延时交换机功能,仅AMQP-0-9-1版本支持。
配置延时消息
您可以通过以下任意一种方式实现延时消息。
- 设置延时交换机:发送至延时交换机的消息默认携带相同的延时时间。
- 设置消息延时时间:为每条消息设置不同大小的延时时间,可实现不同消息的差异化延时。
- 设置消息过期时间和死信队列:消息过期后会自动路由至死信队列。
发送至延时交换机的消息默认携带相同的延时时间。
以下示例演示在Java客户端配置延时交换机:
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("exchange", "x-delayed-message", true, false, args);
为每条消息设置不同大小的延时时间,可实现不同消息的差异化延时。
以下示例演示在Java客户端设置消息延时时间:
final HashMap<String, Object> headers = new HashMap<>(); headers.put("x-delay", 10000); final AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().headers(headers); // publish a batch of messages to the main exchange for (int i = 0; i < messageCount; i++) { channel.basicPublish(exchangeName, routingKey, properties.build(), ("x-delayed-message-" + i).getBytes()); }
消息过期后会自动路由至死信队列。
以下示例演示在Java客户端设置消息过期时间和死信队列:
channel.queueDeclare("dlq", false, false, false, null); channel.exchangeDeclare("dlx", BuiltinExchangeType.DIRECT); channel.queueBind("dlq", "dlx", "rt"); HashMap<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "dlx"); arguments.put("x-dead-letter-routing-key", "rt"); arguments.put("x-message-ttl", 5000); channel.exchangeDeclare("normal", BuiltinExchangeType.DIRECT); channel.queueDeclare("normal", true, false, false, arguments); channel.queueBind("normal", "normal", "key"); channel.basicPublish("normal", "key", null, "ttl".getBytes());