更新时间:2024-10-16 GMT+08:00
通过消息幂等实现消息去重
方案概述
在RabbitMQ的业务处理过程中,如果消息重发了多次,消费者端对该重复消息消费多次与消费一次的结果是相同的,多次消费并没有对业务产生负面影响,那么这个消息处理过程是幂等的。消息幂等保证了无论消息被重复投递多少次,最终的处理结果都是一致的,避免了因消息重复而对业务产生影响。
例如在支付场景下,用户购买商品后进行支付,由于网络不稳定导致用户收到多次扣款请求,导致重复扣款。但实际上扣款业务只应进行一次,商家也只应产生一条订单流水。这时候使用消息幂等就可以避免这个问题。
在实际应用中,导致消息重复的原因有网络闪断、客户端故障等,且可能发生在消息生产阶段,也可能发生在消息消费阶段。因此,可以将消息重复的场景分为以下两类:
实施方法
对于消息重复的场景,一般可以使用全局唯一ID来判断该消息是否已消费过。如果已经消费过,则直接返回处理结果,否则进行消息处理,并将全局ID记录下来。
- 生产者为每一条消息设置唯一的messageID,示例代码如下:
//持久化消息,并且生成随机的全局唯一messageID AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder(); builder.deliveryMode(2); builder.messageId(UUID.randomUUID().toString()); //自定义发送的消息 String message = "message content"; //生产消息,exchangeName和routingKey根据实际填写Queue所属的Exchange名称和Routing Key channel.basicPublish("exchangeName", "routingKey", false, builder.build(), message.getBytes(StandardCharsets.UTF_8)); String messageId = builder.build().getMessageId(); System.out.println("messageID: " + messageId); System.out.println("Send message success!"); //关闭信道 channel.close(); //关闭连接 connection.close();
- 消费者根据messageID对消息进行幂等处理,示例代码如下:
//创建一个以messageID为主键的数据库表,利用数据库主键去重的方式来处理RabbitMQ幂等。 //在消费者消费前先去数据库查询这条消息是否存在,如果存在表示消息已被消费,无需处理;如果不存在表示消息未被消费,执行消费操作 //queueName根据实际填写要消费的Queue名称 channel.basicConsume("queueName", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //获取messageID,并判断是否为空 String messageId = properties.getMessageId(); if (StringUtils.isBlank(messageId)){ logger.info("messageId is null"); return; } //查询数据库中是否存在主键为messageID的记录,如果存在,说明这条消息已经被消费,无需处理,否则消费消息,并且在消费完成后将消息记录入库 //数据库查询逻辑省略 //todo //如果数据库中没有messageID的记录,则执行消费,否则提示消息已消费 if (null == "{数据库查出来的结果记录}"){ //获取消息 String message = new String(body,StandardCharsets.UTF_8); //手动响应 channel.basicAck(envelope.getDeliveryTag(),false); logger.info("[x] received message: " + message + "," + "messageId:" + messageId); //存入数据库表中,标识该消息已消费 //数据库插入操作省略 //todo } else { //如果根据messageID查询到消息已消费,则不进行消费 logger.error("该消息已消费,无需重复消费"); } } });