文档首页/ 分布式消息服务RabbitMQ版/ 最佳实践/ 通过消息幂等实现消息去重
更新时间:2024-10-16 GMT+08:00

通过消息幂等实现消息去重

方案概述

在RabbitMQ的业务处理过程中,如果消息重发了多次,消费者端对该重复消息消费多次与消费一次的结果是相同的,多次消费并没有对业务产生负面影响,那么这个消息处理过程是幂等的。消息幂等保证了无论消息被重复投递多少次,最终的处理结果都是一致的,避免了因消息重复而对业务产生影响。

例如在支付场景下,用户购买商品后进行支付,由于网络不稳定导致用户收到多次扣款请求,导致重复扣款。但实际上扣款业务只应进行一次,商家也只应产生一条订单流水。这时候使用消息幂等就可以避免这个问题。

在实际应用中,导致消息重复的原因有网络闪断、客户端故障等,且可能发生在消息生产阶段,也可能发生在消息消费阶段。因此,可以将消息重复的场景分为以下两类:

  • 生产者发送消息时发生消息重复:

    生产者发送消息时,消息成功发送至服务端。如果此时发生网络闪断,导致生产者未收到服务端的响应,此时生产者会认为消息发送失败,因此尝试重新发送消息至服务端。当消息重新发送成功后,在服务端中就会存在两条内容相同的消息,最终消费者会消费到两条内容一样的重复消息。

  • 消费者消费消息时发生消息重复:

    消费者消费消息时,服务端将消息投递至消费者并完成业务处理。如果此时发生网络闪断,导致服务端未收到消费者的响应,此时服务端会认为消息投递失败。为了保证消息至少被消费一次,服务端会尝试投递之前已被处理过的消息,最终消费者会消费到两条内容一样的重复消息。

实施方法

对于消息重复的场景,一般可以使用全局唯一ID来判断该消息是否已消费过。如果已经消费过,则直接返回处理结果,否则进行消息处理,并将全局ID记录下来。

  • 生产者为每一条消息设置唯一的messageID,示例代码如下:
    //持久化消息,并且生成随机的全局唯一messageID
    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
    builder.deliveryMode(2);
    builder.messageId(UUID.randomUUID().toString());
    
    //自定义发送的消息
    String message = "message content";
    
    //生产消息,exchangeNameroutingKey根据实际填写Queue所属的Exchange名称和Routing Key
    channel.basicPublish("exchangeName", "routingKey", false, builder.build(), message.getBytes(StandardCharsets.UTF_8));
    String messageId = builder.build().getMessageId();
    System.out.println("messageID: " + messageId);
    System.out.println("Send message success!");
    //关闭信道
    channel.close();
    //关闭连接
    connection.close();
  • 消费者根据messageID对消息进行幂等处理,示例代码如下:
    //创建一个以messageID为主键的数据库表,利用数据库主键去重的方式来处理RabbitMQ幂等。
    //在消费者消费前先去数据库查询这条消息是否存在,如果存在表示消息已被消费,无需处理;如果不存在表示消息未被消费,执行消费操作
    //queueName根据实际填写要消费的Queue名称
    channel.basicConsume("queueName", false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //获取messageID,并判断是否为空
                String messageId = properties.getMessageId();
                if (StringUtils.isBlank(messageId)){
                    logger.info("messageId is null");
                    return;
                }
                //查询数据库中是否存在主键为messageID的记录,如果存在,说明这条消息已经被消费,无需处理,否则消费消息,并且在消费完成后将消息记录入库
                //数据库查询逻辑省略
                //todo
    
                //如果数据库中没有messageID的记录,则执行消费,否则提示消息已消费
                if (null == "{数据库查出来的结果记录}"){
                    //获取消息
                    String message = new String(body,StandardCharsets.UTF_8);
                    //手动响应
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    logger.info("[x] received message: "  + message + "," + "messageId:" + messageId);
    
                    //存入数据库表中,标识该消息已消费
                    //数据库插入操作省略
                    //todo
                } else {
                    //如果根据messageID查询到消息已消费,则不进行消费
                    logger.error("该消息已消费,无需重复消费");
                }
        }
    });