分布式消息服务 DMS分布式消息服务 DMS

计算
弹性云服务器 ECS
云耀云服务器 HECS
裸金属服务器 BMS
弹性伸缩 AS
镜像服务 IMS
专属主机 DeH
函数工作流 FunctionGraph
云手机 CPH
VR云渲游平台 CVR
特惠算力专区
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
内容分发网络 CDN
存储容灾服务 SDRS
弹性文件服务 SFS
云服务器备份 CSBS
云硬盘备份 VBS
数据快递服务 DES
专属企业存储服务
智能边缘
智能边缘云 IEC
EI 企业智能
EI安视服务
AI开发平台ModelArts
数据湖治理中心 DGC
数据仓库服务 GaussDB(DWS)
企业级AI应用开发专业套件 ModelArts Pro
数据湖探索 DLI
华为HiLens
云搜索服务 CSS
数据接入服务 DIS
表格存储服务 CloudTable
数据湖工厂 DLF
图引擎服务 GES
推荐系统 RES
文字识别 OCR
内容审核 Moderation
图像识别 Image
图像搜索 ImageSearch
人脸识别服务 FRS
对话机器人服务 CBS
视频分析服务 VAS
数据可视化 DLV
视频接入服务 VIS
自然语言处理 NLP
语音交互服务 SIS
知识图谱 KG
医疗智能体 EIHealth
可信智能计算服务 TICS
园区智能体 CampusGo
实时流计算服务 CS
人证核身服务 IVS
IoT物联网
设备接入 IoTDA
IoT物联网
全球SIM联接 GSL
设备发放 IoTDP
IoT开发者服务
IoT边缘 IoTEdge
IoT数据分析
路网数字化服务 DRIS
开发与运维
项目管理 ProjectMan
代码托管 CodeHub
流水线 CloudPipeline
代码检查 CodeCheck
编译构建 CloudBuild
部署 CloudDeploy
云测 CloudTest
发布 CloudRelease
移动应用测试 MobileAPPTest
CloudIDE
Classroom
软件开发平台 DevCloud
开源镜像站 Mirrors
视频
媒体处理 MPC
视频点播 VOD
视频直播 Live
实时音视频 SparkRTC
管理与部署
统一身份认证服务 IAM
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云审计服务 CTS
云日志服务 LTS
标签管理服务 TMS
资源管理服务 RMS
应用身份管理服务 OneAccess
专属云
专属计算集群 DCC
专属分布式存储服务 DSS
域名与网站
域名注册服务 Domains
云速建站 CloudSite
企业协同
华为云WeLink
会议
ISDP
解决方案
全栈专属服务
高性能计算 HPC
SAP
游戏云
混合云灾备
快视频
华为工业云平台 IMC
价格
价格原则
成本优化最佳实践
昇腾
昇腾MindX SDK (20.3)
其他
管理控制台
消息中心
产品价格详情
系统权限
我的凭证
客户关联华为云合作伙伴须知
公共问题
宽限期保留期
奖励推广计划
活动
容器
云容器引擎 CCE
云容器实例 CCI
容器镜像服务 SWR
应用编排服务 AOS
容器交付流水线 ContainerOps
应用服务网格 ASM
多云容器平台 MCP
基因容器 GCS
容器洞察引擎 CIE
容器批量计算 BCE
云原生服务中心 OSC
网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
数据库
云数据库 RDS
文档数据库服务 DDS
分布式数据库中间件 DDM
数据复制服务 DRS
数据管理服务 DAS
云数据库 GaussDB(for MySQL)
云数据库 GaussDB NoSQL
云数据库 GaussDB (for openGauss)
数据库和应用迁移 UGO
大数据
MapReduce服务 MRS
应用中间件
应用管理与运维平台 ServiceStage
分布式缓存服务 DCS
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
消息通知服务 SMN
微服务引擎 CSE
云性能测试服务 CPTS
区块链服务 BCS
API网关 APIG
应用魔方 AppCube
分布式消息服务RocketMQ版
多云高可用服务 MAS
可信跨链数据链接服务 TCDAS
企业应用
云桌面 Workspace
云解析服务 DNS
应用与数据集成平台 ROMA Connect
ROMA资产中心 ROMAExchange
ROMA API
鸿源云道
华为乾坤云服务
安全与合规
Web应用防火墙 WAF
漏洞扫描服务 VSS
企业主机安全 HSS
容器安全服务 CGS
数据加密服务 DEW
数据库安全服务 DBSS
态势感知 SA
云堡垒机 CBH
SSL证书管理 SCM
云证书管理服务 CCM
管理检测与响应 MDR
数据安全中心 DSC
威胁检测服务 MTD
DDoS防护 ADS
云防火墙 CFW
应用信任中心 ATC
安全技术与应用
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
智能协作
IdeaHub
企业网络
云管理网络
SD-WAN 云服务
边缘数据中心管理 EDCM
废弃-华为乾坤安全云服务
云通信
语音通话 VoiceCall
消息&短信 MSGSMS
隐私保护通话 PrivateNumber
开发者工具
SDK开发指南
API签名指南
DevStar
HCloud CLI
Terraform
Ansible
云生态
云市场
鲲鹏
昇腾
合作伙伴中心
华为云学院
用户服务
帐号中心
费用中心
成本中心
资源中心
企业管理
工单管理
客户运营能力
国际站常见问题
网站备案
支持计划
专业服务
合作伙伴支持计划
更新时间:2021/06/16 GMT+08:00
分享

