利用消息标签实现消息过滤
场景介绍
在实际场景中,一个队列存储的消息,可以包含不同实际用途。如果这些消息不加区分,消费者每次消费都会按顺序拉取消息,直到完成对所有消息的消费。
如果消费者只对某一类型的消息感兴趣,那么将所有消息都消费一遍势必影响消费者处理效率。
优化方案
DMS服务提供消息标签的能力,支持生产者为每条消息提供一个或多个标签(tag),消费者则根据标签(tag)的内容来过滤消息,确保每个消费者最终只会消费到它感兴趣的消息类型。
以金融交易场景为例,在一种交易中可能会产生多钟类型的消息,如股票(stock),基金(fund),贷款(loan)等。这些消息会通过交易(business)主题来传递给不同的处理系统,如股票系统,基金系统,贷款系统,实时分析系统等。然而基金系统只关心基金类型的消息,而实时分析系统可能需要获取到所有类型的消息。
在生产消息时,生产者对每条消息加上标签(tag),消费者在拉取消息时决定是否仅获取带有某标签(tag)的消息,从而提高消息消费效率。如图1所示:
DMS普通队列与FIFO队列支持消息标签(Tag)功能,Kafka队列不支持。
代码示例
以下仅贴出与消息标签相关代码,如需运行整个demo,请先下载完整的代码示例包,同时参考DMS开发指南进行部署和运行。
示例提供了基于Http Restful接口的代码,具体API接口可参考帮助中心分布式消息服务接口参考。
消息标签设计示例代码
package com.cloud.dms;
import java.net.URL;
import java.util.Properties;
import com.cloud.dms.access.AccessServiceUtils;
public class DMSHttpClient
{
private static String endpointUrl = "";
private static String region = "";
private static String serviceName = "dms";
private static String aKey = "";
private static String sKey = "";
private static String projectId = "546e52331ea74cd49722fda4fb23bf55";
private static String queueId = "39cd8dcb-b901-43b4-9ea1-48730e9adc58";
private static String queueGroupId = "g-ae8ed05f-464c-452c-9e37-d3bdd081000d";
/*
* Read Configure File And Initialize Variables
*/
static
{
URL configPath = ClassLoader.getSystemResource("dms-service-config.properties");
Properties prop = AccessServiceUtils.getPropsFromFile(configPath.getFile());
region = prop.getProperty(Constants.DMS_SERVICE_REGION);
aKey = prop.getProperty(Constants.DMS_SERVICE_AK);
sKey = prop.getProperty(Constants.DMS_SERVICE_SK);
endpointUrl = prop.getProperty(Constants.DMS_SERVICE_ENDPOINT_URL);
if (endpointUrl.endsWith("/"))
{
endpointUrl = endpointUrl + "v1.0/";
}
else
{
endpointUrl = endpointUrl + "/v1.0/";
}
projectId = prop.getProperty(Constants.DMS_SERVICE_PROJECT_ID);
}
public static void main(String[] args)
{
runAllApiMethods();
}
public static void runAllApiMethods()
{
MsgAttri msg = new MsgAttri();
msg.setaKey(aKey);
msg.setEndpointUrl(endpointUrl);
msg.setProjectId(projectId);
msg.setQueueId(queueId);
msg.setsKey(sKey);
msg.setRegion(region);
msg.setServiceName(serviceName);
msg.setMsgLimit("10");
msg.setGroupId(queueGroupId);
/**
* 构造生产者和四种消费者,设置各自感兴趣的tag
*/
MsgProducer msgProducer = new MsgProducer(msg);
MsgConsumer stock = new MsgConsumer(msg, "stock");
MsgConsumer fund = new MsgConsumer(msg, "fund");
MsgConsumer loan = new MsgConsumer(msg, "loan");
MsgConsumer all = new MsgConsumer(msg, null);
/**
* 创建线程,模拟生产和消费行为,设置线程的名字,便于区分
*/
Thread producer = new Thread(msgProducer);
Thread stockThread = new Thread(stock);
Thread fundThread = new Thread(fund);
Thread loanThread = new Thread(loan);
Thread alls = new Thread(all);
producer.setName("producer");
stockThread.setName("stock");
fundThread.setName("fund");
loanThread.setName("loan");
alls.setName("Analysis");
/**
* 启动线程
*/
producer.start();
stockThread.start();
fundThread.start();
// loanThread.start();
// alls.start();
}
}
生产消息示例
package com.cloud.dms;
import static com.cloud.dms.ApiUtils.constructTempMessages;
import static com.cloud.dms.ApiUtils.sendMessages;
import java.util.concurrent.TimeUnit;
public class MsgProducer implements Runnable{
private MsgAttri msgAttri;
public MsgProducer(MsgAttri msg) {
this.msgAttri = msg;
}
public void run() {
while (true)
{
/**
* 模拟生产者,构造消息,JSON中包含stock, fund, loan三种消息
**/
String messages = constructTempMessages(null);
sendMessages(messages, this.msgAttri);
try
{
TimeUnit.SECONDS.sleep(1);
}
catch (InterruptedException e) {
}
}
}
}
消费消息示例代码
package com.cloud.dms;
import static com.cloud.dms.ApiUtils.acknowledgeMessages;
import static com.cloud.dms.ApiUtils.consumeMessages;
import static com.cloud.dms.ApiUtils.parseHandlerIds;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class MsgConsumer implements Runnable{
private MsgAttri msgAttri;
private String tag;
/**
*构造函数,获取到消费者感兴趣的tag
**/
public MsgConsumer(MsgAttri msg, String tag) {
this.tag = tag;
this.msgAttri = msg;
}
public void run() {
while (true)
{
/**
*消费消息,获取该tag下的消息
**/
ResponseMessage consumeMessagesResMsg = consumeMessages(msgAttri, tag);
/**
*解析消息
**/
if (consumeMessagesResMsg.getStatusCode() == 200)
{
List<String> msgStrings = ApiUtils.decodeMsg(consumeMessagesResMsg);
/**
*模拟处理消息,打印该tag下的消息
**/
for (String s : msgStrings)
{
System.out.println("Thread--"+ Thread.currentThread().getName() + "--Message Body is: "+ s);
}
/**
* 消费确认
**/
ArrayList<String> handlerIds = parseHandlerIds(consumeMessagesResMsg);
if (handlerIds.size() > 0)
{
acknowledgeMessages(handlerIds, msgAttri);
}
}
else
{
System.out.println("Http Response Code is: "
+ consumeMessagesResMsg.getStatusCode() + "\n Http Body is: "
+ consumeMessagesResMsg.getBody());
}
try
{
TimeUnit.SECONDS.sleep(2);
}
catch (InterruptedException e)
{
}
}
}
}
示例运行结果如下
Loan线程由于指定了tag(loan),因此只能消费到loan标签的消息:

Fund和stock线程也同理只能消费到各自指定的消息:

而Analysi线程没有指定Tag,可以消费到Topic里的所有消息:

