文档首页> 分布式消息服务RocketMQ版> 最佳实践> 通过消息幂等实现消息去重
更新时间:2023-10-25 GMT+08:00
分享

通过消息幂等实现消息去重

方案概述

在RocketMQ的业务处理过程中,如果消息重发了多次,消费者端对该重复消息消费多次与消费一次的结果是相同的,多次消费并没有对业务产生负面影响,那么这个消息处理过程是幂等的。消息幂等保证了无论消息被重复投递多少次,最终的处理结果都是一致的,避免了因消息重复而对业务产生影响。

消息重复场景

在实际应用中,导致消息重复的原因有网络闪断、客户端故障等,且可能发生在消息生产阶段,也可能发生在消息消费阶段。因此,可以将消息重复的场景分为以下两类:

  • 生产者发送消息时发生消息重复:

    生产者发送消息时,消息成功发送至服务端。如果此时发生网络闪断,导致生产者未收到服务端的响应,此时生产者会认为消息发送失败,因此尝试重新发送消息至服务端。当消息重新发送后,在服务端中就会存在两条内容相同的消息,最终消费者会消费到两条Message ID不同但内容相同的消息,因此造成了消息重复。

  • 消费者消费消息时发生消息重复:

    消费者消费消息时,服务端将消息投递至消费者并完成业务处理。如果此时发生网络闪断,导致消费者提交offset失败,服务端的offset未及时更新,此时服务端会认为消息投递失败。为了保证消息至少被消费一次,服务端会尝试投递之前已被处理过的消息,最终消费者就会收到与之前处理过的Message ID相同且内容相同的消息,因此造成了消息重复。

例如在支付场景下,客户在商家进行消费收到扣款信息,由于网络不稳定导致客户收到多次扣款请求,但实际上扣款业务只应进行一次,商家也只应产生一条订单流水。

实施步骤

从上面的消费重复场景可以看到,不同Message ID的消息可能有相同的消息内容,因此Message ID无法作为消息的唯一标识符。RocketMQ可以为消息设置Key,把业务的唯一标识作为消息的唯一标识,从而实现消息的幂等。为消息设置Key的示例代码如下:

Message message = new Message();
message.setKey("Order_id");    // 设置消息的Key,可以使用业务的唯一标识作为Key,例如订单号等。
SentResult sendResult = mqProducer.send(message);

生产者发送消息时,消息已经设置了唯一的Key,在消费者消费消息时,可以根据消息的Key进行幂等处理。消费者通过getKeys()能够读取到消息的唯一标识(如订单号等),业务逻辑围绕该唯一标识进行幂等处理即可。

分享:

    相关文档

    相关产品