利用消息标签实现消息过滤

场景介绍

在实际场景中,一个队列存储的消息,可以包含不同实际用途。如果这些消息不加区分,消费者每次消费都会按顺序拉取消息,直到完成对所有消息的消费。

如果消费者只对某一类型的消息感兴趣,那么将所有消息都消费一遍势必影响消费者处理效率。

优化方案

DMS服务提供消息标签的能力,支持生产者为每条消息提供一个或多个标签(tag),消费者则根据标签(tag)的内容来过滤消息,确保每个消费者最终只会消费到它感兴趣的消息类型。

以金融交易场景为例,在一种交易中可能会产生多钟类型的消息,如股票(stock),基金(fund),贷款(loan)等。这些消息会通过交易(business)主题来传递给不同的处理系统,如股票系统,基金系统,贷款系统,实时分析系统等。然而基金系统只关心基金类型的消息,而实时分析系统可能需要获取到所有类型的消息。

在生产消息时,生产者对每条消息加上标签(tag),消费者在拉取消息时决定是否仅获取带有某标签(tag)的消息,从而提高消息消费效率。如图1所示:

图1 增加标签(tag)的消息消费示意图

DMS普通队列与FIFO队列支持消息标签(Tag)功能,Kafka队列不支持。

代码示例

以下仅贴出与消息标签相关代码,如需运行整个demo,请先下载完整的代码示例包,同时参考DMS开发指南进行部署和运行。

示例提供了基于Http Restful接口的代码,具体API接口可参考帮助中心分布式消息服务接口参考

消息标签设计示例代码

package com.cloud.dms;

import java.net.URL;
import java.util.Properties;
import com.cloud.dms.access.AccessServiceUtils;

public class DMSHttpClient
{
    private static String endpointUrl = "";

    private static String region = "";

    private static String serviceName = "dms";

    private static String aKey = "";

    private static String sKey = "";

    private static String projectId = "546e52331ea74cd49722fda4fb23bf55";

    private static String queueId = "39cd8dcb-b901-43b4-9ea1-48730e9adc58";

    private static String queueGroupId = "g-ae8ed05f-464c-452c-9e37-d3bdd081000d";

    /*
     * Read Configure File And Initialize Variables
     */
    static
    {
        URL configPath = ClassLoader.getSystemResource("dms-service-config.properties");
        Properties prop = AccessServiceUtils.getPropsFromFile(configPath.getFile());
        region = prop.getProperty(Constants.DMS_SERVICE_REGION);
        aKey = prop.getProperty(Constants.DMS_SERVICE_AK);
        sKey = prop.getProperty(Constants.DMS_SERVICE_SK);
        endpointUrl = prop.getProperty(Constants.DMS_SERVICE_ENDPOINT_URL);
        if (endpointUrl.endsWith("/"))
        {
            endpointUrl = endpointUrl + "v1.0/";
        }
        else
        {
            endpointUrl = endpointUrl + "/v1.0/";
        }
        projectId = prop.getProperty(Constants.DMS_SERVICE_PROJECT_ID);
    }
    
    public static void main(String[] args)
    {
        runAllApiMethods();
    }
    
