更新时间:2024-10-15 GMT+08:00

实现订阅关系一致

方案概述

订阅关系一致指的是同一个消费组下所有消费者所订阅的Topic、Tag必须完全一致。如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

消费原理

RocketMQ为每个Topic划分了消息队列(Queue),队列数越大消费的并发度越大。一个消费组表示一个消费者群组,在分布式应用场景下,同一个消费组中的多个消费者共同完成Topic所有Queue的消费。Queue的分配以消费组为单位,会均匀分配给消费组下的消费者,而不会在意该消费者是否订阅了当前Topic。一个消费者只会分配到Topic中的某几个Queue,而一个Queue只会分配给一个消费者。

正确的订阅关系

在分布式应用场景下,一个消费组中所有的消费者拥有一个相同的消费组ID,他们需订阅相同的Topic和Tag,保持订阅关系一致,才能保证消息消费逻辑正确,消息不丢失。

  • 同一个消费组的消费者必须订阅同一个Topic。例如,消费组Group1中有消费者A和消费者B,消费者A订阅了Topic A和Topic B,则消费者B也必须订阅Topic A和Topic B,不能只订阅Topic A或只订阅Topic B,或者额外订阅Topic C。
  • 同一个消费组的消费者订阅同一个Topic下的Tag必须一致,包括Tag的数量和顺序。例如,消费组Group2中有消费者A和消费者B,消费者A订阅Topic A且Tag为Tag A||Tag B,则消费者B订阅Topic A时Tag也必须为Tag A||Tag B,不能只订阅Tag A或只订阅Tag B或订阅Tag B||Tag A。
图1 正确的订阅关系一致设置

订阅关系一致性保证了同一个消费组中消费消息的正常运行,避免消息逻辑混乱和消息丢失。在实际使用中,生产者端要做好消息的分类,便于消费者可以使用Tag进行消息的准确订阅。而在消费者端,则要保证订阅关系的一致性。

错误的订阅关系

  • 同一消费组下的消费者订阅了不同的Topic
    例如,消费组Group1下有消费者A和消费者B,消费者A订阅了Topic A,消费者B订阅了Topic B。当生产者向Topic A发送消息时,消息会按Queue均匀发送给消费者A和消费者B。由于消费者B没有订阅Topic A的消息,会把Topic A消息过滤掉(即图2中Topic A的Queue2中的消息会被消费者B过滤),从而导致部分Topic A消息未被消费。
    图2 错误的Topic订阅
  • 同一消费组下的消费者订阅了相同Topic下不同的Tag
    例如,消费组Group1下有消费者A和消费者B,消费者A订阅了Topic A的Tag A,消费者B订阅了Topic A的Tag B。当生产者A向Topic A发送Tag A的消息时,Tag A消息会按Queue均匀发送给消费者A和消费者B。由于消费者B没有订阅Tag A的消息,会把Tag A消息过滤掉(即图3中Queue2中的Tag A消息会被消费者B过滤),从而导致部分Tag A消息未被消费。
    图3 错误的Tag订阅

实施方法

  • 订阅一个Topic且订阅一个Tag
    同一消费组Group1中的消费者Consumer1、Consumer2和Consumer3都订阅了Topic_A,且都订阅了Topic_A的Tag_A,符合订阅关系一致原则。Consumer1、Consumer2、Consumer3的订阅关系一致,即Consumer1、Consumer2、Consumer3订阅消息的代码必须完全一致,代码示例如下:
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group1");
    consumer.subscribe("Topic_A", "Tag_A");
  • 订阅一个Topic且订阅多个Tag

    同一消费组Group1中的消费者Consumer1、Consumer2和Consumer3都订阅了Topic_A,且都订阅了Topic_A的Tag_A和Tag_B(即订阅Topic_A中所有Tag为Tag_A或Tag_B的消息),顺序都是Tag_A||Tag_B,符合订阅关系一致性原则。Consumer1、Consumer2、Consumer3的订阅关系一致,即Consumer1、Consumer2、Consumer3订阅消息的代码必须完全一致,代码示例如下:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group1");
    consumer.subscribe("Topic_A", "Tag_A||Tag_B");
  • 订阅多个Topic且订阅多个Tag

    同一消费组Group1中的消费者Consumer1、Consumer2和Consumer3都订阅了Topic_A和Topic_B,且订阅Topic_A都未指定Tag(即订阅Topic_A中的所有消息),订阅Topic_B的Tag都是Tag_A和Tag_B(即订阅Topic_B中所有Tag为Tag_A或Tag_B的消息),顺序都是Tag_A||Tag_B,符合订阅关系一致性原则。Consumer1、Consumer2、Consumer3的订阅关系一致,即Consumer1、Consumer2、Consumer3订阅消息的代码必须完全一致,代码示例如下:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group1");
    consumer.subscribe("Topic_A", "*");
    consumer.subscribe("Topic_B", "Tag_A||Tag_B");