通过Topic和Tag实现消息分类
方案概述
Topic是消息关联的基础逻辑单元,消息的生产与消费围绕着Topic进行。每个Topic包含若干条消息,每条消息只能属于一个Topic。
Tag是消息的标签,用于在同一Topic下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一Topic下设置不同的标签。标签能够有效保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同Topic的不同消费逻辑,实现更好的扩展性。
Topic是消息的一级分类,Tag是消息的二级分类,关系如下图。
应用场景
在实际业务中,通过合理使用Topic和Tag,可以使业务结构更清晰,提高效率。可以根据如下几方面判断Topic和Tag的具体使用场景:
- 根据消息类型判断:RocketMQ的消息可分为普通消息、顺序消息、定时/延迟消息、事务消息,不同类型的消息需要用不同的Topic来区分,无法通过Tag区分。
- 根据消息优先级判断:在业务中紧急程度高和紧急程度一般的消息用不同的Topic来区分,方便后续业务处理。
- 根据业务关联性判断:业务逻辑上不相关的消息用不同Topic来区分,业务逻辑上强关联的消息发送到同一Topic下,并用Tag进行子类型或流程先后关系的区分。
实施方法
以物流运输场景为例,普通货物订单消息和生鲜货物订单消息属于不同类型的消息,分别创建Topic_Common和Topic_Fresh。在不同消息类型中,以不同的Tag划分不同的订单目的省份。
- Topic:Topic_Common
- Tag = Province_A
- Tag = Province_B
- Topic: Topic_Fresh
- Tag = Province_A
- Tag = Province_B
以生产发往Province A的普通货物订单消息为例,代码示例如下:
Message msg = new Message("Topic_Common", "Province_A" /* Tag */, ("Order_id " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
以订阅发往Province A和Province B的生鲜货物订单消息为例,代码示例如下:
consumer.subscribe("Topic_Fresh", "Province_A || Province_B");
不同消费者消费不同Tag
在实际使用场景中,可能会遇到不同的消费者消费同一Topic的不同Tag消息。对于同一个Topic的不同Tag,如果RocketMQ消费者设置了相同的消费组,会导致消息消费混乱。
例如Topic A下有Tag A和Tag B,消费者A订阅了Tag A的消息,消费者B订阅了Tag B的消息。
如果消费者A和消费者B设置了相同的消费组,当生产者发送Tag A的消息时,Tag A的消息会均匀发送给消费者A和消费者B。由于消费者B没有订阅Tag A的消息,会把Tag A消息过滤掉,从而导致部分Tag A消息未被消费。
这种情况下,把消费者A和消费者B设置不同的消费组,即可解决消费混乱的问题。