Updated: 2024-10-16 GMT+08:00

Optimizing Consumer Polling

Overview

Scenario

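In the native Kafka SDK provided with Distributed Message Service for Kafka, consumers can choose how long each fetch waits for messages: to long-poll, simply pass a suitable timeout to the poll(long) method. (In Kafka clients 2.0 and later, poll(long) is deprecated in favor of poll(Duration).) However, keeping these long polls open puts some pressure on both the client and the server, especially when the topic has many partitions and each consumer runs multiple threads.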

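As shown in Figure 1, the topic has multiple partitions and several consumers in the consumer group consume at the same time, each thread holding a long-lived connection. When the topic has few or no messages, the connections stay open and every consumer keeps polling without pause, which wastes resources.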

Figure 1 Multi-threaded consumption by Kafka consumers

Solution

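When multiple threads consume concurrently and the topic has run out of messages, there is no need for every thread to poll; a single thread polling the partitions is enough. As soon as the polling thread finds new messages in the topic, it wakes up the other threads so that they all consume together, which keeps response times short. See Figure 2.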
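The hand-off described above (one thread polls while the others sleep until it sees messages) can be sketched with plain JDK primitives. This is a minimal illustration under stated assumptions, not part of the DMS SDK: IdlePollCoordinator and its methods are hypothetical names, and the sketch uses ReentrantLock and Condition where the code example later in this document uses Object.wait() and notifyAll(). A wake-up generation counter makes sure a thread woken by releaseAndWakeAll() really returns, even if another thread becomes the poller again right away.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical helper: lets exactly one thread poll while the topic is idle
 * and parks the other threads until that poller sees messages.
 */
final class IdlePollCoordinator {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition wakeUp = lock.newCondition();
    private boolean pollerActive = false; // true while one thread is long-polling
    private long wakeGeneration = 0;      // incremented on every releaseAndWakeAll()

    /** Returns true if the caller became the single polling thread. */
    boolean tryBecomePoller() {
        lock.lock();
        try {
            if (pollerActive) {
                return false;
            }
            pollerActive = true;
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Blocks while another thread is polling; returns once that thread releases the flag. */
    void awaitWakeUp() throws InterruptedException {
        lock.lock();
        try {
            long seen = wakeGeneration;
            // The loop guards against spurious wake-ups; a changed generation means a real wake-up.
            while (pollerActive && wakeGeneration == seen) {
                wakeUp.await();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Called by the poller when messages arrive (or on error): clear the flag and wake everyone. */
    void releaseAndWakeAll() {
        lock.lock();
        try {
            pollerActive = false;
            wakeGeneration++;
            wakeUp.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

In this sketch, a worker would call tryBecomePoller() after an empty short poll; if it returns true, the worker long-polls its consumer and calls releaseAndWakeAll() in a finally block once records arrive, otherwise it calls awaitWakeUp().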

This solution suits applications that do not require real-time consumption. If near-real-time consumption is required, keep all consumers active.

Figure 2 Optimized multi-threaded consumption

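The number of consumers does not have to equal the number of partitions. Kafka's poll(long) method takes care of fetching messages, rebalancing partitions, and the heartbeats between consumers and the Kafka brokers.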

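Therefore, when real-time consumption is not required and there are few messages, some consumers can be left in a waiting state. Note that in Kafka clients 0.10.1 and later, heartbeats are sent from a background thread, but a consumer that does not call poll() within max.poll.interval.ms (300000 ms by default) is removed from the group, and its partitions are reassigned to the consumers that are still polling. Until that happens, messages on a waiting consumer's partitions are not fetched, which is another reason this approach suits only latency-tolerant workloads.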
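Because consumers and partitions need not match one-to-one, a rebalance simply divides the topic's partitions among whichever consumers are currently in the group. The simplified sketch below models a range-style split for a single topic to make this concrete. RangeAssignmentSketch is a hypothetical name, and the real distribution depends on the partition.assignment.strategy configured on the consumers; the point is only that once two of three consumers have left the group, the remaining one owns every partition.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a range-style assignment for one topic. */
final class RangeAssignmentSketch {
    /** Returns, for each consumer index, the partition numbers it would own. */
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        if (numConsumers <= 0) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = numPartitions / numConsumers;
        int withExtra = numPartitions % numConsumers; // the first `withExtra` consumers get one more
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = perConsumer + (c < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }
}
```

For example, assign(6, 1) gives one consumer all six partitions, while assign(2, 3) yields [[0], [1], []]: with more consumers than partitions, the extra consumer receives nothing and stays idle.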

Code Examples

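Only the code related to waking up consumer threads and putting them to sleep is shown below. To run the complete demo, download the full code sample package and deploy and run it by following the developer guide.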

  • Message consumption code example:
    package com.huawei.dms.kafka;
    
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.log4j.Logger;
    
    public class DmsKafkaConsumeDemo
    {
        private static Logger logger = Logger.getLogger(DmsKafkaProduceDemo.class);
    
        public static void WorkerFunc(int workerId, KafkaConsumer<String, String> kafkaConsumer) throws IOException
        {
            Properties consumerConfig = Config.getConsumerConfig();
            RecordReceiver receiver = new RecordReceiver(workerId, kafkaConsumer, consumerConfig.getProperty("topic"));
            while (true)
            {
                ConsumerRecords<String, String> records = receiver.receiveMessage();
                // receiveMessage() returns null when the thread is interrupted: release the consumer and stop
                if (records == null)
                {
                    kafkaConsumer.close();
                    return;
                }
                Iterator<ConsumerRecord<String, String>> iter = records.iterator();
                while (iter.hasNext())
                {
                    ConsumerRecord<String, String> cr = iter.next();
                    System.out.println("Thread" + workerId + " recievedrecords" + cr.value());
                    logger.info("Thread" + workerId + " recievedrecords" + cr.value());
                }
            }
        }
    
        public static KafkaConsumer<String, String> getConsumer() throws IOException
        {
            Properties consumerConfig = Config.getConsumerConfig();
    
            consumerConfig.put("ssl.truststore.location", Config.getTrustStorePath());
            System.setProperty("java.security.auth.login.config", Config.getSaslConfig());
    
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
            kafkaConsumer.subscribe(Arrays.asList(consumerConfig.getProperty("topic")),
                    new ConsumerRebalanceListener()
                    {
                        @Override
                        public void onPartitionsRevoked(Collection<TopicPartition> arg0)
                        {
    
                        }
    
                        @Override
                        public void onPartitionsAssigned(Collection<TopicPartition> tps)
                        {
    
                        }
                    });
            return kafkaConsumer;
        }
    
        public static void main(String[] args) throws IOException
        {
            // Create one consumer per thread, all in the same consumer group
            // (KafkaConsumer is not thread-safe, so each thread needs its own instance)
            Thread[] threads = new Thread[3];
            for (int i = 0; i < threads.length; i++)
            {
                final int workerId = i + 1;
                final KafkaConsumer<String, String> consumer = getConsumer();
                threads[i] = new Thread(() -> {
                    try
                    {
                        WorkerFunc(workerId, consumer);
                    }
                    catch (IOException e)
                    {
                        logger.error("Worker " + workerId + " failed", e);
                    }
                });
            }

            // Start the threads
            for (Thread t : threads)
            {
                t.start();
            }

            // Wait for all worker threads to finish
            try
            {
                for (Thread t : threads)
                {
                    t.join();
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }
    }
  • Consumer thread management code example:

    This example only illustrates a simple design. Adapt the thread sleep and wake-up mechanism to your actual scenario.

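    topicName is set to the name of the topic (the consumption example passes in the value of the topic property from the consumer configuration).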

    package com.huawei.dms.kafka;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import org.apache.log4j.Logger;
    
    public class RecordReceiver
    {
        private static Logger logger = Logger.getLogger(DmsKafkaProduceDemo.class);
        
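        // Timeout of each poll() call made by the designated polling thread.
        // Despite its name, the value is in milliseconds (10 s).
        public static final int WAIT_SECONDS = 10 * 1000;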
    
        protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
    
        protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();
    
        protected Object lockObj;
    
        protected String topicName;
    
        protected KafkaConsumer<String, String> kafkaConsumer;
    
        protected int workerId;
    
        public RecordReceiver(int id, KafkaConsumer<String, String> kafkaConsumer, String queue)
        {
            this.kafkaConsumer = kafkaConsumer;
            this.topicName = queue;
            this.workerId = id;
    
            synchronized (sLockObjMap)
            {
                lockObj = sLockObjMap.get(topicName);
                if (lockObj == null)
                {
                    lockObj = new Object();
                    sLockObjMap.put(topicName, lockObj);
                }
            }
        }
    
        public boolean setPolling()
        {
            synchronized (lockObj)
            {
                Boolean ret = sPollingMap.get(topicName);
                if (ret == null || !ret)
                {
                    sPollingMap.put(topicName, true);
                    return true;
                }
                return false;
            }
        }
    
        // Clear the polling flag and wake up all waiting threads
        public void clearPolling()
        {
            synchronized (lockObj)
            {
                sPollingMap.put(topicName, false);
                lockObj.notifyAll();
                System.out.println("Everyone WakeUp and Work!");
                logger.info("Everyone WakeUp and Work!");
            }
        }
    
        public ConsumerRecords<String, String> receiveMessage()
        {
            // Becomes true once a short poll returns nothing, i.e. the topic currently looks idle
            boolean polling = false;
            while (true)
            {
                // Check whether another thread is already polling; if so, sleep until it wakes us up
                synchronized (lockObj)
                {
                    Boolean p = sPollingMap.get(topicName);
                    if (p != null && p)
                    {
                        try
                        {
                            System.out.println("Thread" + workerId + " Have a nice sleep!");
                            logger.info("Thread" + workerId + " Have a nice sleep!");
                            polling = false;
                            lockObj.wait();
                        }
                        catch (InterruptedException e)
                        {
                            // Restore the interrupt status and let the caller stop this worker
                            Thread.currentThread().interrupt();
                            System.out.println("MessageReceiver Interrupted! topicName is " + topicName);
                            logger.error("MessageReceiver Interrupted! topicName is " + topicName);
                            return null;
                        }
                    }
                }

                // Consume; if this thread becomes the polling thread, wake the others once messages arrive
                try
                {
                    ConsumerRecords<String, String> records = null;
                    if (!polling)
                    {
                        // Short poll; an empty result makes this thread try to become the polling thread
                        records = kafkaConsumer.poll(100);
                        if (records.count() == 0)
                        {
                            polling = true;
                            continue;
                        }
                    }
                    else
                    {
                        if (!setPolling())
                        {
                            // Another thread is already polling: loop back and sleep
                            continue;
                        }
                        System.out.println("Thread" + workerId + " Polling!");
                        logger.info("Thread " + workerId + " Polling!");
                        try
                        {
                            // Long-poll until messages arrive
                            do
                            {
                                System.out.println("Thread" + workerId + " KEEP Poll records!");
                                logger.info("Thread" + workerId + " KEEP Poll records!");
                                try
                                {
                                    records = kafkaConsumer.poll(WAIT_SECONDS);
                                }
                                catch (Exception e)
                                {
                                    System.out.println("Exception Happened when polling records: " + e);
                                    logger.error("Exception Happened when polling records: " + e);
                                }
                            } while (records == null || records.count() == 0);
                        }
                        finally
                        {
                            // Always clear the flag, even on failure, so sleeping threads are never stranded
                            clearPolling();
                        }
                    }
                    // Commit offsets synchronously. This happens before the caller processes
                    // the returned records, so delivery is at-most-once.
                    kafkaConsumer.commitSync();
                    return records;
                }
                catch (Exception e)
                {
                    System.out.println("Exception Happened when poll records: " + e);
                    logger.error("Exception Happened when poll records: " + e);
                }
            }
        }
    }
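In RecordReceiver, the constructor wraps a plain HashMap in synchronized (sLockObjMap) so that all receivers for the same topic share one lock object. Assuming Java 8 or later, the same per-topic registry can be sketched with ConcurrentHashMap.computeIfAbsent, which performs the check-and-insert atomically and applies the mapping function at most once per key. TopicLocks and lockFor are hypothetical names used only for this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical helper: one shared monitor object per topic name. */
final class TopicLocks {
    private static final ConcurrentMap<String, Object> LOCKS = new ConcurrentHashMap<>();

    private TopicLocks() {
    }

    /** Returns the same lock object for every call with the same topic name. */
    static Object lockFor(String topicName) {
        // computeIfAbsent is atomic on ConcurrentHashMap, so no external synchronized block is needed
        return LOCKS.computeIfAbsent(topicName, t -> new Object());
    }
}
```

With this helper, the RecordReceiver constructor could simply set lockObj = TopicLocks.lockFor(topicName); and drop the synchronized block.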

Code Example Output

[2018-01-25 22:40:51,841] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:40:51,841] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:40:52,122] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:40:52,169] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:40:52,169] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:40:52,216] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:40:52,325] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:40:52,325] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:40:54,947] INFO Thread1 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:40:54,979] INFO Thread3 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:41:32,347] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:42,353] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:47,816] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:41:47,847] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:47,925] INFO Thread 3 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:41:47,925] INFO Thread1 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:41:47,925] INFO Thread3 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:47,957] INFO Thread2 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:41:48,472] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:41:48,503] INFO Thread3 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,518] INFO Thread1 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,550] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,597] INFO Thread1 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,659] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:41:48,659] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:48,675] INFO Thread3 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,675] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:41:48,706] INFO Thread 1 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:41:48,706] INFO Thread1 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)