分布式消息服务 DMS分布式消息服务 DMS

计算
弹性云服务器 ECS
云耀云服务器 HECS
裸金属服务器 BMS
弹性伸缩 AS
镜像服务 IMS
专属主机 DeH
函数工作流 FunctionGraph
云手机 CPH
VR云渲游平台 CVR
特惠算力专区
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
内容分发网络 CDN
存储容灾服务 SDRS
弹性文件服务 SFS
云服务器备份 CSBS
云硬盘备份 VBS
数据快递服务 DES
专属企业存储服务
智能边缘
智能边缘云 IEC
EI 企业智能
EI安视服务
AI开发平台ModelArts
数据湖治理中心 DGC
数据仓库服务 GaussDB(DWS)
企业级AI应用开发专业套件 ModelArts Pro
数据湖探索 DLI
华为HiLens
云搜索服务 CSS
数据接入服务 DIS
表格存储服务 CloudTable
数据湖工厂 DLF
图引擎服务 GES
推荐系统 RES
文字识别 OCR
内容审核 Moderation
图像识别 Image
图像搜索 ImageSearch
人脸识别服务 FRS
对话机器人服务 CBS
视频分析服务 VAS
数据可视化 DLV
视频接入服务 VIS
自然语言处理 NLP
语音交互服务 SIS
知识图谱 KG
医疗智能体 EIHealth
可信智能计算服务 TICS
园区智能体 CampusGo
实时流计算服务 CS
人证核身服务 IVS
IoT物联网
设备接入 IoTDA
IoT物联网
全球SIM联接 GSL
设备发放 IoTDP
IoT开发者服务
IoT边缘 IoTEdge
IoT数据分析
路网数字化服务 DRIS
开发与运维
项目管理 ProjectMan
代码托管 CodeHub
流水线 CloudPipeline
代码检查 CodeCheck
编译构建 CloudBuild
部署 CloudDeploy
云测 CloudTest
发布 CloudRelease
移动应用测试 MobileAPPTest
CloudIDE
Classroom
软件开发平台 DevCloud
开源镜像站 Mirrors
视频
媒体处理 MPC
视频点播 VOD
视频直播 Live
实时音视频 SparkRTC
管理与部署
统一身份认证服务 IAM
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云审计服务 CTS
云日志服务 LTS
标签管理服务 TMS
资源管理服务 RMS
应用身份管理服务 OneAccess
专属云
专属计算集群 DCC
专属分布式存储服务 DSS
域名与网站
域名注册服务 Domains
云速建站 CloudSite
企业协同
华为云WeLink
会议
ISDP
解决方案
全栈专属服务
高性能计算 HPC
SAP
游戏云
混合云灾备
快视频
华为工业云平台 IMC
价格
价格原则
成本优化最佳实践
昇腾
昇腾MindX SDK (20.3)
其他
管理控制台
消息中心
产品价格详情
系统权限
我的凭证
客户关联华为云合作伙伴须知
公共问题
宽限期保留期
奖励推广计划
活动
容器
云容器引擎 CCE
云容器实例 CCI
容器镜像服务 SWR
应用编排服务 AOS
容器交付流水线 ContainerOps
应用服务网格 ASM
多云容器平台 MCP
基因容器 GCS
容器洞察引擎 CIE
容器批量计算 BCE
云原生服务中心 OSC
网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
数据库
云数据库 RDS
文档数据库服务 DDS
分布式数据库中间件 DDM
数据复制服务 DRS
数据管理服务 DAS
云数据库 GaussDB(for MySQL)
云数据库 GaussDB NoSQL
云数据库 GaussDB (for openGauss)
数据库和应用迁移 UGO
大数据
MapReduce服务 MRS
应用中间件
应用管理与运维平台 ServiceStage
分布式缓存服务 DCS
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
消息通知服务 SMN
微服务引擎 CSE
云性能测试服务 CPTS
区块链服务 BCS
API网关 APIG
应用魔方 AppCube
分布式消息服务RocketMQ版
多云高可用服务 MAS
可信跨链数据链接服务 TCDAS
企业应用
云桌面 Workspace
云解析服务 DNS
应用与数据集成平台 ROMA Connect
ROMA资产中心 ROMAExchange
ROMA API
鸿源云道
华为乾坤云服务
安全与合规
Web应用防火墙 WAF
漏洞扫描服务 VSS
企业主机安全 HSS
容器安全服务 CGS
数据加密服务 DEW
数据库安全服务 DBSS
态势感知 SA
云堡垒机 CBH
SSL证书管理 SCM
云证书管理服务 CCM
管理检测与响应 MDR
数据安全中心 DSC
威胁检测服务 MTD
DDoS防护 ADS
云防火墙 CFW
应用信任中心 ATC
安全技术与应用
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
智能协作
IdeaHub
企业网络
云管理网络
SD-WAN 云服务
边缘数据中心管理 EDCM
废弃-华为乾坤安全云服务
云通信
语音通话 VoiceCall
消息&短信 MSGSMS
隐私保护通话 PrivateNumber
开发者工具
SDK开发指南
API签名指南
DevStar
HCloud CLI
Terraform
Ansible
云生态
云市场
鲲鹏
昇腾
合作伙伴中心
华为云学院
用户服务
帐号中心
费用中心
成本中心
资源中心
企业管理
工单管理
客户运营能力
国际站常见问题
网站备案
支持计划
专业服务
合作伙伴支持计划
更新时间:2021/06/16 GMT+08:00
分享

