文档首页 > > 最佳实践> 利用消息标签实现消息过滤

利用消息标签实现消息过滤

分享
更新时间: 2019/10/29 GMT+08:00

场景介绍

在实际场景中,一个队列存储的消息,可以包含不同实际用途。如果这些消息不加区分,消费者每次消费都会按顺序拉取消息,直到完成对所有消息的消费。

如果消费者只对某一类型的消息感兴趣,那么将所有消息都消费一遍势必影响消费者处理效率。

优化方案

DMS服务提供消息标签的能力,支持生产者为每条消息提供一个或多个标签(tag),消费者则根据标签(tag)的内容来过滤消息,确保每个消费者最终只会消费到它感兴趣的消息类型。

以金融交易场景为例,在一种交易中可能会产生多钟类型的消息,如股票(stock),基金(fund),贷款(loan)等。这些消息会通过交易(business)主题来传递给不同的处理系统,如股票系统,基金系统,贷款系统,实时分析系统等。然而基金系统只关心基金类型的消息,而实时分析系统可能需要获取到所有类型的消息。

在生产消息时,生产者对每条消息加上标签(tag),消费者在拉取消息时决定是否仅获取带有某标签(tag)的消息,从而提高消息消费效率。如图1所示:

图1 增加标签(tag)的消息消费示意图

DMS普通队列与FIFO队列支持消息标签(Tag)功能,Kafka队列不支持。

代码示例

以下仅贴出与消息标签相关代码,如需运行整个demo,请先下载完整的代码示例包,同时参考DMS开发指南进行部署和运行。

示例提供了基于Http Restful接口的代码,具体API接口可参考帮助中心分布式消息服务接口参考

消息标签设计示例代码

package com.cloud.dms;

import java.net.URL;
import java.util.Properties;
import com.cloud.dms.access.AccessServiceUtils;

public class DMSHttpClient
{
    private static String endpointUrl = "";

    private static String region = "";

    private static String serviceName = "dms";

    private static String aKey = "";

    private static String sKey = "";

    private static String projectId = "546e52331ea74cd49722fda4fb23bf55";

    private static String queueId = "39cd8dcb-b901-43b4-9ea1-48730e9adc58";

    private static String queueGroupId = "g-ae8ed05f-464c-452c-9e37-d3bdd081000d";

    /*
     * Read Configure File And Initialize Variables
     */
    static
    {
        URL configPath = ClassLoader.getSystemResource("dms-service-config.properties");
        Properties prop = AccessServiceUtils.getPropsFromFile(configPath.getFile());
        region = prop.getProperty(Constants.DMS_SERVICE_REGION);
        aKey = prop.getProperty(Constants.DMS_SERVICE_AK);
        sKey = prop.getProperty(Constants.DMS_SERVICE_SK);
        endpointUrl = prop.getProperty(Constants.DMS_SERVICE_ENDPOINT_URL);
        if (endpointUrl.endsWith("/"))
        {
            endpointUrl = endpointUrl + "v1.0/";
        }
        else
        {
            endpointUrl = endpointUrl + "/v1.0/";
        }
        projectId = prop.getProperty(Constants.DMS_SERVICE_PROJECT_ID);
    }
    
    public static void main(String[] args)
    {
        runAllApiMethods();
    }
    
    public static void runAllApiMethods()
    {
        MsgAttri msg = new MsgAttri();
        msg.setaKey(aKey);
        msg.setEndpointUrl(endpointUrl);
        msg.setProjectId(projectId);
        msg.setQueueId(queueId);
        msg.setsKey(sKey);
        msg.setRegion(region);
        msg.setServiceName(serviceName);
        msg.setMsgLimit("10");
        msg.setGroupId(queueGroupId);
        /**
         * 构造生产者和四种消费者,设置各自感兴趣的tag
         */
        MsgProducer msgProducer = new MsgProducer(msg);
        MsgConsumer stock = new MsgConsumer(msg, "stock");
        MsgConsumer fund = new MsgConsumer(msg, "fund");
        MsgConsumer loan = new MsgConsumer(msg, "loan");
        MsgConsumer all = new MsgConsumer(msg, null);
        /**
         * 创建线程,模拟生产和消费行为,设置线程的名字,便于区分
         */
        Thread producer = new Thread(msgProducer);
        Thread stockThread = new Thread(stock);
        Thread fundThread = new Thread(fund);
        Thread loanThread = new Thread(loan);
        Thread alls = new Thread(all);
        producer.setName("producer");
        stockThread.setName("stock");
        fundThread.setName("fund");
        loanThread.setName("loan");
        alls.setName("Analysis");
        /**
         * 启动线程
         */
        producer.start();
        stockThread.start();
        fundThread.start();
//        loanThread.start();
//        alls.start();
    }
}

生产消息示例

package com.cloud.dms;

import static com.cloud.dms.ApiUtils.constructTempMessages;
import static com.cloud.dms.ApiUtils.sendMessages;

import java.util.concurrent.TimeUnit;

public class MsgProducer implements Runnable{
    private MsgAttri msgAttri;

    public MsgProducer(MsgAttri msg) {
       this.msgAttri = msg;
    }
    
    public void run() {
       while (true)
       {
           /**
           * 模拟生产者,构造消息,JSON中包含stock fund loan三种消息
           **/
           String messages = constructTempMessages(null);
           sendMessages(messages, this.msgAttri);
           try
           {
              TimeUnit.SECONDS.sleep(1);
           }
           catch (InterruptedException e) {
           }
       }
    }
}

消费消息示例代码

 
package com.cloud.dms;

import static com.cloud.dms.ApiUtils.acknowledgeMessages;
import static com.cloud.dms.ApiUtils.consumeMessages;
import static com.cloud.dms.ApiUtils.parseHandlerIds;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MsgConsumer implements Runnable{
    
    private MsgAttri msgAttri;
    
    private String tag;

    /**
    *构造函数,获取到消费者感兴趣的tag
    **/
    public MsgConsumer(MsgAttri msg, String tag) {
       this.tag = tag;
       this.msgAttri = msg;
    }
    
    public void run() {
       while (true)
       {
           /**
           *消费消息,获取该tag下的消息
           **/
           ResponseMessage consumeMessagesResMsg = consumeMessages(msgAttri, tag);
           /**
           *解析消息
           **/
           if (consumeMessagesResMsg.getStatusCode() == 200)
           {
              List<String> msgStrings = ApiUtils.decodeMsg(consumeMessagesResMsg);
              /**
              *模拟处理消息,打印该tag下的消息
              **/
              for (String s : msgStrings)
              {
                  System.out.println("Thread--"+ Thread.currentThread().getName() + "--Message Body is: "+ s);
              }
              /**
              * 消费确认
              **/
              ArrayList<String> handlerIds = parseHandlerIds(consumeMessagesResMsg);
              if (handlerIds.size() > 0)
              {
                  acknowledgeMessages(handlerIds, msgAttri);
              }
           }
           else
           {
              System.out.println("Http Response Code is: "
                     + consumeMessagesResMsg.getStatusCode() + "\n Http Body is: "
                     + consumeMessagesResMsg.getBody());
              
           }
           try
           {
              TimeUnit.SECONDS.sleep(2);
           } 
           catch (InterruptedException e) 
           {
           }
       }
    }
}

示例运行结果如下

Loan线程由于指定了tag(loan),因此只能消费到loan标签的消息:

Fund和stock线程也同理只能消费到各自指定的消息:

而Analysi线程没有指定Tag,可以消费到Topic里的所有消息:

分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

跳转到云社区