    public static void runAllApiMethods()
    {
        MsgAttri msg = new MsgAttri();
        msg.setaKey(aKey);
        msg.setEndpointUrl(endpointUrl);
        msg.setProjectId(projectId);
        msg.setQueueId(queueId);
        msg.setsKey(sKey);
        msg.setRegion(region);
        msg.setServiceName(serviceName);
        msg.setMsgLimit("10");
        msg.setGroupId(queueGroupId);
        /**
         * 构造生产者和四种消费者,设置各自感兴趣的tag
         */
        MsgProducer msgProducer = new MsgProducer(msg);
        MsgConsumer stock = new MsgConsumer(msg, "stock");
        MsgConsumer fund = new MsgConsumer(msg, "fund");
        MsgConsumer loan = new MsgConsumer(msg, "loan");
        MsgConsumer all = new MsgConsumer(msg, null);
        /**
         * 创建线程,模拟生产和消费行为,设置线程的名字,便于区分
         */
        Thread producer = new Thread(msgProducer);
        Thread stockThread = new Thread(stock);
        Thread fundThread = new Thread(fund);
        Thread loanThread = new Thread(loan);
        Thread alls = new Thread(all);
        producer.setName("producer");
        stockThread.setName("stock");
        fundThread.setName("fund");
        loanThread.setName("loan");
        alls.setName("Analysis");
        /**
         * 启动线程
         */
        producer.start();
        stockThread.start();
        fundThread.start();
//        loanThread.start();
//        alls.start();
    }
}

生产消息示例

package com.cloud.dms;

import static com.cloud.dms.ApiUtils.constructTempMessages;
import static com.cloud.dms.ApiUtils.sendMessages;

import java.util.concurrent.TimeUnit;

public class MsgProducer implements Runnable{
    private MsgAttri msgAttri;

    public MsgProducer(MsgAttri msg) {
       this.msgAttri = msg;
    }
    
    public void run() {
       while (true)
       {
           /**
           * 模拟生产者,构造消息,JSON中包含stock, fund, loan三种消息
           **/
           String messages = constructTempMessages(null);
           sendMessages(messages, this.msgAttri);
           try
           {
              TimeUnit.SECONDS.sleep(1);
           }
           catch (InterruptedException e) {
           }
       }
    }
}

消费消息示例代码

 
package com.cloud.dms;

import static com.cloud.dms.ApiUtils.acknowledgeMessages;
import static com.cloud.dms.ApiUtils.consumeMessages;
import static com.cloud.dms.ApiUtils.parseHandlerIds;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MsgConsumer implements Runnable{
    
    private MsgAttri msgAttri;
    
    private String tag;

    /**
    *构造函数,获取到消费者感兴趣的tag
    **/
    public MsgConsumer(MsgAttri msg, String tag) {
       this.tag = tag;
       this.msgAttri = msg;
    }
    
    public void run() {
       while (true)
       {
           /**
           *消费消息,获取该tag下的消息
           **/
           ResponseMessage consumeMessagesResMsg = consumeMessages(msgAttri, tag);
           /**
           *解析消息
           **/
           if (consumeMessagesResMsg.getStatusCode() == 200)
           {
              List<String> msgStrings = ApiUtils.decodeMsg(consumeMessagesResMsg);
              /**
              *模拟处理消息,打印该tag下的消息
              **/
              for (String s : msgStrings)
              {
                  System.out.println("Thread--"+ Thread.currentThread().getName() + "--Message Body is: "+ s);
              }
              /**
              * 消费确认
              **/
              ArrayList<String> handlerIds = parseHandlerIds(consumeMessagesResMsg);
              if (handlerIds.size() > 0)
              {
                  acknowledgeMessages(handlerIds, msgAttri);
              }
           }
           else
           {
              System.out.println("Http Response Code is: "
                     + consumeMessagesResMsg.getStatusCode() + "\n Http Body is: "
                     + consumeMessagesResMsg.getBody());
              
           }
           try
           {
              TimeUnit.SECONDS.sleep(2);
           } 
           catch (InterruptedException e) 
           {
           }
       }
    }
}

示例运行结果如下

Loan线程由于指定了tag(loan),因此只能消费到loan标签的消息:

Fund和stock线程也同理只能消费到各自指定的消息:

而Analysi线程没有指定Tag,可以消费到Topic里的所有消息:

分享:

    相关文档

    相关产品