Optimizing Consumer Polling

Updated on 2024-10-16 GMT+08:00

Overview

Scenario

In the native Kafka SDK provided by DMS for Kafka, consumers can customize the duration for pulling messages: to pull for a long time, they only need to pass a suitably large timeout to the poll(long) method. However, such persistent connections put pressure on both the client and the server, especially when the number of partitions is large and each consumer runs multiple threads.

As shown in Figure 1, the topic contains multiple partitions, and the consumers in the consumer group consume messages concurrently, each thread holding its own persistent connection. When the topic has few or no messages, all the consumer threads keep polling anyway, which wastes resources.

Figure 1 Multi-thread consumption of Kafka consumers

Solution

When multiple threads consume concurrently and the topic contains no messages, only one thread needs to poll the partitions for messages. As soon as that polling thread finds a message, it wakes up the other threads to consume it, ensuring a quick response, as shown in Figure 2.

This solution is suitable for scenarios with low real-time requirements on message consumption. If near-real-time consumption is required, keep all consumers active.

Figure 2 Optimized multi-thread consumption solution
NOTE:

The number of consumers and the number of partitions do not have to be the same. Kafka's poll(long) method implements message retrieval, partition rebalancing, and heartbeat detection between consumers and Kafka brokers.

Therefore, in scenarios where real-time requirements are low and there are few messages, some consumers can stay in the wait state.
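The wake/sleep coordination described above reduces to a shared flag guarded by a monitor: the first thread to claim the flag becomes the sole poller, the other threads wait on the monitor, and notifyAll() releases them once messages arrive. The following standalone sketch illustrates the idea; the PollGate class name is illustrative and is not part of the DMS sample, which implements the same pattern per topic in RecordReceiver below.

```java
// Minimal coordination gate: one polling thread at a time; waiting
// threads are woken when the poller clears the flag after finding messages.
class PollGate
{
    private final Object lock = new Object();
    private boolean polling = false;

    // Try to become the single polling thread; returns true only for the winner.
    public boolean tryAcquirePolling()
    {
        synchronized (lock)
        {
            if (!polling)
            {
                polling = true;
                return true;
            }
            return false;
        }
    }

    // Non-polling threads block here until the poller finds messages.
    public void awaitWakeup() throws InterruptedException
    {
        synchronized (lock)
        {
            while (polling)
            {
                lock.wait();
            }
        }
    }

    // The polling thread calls this once records arrive: wake everyone up.
    public void releaseAndWakeAll()
    {
        synchronized (lock)
        {
            polling = false;
            lock.notifyAll();
        }
    }
}
```

Each consumer thread first calls tryAcquirePolling(); the winner keeps calling poll() with a long timeout while the others call awaitWakeup(). When the winner receives a non-empty batch, it calls releaseAndWakeAll() so that every thread resumes polling and consuming.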

Sample Code

NOTICE:

The following shows only the code related to waking up and hibernating consumer threads. To run the complete demo, download the full sample code package and deploy and run it by referring to the Developer Guide.

  • Sample code for consuming messages:
    package com.huawei.dms.kafka;
    
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.log4j.Logger;
    
    public class DmsKafkaConsumeDemo
    {
        private static Logger logger = Logger.getLogger(DmsKafkaProduceDemo.class);
    
        public static void WorkerFunc(int workerId, KafkaConsumer<String, String> kafkaConsumer) throws IOException
        {
            Properties consumerConfig = Config.getConsumerConfig();
            RecordReceiver receiver = new RecordReceiver(workerId, kafkaConsumer, consumerConfig.getProperty("topic"));
            while (true)
            {
                ConsumerRecords<String, String> records = receiver.receiveMessage();
                Iterator<ConsumerRecord<String, String>> iter = records.iterator();
                while (iter.hasNext())
                {
                    ConsumerRecord<String, String> cr = iter.next();
                    System.out.println("Thread" + workerId + " received records: " + cr.value());
                    logger.info("Thread" + workerId + " received records: " + cr.value());
    
                }
    
            }
        }
    
        public static KafkaConsumer<String, String> getConsumer() throws IOException
        {
            Properties consumerConfig = Config.getConsumerConfig();
    
            consumerConfig.put("ssl.truststore.location", Config.getTrustStorePath());
            System.setProperty("java.security.auth.login.config", Config.getSaslConfig());
    
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
            kafkaConsumer.subscribe(Arrays.asList(consumerConfig.getProperty("topic")),
                    new ConsumerRebalanceListener()
                    {
                        @Override
                        public void onPartitionsRevoked(Collection<TopicPartition> arg0)
                        {
    
                        }
    
                        @Override
                        public void onPartitionsAssigned(Collection<TopicPartition> tps)
                        {
    
                        }
                    });
            return kafkaConsumer;
        }
    
        public static void main(String[] args) throws IOException
        {
    
            // Create a consumer for the current consumer group.
            final KafkaConsumer<String, String> consumer1 = getConsumer();
            Thread thread1 = new Thread(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        WorkerFunc(1, consumer1);
                    }
                    catch (IOException e)
                    {
                        e.printStackTrace();
                    }
                }
            });
            final KafkaConsumer<String, String> consumer2 = getConsumer();
    
            Thread thread2 = new Thread(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        WorkerFunc(2, consumer2);
                    }
                    catch (IOException e)
                    {
                        e.printStackTrace();
                    }
                }
            });
            final KafkaConsumer<String, String> consumer3 = getConsumer();
    
            Thread thread3 = new Thread(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        WorkerFunc(3, consumer3);
                    }
                    catch (IOException e)
                    {
                        e.printStackTrace();
                    }
                }
            });
    
            // Start threads.
            thread1.start();
            thread2.start();
            thread3.start();
    
            try
            {
                Thread.sleep(5000);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            // Wait for the worker threads to finish.
            try
            {
                thread1.join();
                thread2.join();
                thread3.join();
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }
  • Sample code for managing consumer threads:

    The sample code provides only simple design ideas. Developers can optimize the thread wake-up and sleep mechanisms based on actual scenarios.

    NOTE:

    topicName is the name of the topic.

    package com.huawei.dms.kafka;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import org.apache.log4j.Logger;
    
    public class RecordReceiver
    {
        private static Logger logger = Logger.getLogger(DmsKafkaProduceDemo.class);
        
        // Polling timeout, in milliseconds.
        public static final int WAIT_SECONDS = 10 * 1000;
    
        protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
    
        protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();
    
        protected Object lockObj;
    
        protected String topicName;
    
        protected KafkaConsumer<String, String> kafkaConsumer;
    
        protected int workerId;
    
        public RecordReceiver(int id, KafkaConsumer<String, String> kafkaConsumer, String queue)
        {
            this.kafkaConsumer = kafkaConsumer;
            this.topicName = queue;
            this.workerId = id;
    
            synchronized (sLockObjMap)
            {
                lockObj = sLockObjMap.get(topicName);
                if (lockObj == null)
                {
                    lockObj = new Object();
                    sLockObjMap.put(topicName, lockObj);
                }
            }
        }
    
        public boolean setPolling()
        {
            synchronized (lockObj)
            {
                Boolean ret = sPollingMap.get(topicName);
                if (ret == null || !ret)
                {
                    sPollingMap.put(topicName, true);
                    return true;
                }
                return false;
            }
        }
    
        // Wake up all threads.
        public void clearPolling()
        {
            synchronized (lockObj)
            {
                sPollingMap.put(topicName, false);
                lockObj.notifyAll();
                System.out.println("Everyone WakeUp and Work!");
                logger.info("Everyone WakeUp and Work!");
            }
        }
    
        public ConsumerRecords<String, String> receiveMessage()
        {
            boolean polling = false;
            while (true)
            {
                // Check the poll status of threads and hibernate the threads when necessary.
                synchronized (lockObj)
                {
                    Boolean p = sPollingMap.get(topicName);
                    if (p != null && p)
                    {
                        try
                        {
                            System.out.println("Thread" + workerId + " Have a nice sleep!");
                            logger.info("Thread" + workerId +" Have a nice sleep!");
                            polling = false;
                            lockObj.wait();
                        }
                        catch (InterruptedException e)
                        {
                            System.out.println("MessageReceiver Interrupted! topicName is " + topicName);
                            logger.error("MessageReceiver Interrupted! topicName is "+topicName);
    
                            return null;
                        }
                    }
                }
    
                // Start to consume and wake up other threads when necessary.
                try
                {
                    ConsumerRecords<String, String> records = null;
                    if (!polling)
                    {
                        records = kafkaConsumer.poll(100);
                        if (records.count() == 0)
                        {
                            polling = true;
                            continue;
                        }
                    }
                    else
                    {
                        if (setPolling())
                        {
                            System.out.println("Thread" + workerId + " Polling!");
                            logger.info("Thread " + workerId + " Polling!");
                        }
                        else
                        {
                            continue;
                        }
                        // Keep polling until records arrive; records stays null if poll() throws.
                        do
                        {
                            System.out.println("Thread" + workerId + " KEEP Poll records!");
                            logger.info("Thread" + workerId + " KEEP Poll records!");
                            try
                            {
                                records = kafkaConsumer.poll(WAIT_SECONDS);
                            }
                            catch (Exception e)
                            {
                                System.out.println("Exception Happened when polling records: " + e);
                                logger.error("Exception Happened when polling records: " + e);
                            }
                        } while (records == null || records.count() == 0);
                        clearPolling();
                    }
                    // Acknowledge message consumption.
                    kafkaConsumer.commitSync();
                    return records;
                }
                catch (Exception e)
                {
                    System.out.println("Exception Happened when poll records: " + e);
                    logger.error("Exception Happened when poll records: " + e);
                }
            }
        }
    }

Running Result

[2018-01-25 22:40:51,841] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:40:51,841] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:40:52,122] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:40:52,169] INFO Thread2 received records: hello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:40:52,169] INFO Thread2 received records: hello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:40:52,216] INFO Thread2 received records: hello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:40:52,325] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:40:52,325] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:40:54,947] INFO Thread1 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:40:54,979] INFO Thread3 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:41:32,347] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:42,353] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:47,816] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:41:47,847] INFO Thread2 received records: hello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:47,925] INFO Thread 3 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:41:47,925] INFO Thread1 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:41:47,925] INFO Thread3 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:47,957] INFO Thread2 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:41:48,472] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:41:48,503] INFO Thread3 received records: hello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,518] INFO Thread1 received records: hello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,550] INFO Thread2 received records: hello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,597] INFO Thread1 received records: hello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,659] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:41:48,659] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:48,675] INFO Thread3 received records: hello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,675] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:41:48,706] INFO Thread 1 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:41:48,706] INFO Thread1 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