DMS Kafka消费者poll的优化

场景介绍

在DMS提供的原生Kafka SDK中,消费者可以自定义拉取消息的时长,如果需要长时间的拉取消息,只需要把poll(long)方法的参数设置合适的值即可。但是这样的长连接可能会对客户端和服务端造成一定的压力,特别是分区数较多且每个消费者开启多个线程的情况下。

图1所示,Kafka队列含有多个分区,消费组中有多个消费者同时进行消费,每个线程均为长连接。当队列中消息较少或者没有时,连接不断开,所有消费者不间断地拉取消息,这样造成了一定的资源浪费。

图1 Kafka消费者多线程消费模式

优化方案

在开了多个线程同时访问的情况下,如果队列里已经没有消息了,其实不需要所有的线程都在poll,只需要有一个线程poll各分区的消息就足够了,当在polling的线程发现队列中有消息,可以唤醒其他线程一起消费消息,以达到快速响应的目的。如图2所示。

这种方案适用于对消费消息的实时性要求不高的应用场景。如果要求准实时消费消息,则建议保持所有消费者处于活跃状态。

图2 优化后的多线程消费方案

消费者(Consumer)和消息分区(Partition)并不强制数量相等,Kafka的poll(long)方法帮助实现获取消息、分区平衡、消费者与Kafka broker节点间的心跳检测等功能。

因此在对消费消息的实时性要求不高场景下,当消息数量不多的时候,可以选择让一部分消费者处于wait状态。

代码示例

以下仅贴出与消费者线程唤醒与睡眠相关代码,如需运行整个demo,请先下载完整的代码示例包,同时参考DMS开发指南进行部署和运行。

消费消息代码示例如下:

package com.huawei.dms.kafka;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Logger;

public class DmsKafkaConsumeDemo
{
    private static Logger logger = Logger.getLogger(DmsKafkaProduceDemo.class);

    public static void WorkerFunc(int workerId, KafkaConsumer<String, String> kafkaConsumer) throws IOException
    {
        Properties consumerConfig = Config.getConsumerConfig();
        RecordReceiver receiver = new RecordReceiver(workerId, kafkaConsumer, consumerConfig.getProperty("topic"));
        while (true)
        {
            ConsumerRecords<String, String> records = receiver.receiveMessage();
            Iterator<ConsumerRecord<String, String>> iter = records.iterator();
            while (iter.hasNext())
            {
                ConsumerRecord<String, String> cr = iter.next();
                System.out.println("Thread" + workerId + " recievedrecords" + cr.value());
                logger.info("Thread" + workerId + " recievedrecords" + cr.value());

            }

        }
    }

    public static KafkaConsumer<String, String> getConsumer() throws IOException
    {
        Properties consumerConfig = Config.getConsumerConfig();

        consumerConfig.put("ssl.truststore.location", Config.getTrustStorePath());
        System.setProperty("java.security.auth.login.config", Config.getSaslConfig());

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
        kafkaConsumer.subscribe(Arrays.asList(consumerConfig.getProperty("topic")),
                new ConsumerRebalanceListener()
                {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> arg0)
                    {

                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> tps)
                    {

                    }
                });
        return kafkaConsumer;
    }

    public static void main(String[] args) throws IOException
    {

        //创建当前消费组的consumer
        final KafkaConsumer<String, String> consumer1 = getConsumer();
        Thread thread1 = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    WorkerFunc(1, consumer1);
                }
                catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        final KafkaConsumer<String, String> consumer2 = getConsumer();

        Thread thread2 = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    WorkerFunc(2, consumer2);
                }
                catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        final KafkaConsumer<String, String> consumer3 = getConsumer();

        Thread thread3 = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    WorkerFunc(3, consumer3);
                }
                catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });

        //启动线程
        thread1.start();
        thread2.start();
        thread3.start();

        try
        {
            Thread.sleep(5000);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        //线程加入
        try
        {
            thread1.join();
            thread2.join();
            thread3.join();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

消费者线程管理示例代码

示例仅提供简单的设计思路,开发者可结合实际场景优化线程休眠和唤醒机制。

package com.huawei.dms.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.log4j.Logger;

public class RecordReceiver
{
    private static Logger logger = Logger.getLogger(DmsKafkaProduceDemo.class);
    
    //polling的间隔时间
    public static final int WAIT_SECONDS = 10 * 1000;

    protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();

    protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();

    protected Object lockObj;

    protected String topicName;

    protected KafkaConsumer<String, String> kafkaConsumer;

    protected int workerId;

    public RecordReceiver(int id, KafkaConsumer<String, String> kafkaConsumer, String queue)
    {
        this.kafkaConsumer = kafkaConsumer;
        this.topicName = queue;
        this.workerId = id;

        synchronized (sLockObjMap)
        {
            lockObj = sLockObjMap.get(topicName);
            if (lockObj == null)
            {
                lockObj = new Object();
                sLockObjMap.put(topicName, lockObj);
            }
        }
    }

    public boolean setPolling()
    {
        synchronized (lockObj)
        {
            Boolean ret = sPollingMap.get(topicName);
            if (ret == null || !ret)
            {
                sPollingMap.put(topicName, true);
                return true;
            }
            return false;
        }
    }

    //唤醒全部线程
    public void clearPolling()
    {
        synchronized (lockObj)
        {
            sPollingMap.put(topicName, false);
            lockObj.notifyAll();
            System.out.println("Everyone WakeUp and Work!");
            logger.info("Everyone WakeUp and Work!");
        }
    }

    public ConsumerRecords<String, String> receiveMessage()
    {
        boolean polling = false;
        while (true)
        {
            //检查线程的poll状态,必要时休眠
            synchronized (lockObj)
            {
                Boolean p = sPollingMap.get(topicName);
                if (p != null && p)
                {
                    try
                    {
                        System.out.println("Thread" + workerId + " Have a nice sleep!");
                        logger.info("Thread" + workerId +" Have a nice sleep!");
                        polling = false;
                        lockObj.wait();
                    }
                    catch (InterruptedException e)
                    {
                        System.out.println("MessageReceiver Interrupted! topicName is " + topicName);
                        logger.error("MessageReceiver Interrupted! topicName is "+topicName);

                        return null;
                    }
                }
            }

            //开始消费,必要时唤醒其他线程消费
            try
            {
                ConsumerRecords<String, String> Records = null;
                if (!polling)
                {
                    Records = kafkaConsumer.poll(100);                    
                    if (Records.count() == 0)
                    {
                        polling = true;
                        continue;
                    }
                }
                else
                {
                    if (setPolling())
                    {
                        System.out.println("Thread" + workerId + " Polling!");
                        logger.info("Thread " + workerId + " Polling!");
                    }
                    else
                    {
                        continue;
                    }
                    do
                    {
                        System.out.println("Thread" + workerId + " KEEP Poll records!");
                        logger.info("Thread" + workerId + " KEEP Poll records!");
                        try
                        {
                            Records = kafkaConsumer.poll(WAIT_SECONDS);
                        }
                        catch (Exception e)
                        {
                            System.out.println("Exception Happened when polling records: " + e);
                            logger.error("Exception Happened when polling records: " + e);

                        }
                    } while (Records.count()==0);
                    clearPolling();
                }
                //消息确认
                kafkaConsumer.commitSync();
                return Records;
            }
            catch (Exception e)
            {
                System.out.println("Exception Happened when poll records: " + e);
                logger.error("Exception Happened when poll records: " + e);
            }
        }
    }
}

topicName配置为“队列名称”或者“Kafka Topic”。

示例代码运行结果

[2018-01-25 22:40:51,841] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:40:51,841] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:40:52,122] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:40:52,169] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:40:52,169] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:40:52,216] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:40:52,325] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:40:52,325] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:40:54,947] INFO Thread1 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:40:54,979] INFO Thread3 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:41:32,347] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:42,353] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:47,816] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:41:47,847] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:47,925] INFO Thread 3 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:41:47,925] INFO Thread1 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:41:47,925] INFO Thread3 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:47,957] INFO Thread2 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:41:48,472] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:41:48,503] INFO Thread3 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,518] INFO Thread1 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,550] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,597] INFO Thread1 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,659] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:41:48,659] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:48,675] INFO Thread3 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,675] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:41:48,706] INFO Thread 1 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:41:48,706] INFO Thread1 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
分享:

    相关文档

    相关产品