Optimizing Consumer Polling
Solution Overview
Application Scenario
With the native Kafka SDK used by Distributed Message Service (DMS) for Kafka, consumers can customize how long a poll waits for messages: to poll for longer, simply pass a suitable value to the poll(long) method. However, the resulting long connections can put pressure on both the client and the server, especially when a topic has many partitions and each consumer runs multiple threads.
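For reference, the wait time is simply the argument passed to poll(long): a larger value keeps the consumer blocked waiting for data for longer when no records are available. A minimal sketch of this, with a placeholder broker address, group ID, and topic name (all illustrative only):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LongPollSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder address
        props.put("group.id", "demo-group");               // placeholder consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic-0"));       // hypothetical topic name

        // poll(long) blocks for up to the given number of milliseconds when no
        // records are available; 10 000 ms here effectively means long polling.
        ConsumerRecords<String, String> records = consumer.poll(10 * 1000);
        System.out.println("Fetched " + records.count() + " records");
        consumer.close();
    }
}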
As shown in Figure 1, the topic has multiple partitions and several consumers in the consumer group consume it concurrently, each thread holding a long connection. When the topic has few or no messages, the connections stay open and every consumer keeps polling without pause, which wastes resources.
Solution
When multiple threads are consuming concurrently and the topic has run out of messages, there is no need for every thread to keep polling. It is enough for one thread to poll the partitions; once that polling thread finds new messages in the topic, it can wake up the other threads so that they consume together, keeping the response fast. See Figure 2.
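Condensed, this coordination amounts to a shared "someone is polling" flag plus wait/notifyAll on a per-topic lock; the full implementation is given in the code examples below. A minimal sketch of the idea (class and method names here are illustrative only):

public class PollCoordinator {
    private final Object lock = new Object();
    private boolean someonePolling = false;

    // Returns true if the calling thread should become the single polling thread.
    public boolean tryBecomePoller() {
        synchronized (lock) {
            if (!someonePolling) {
                someonePolling = true;
                return true;
            }
            return false;
        }
    }

    // Called by the non-polling threads: block until the poller finds messages.
    public void awaitMessages() throws InterruptedException {
        synchronized (lock) {
            while (someonePolling) {
                lock.wait();
            }
        }
    }

    // Called by the polling thread once records arrive: release the other threads.
    public void wakeAll() {
        synchronized (lock) {
            someonePolling = false;
            lock.notifyAll();
        }
    }
}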
This solution suits applications that do not need messages to be consumed in real time. If near-real-time consumption is required, keep all consumers active instead.
The number of consumers does not have to equal the number of partitions. Kafka's poll(long) method takes care of fetching messages, balancing partitions, and the heartbeat between consumers and Kafka broker nodes.
Therefore, when real-time consumption is not required and few messages are arriving, some of the consumers can be left in the wait state.
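Note that a consumer left in the wait state stops calling poll(), and how long it may do so without being removed from the group depends on the client version and its group liveness settings. A sketch of the consumer properties usually involved (values are illustrative only and should be tuned to how long a worker may sleep):

import java.util.Properties;

public class ConsumerLivenessConfigSketch {
    // Illustrative values only; adjust to the expected sleep time of waiting workers.
    public static Properties livenessProps() {
        Properties props = new Properties();
        // The group coordinator considers the consumer dead if no heartbeat
        // arrives within this window.
        props.put("session.timeout.ms", "30000");
        // How often heartbeats are sent; must be well below session.timeout.ms.
        props.put("heartbeat.interval.ms", "3000");
        // On newer clients, the maximum allowed gap between two poll() calls
        // before the consumer is removed from the group and a rebalance starts.
        props.put("max.poll.interval.ms", "300000");
        return props;
    }
}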
Code Examples
- Message consumption code example:
package com.huawei.dms.kafka;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Logger;

public class DmsKafkaConsumeDemo {
    private static Logger logger = Logger.getLogger(DmsKafkaProduceDemo.class);

    // Worker loop: each thread fetches records through a shared RecordReceiver
    // and processes whatever it receives.
    public static void WorkerFunc(int workerId, KafkaConsumer<String, String> kafkaConsumer) throws IOException {
        Properties consumerConfig = Config.getConsumerConfig();
        RecordReceiver receiver = new RecordReceiver(workerId, kafkaConsumer, consumerConfig.getProperty("topic"));
        while (true) {
            ConsumerRecords<String, String> records = receiver.receiveMessage();
            if (records == null) {
                // receiveMessage() returns null when the thread is interrupted
                return;
            }
            Iterator<ConsumerRecord<String, String>> iter = records.iterator();
            while (iter.hasNext()) {
                ConsumerRecord<String, String> cr = iter.next();
                System.out.println("Thread" + workerId + " recievedrecords" + cr.value());
                logger.info("Thread" + workerId + " recievedrecords" + cr.value());
            }
        }
    }

    // Builds a consumer from the shared configuration and subscribes it to the topic.
    public static KafkaConsumer<String, String> getConsumer() throws IOException {
        Properties consumerConfig = Config.getConsumerConfig();
        consumerConfig.put("ssl.truststore.location", Config.getTrustStorePath());
        System.setProperty("java.security.auth.login.config", Config.getSaslConfig());
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
        kafkaConsumer.subscribe(Arrays.asList(consumerConfig.getProperty("topic")),
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> arg0) {
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> tps) {
                }
            });
        return kafkaConsumer;
    }

    public static void main(String[] args) throws IOException {
        // Create the consumers of the current consumer group, one per worker thread
        final KafkaConsumer<String, String> consumer1 = getConsumer();
        Thread thread1 = new Thread(new Runnable() {
            public void run() {
                try {
                    WorkerFunc(1, consumer1);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });

        final KafkaConsumer<String, String> consumer2 = getConsumer();
        Thread thread2 = new Thread(new Runnable() {
            public void run() {
                try {
                    WorkerFunc(2, consumer2);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });

        final KafkaConsumer<String, String> consumer3 = getConsumer();
        Thread thread3 = new Thread(new Runnable() {
            public void run() {
                try {
                    WorkerFunc(3, consumer3);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });

        // Start the worker threads
        thread1.start();
        thread2.start();
        thread3.start();

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Wait for the worker threads to finish
        try {
            thread1.join();
            thread2.join();
            thread3.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
- Consumer thread management code example:
The example only illustrates a simple design; developers can refine the thread sleep and wake-up mechanism for their own scenarios.
Set topicName to the topic name. (A sketch of the Config helper referenced by both examples follows the second code listing.)
package com.huawei.dms.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

public class RecordReceiver {
    private static Logger logger = Logger.getLogger(DmsKafkaProduceDemo.class);

    // Polling interval, in milliseconds
    public static final int WAIT_SECONDS = 10 * 1000;

    // One lock object and one polling flag per topic, shared by all worker threads
    protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
    protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();

    protected Object lockObj;
    protected String topicName;
    protected KafkaConsumer<String, String> kafkaConsumer;
    protected int workerId;

    public RecordReceiver(int id, KafkaConsumer<String, String> kafkaConsumer, String queue) {
        this.kafkaConsumer = kafkaConsumer;
        this.topicName = queue;
        this.workerId = id;

        synchronized (sLockObjMap) {
            lockObj = sLockObjMap.get(topicName);
            if (lockObj == null) {
                lockObj = new Object();
                sLockObjMap.put(topicName, lockObj);
            }
        }
    }

    // Mark this thread as the single polling thread for the topic
    public boolean setPolling() {
        synchronized (lockObj) {
            Boolean ret = sPollingMap.get(topicName);
            if (ret == null || !ret) {
                sPollingMap.put(topicName, true);
                return true;
            }
            return false;
        }
    }

    // Wake up all waiting threads
    public void clearPolling() {
        synchronized (lockObj) {
            sPollingMap.put(topicName, false);
            lockObj.notifyAll();
            System.out.println("Everyone WakeUp and Work!");
            logger.info("Everyone WakeUp and Work!");
        }
    }

    public ConsumerRecords<String, String> receiveMessage() {
        boolean polling = false;
        while (true) {
            // Check the polling flag; sleep if another thread is already polling
            synchronized (lockObj) {
                Boolean p = sPollingMap.get(topicName);
                if (p != null && p) {
                    try {
                        System.out.println("Thread" + workerId + " Have a nice sleep!");
                        logger.info("Thread" + workerId + " Have a nice sleep!");
                        polling = false;
                        lockObj.wait();
                    } catch (InterruptedException e) {
                        System.out.println("MessageReceiver Interrupted! topicName is " + topicName);
                        logger.error("MessageReceiver Interrupted! topicName is " + topicName);
                        return null;
                    }
                }
            }

            // Consume messages; wake up the other threads when records arrive
            try {
                ConsumerRecords<String, String> records = null;
                if (!polling) {
                    records = kafkaConsumer.poll(100);
                    if (records.count() == 0) {
                        polling = true;
                        continue;
                    }
                } else {
                    if (setPolling()) {
                        System.out.println("Thread" + workerId + " Polling!");
                        logger.info("Thread " + workerId + " Polling!");
                    } else {
                        continue;
                    }
                    do {
                        System.out.println("Thread" + workerId + " KEEP Poll records!");
                        logger.info("Thread" + workerId + " KEEP Poll records!");
                        try {
                            records = kafkaConsumer.poll(WAIT_SECONDS);
                        } catch (Exception e) {
                            System.out.println("Exception Happened when polling records: " + e);
                            logger.error("Exception Happened when polling records: " + e);
                        }
                    } while (records == null || records.count() == 0);
                    clearPolling();
                }
                // Commit the offsets of the received messages
                kafkaConsumer.commitSync();
                return records;
            } catch (Exception e) {
                System.out.println("Exception Happened when poll records: " + e);
                logger.error("Exception Happened when poll records: " + e);
            }
        }
    }
}
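Both examples rely on a Config helper class that is not shown here. A minimal sketch of what such a helper might look like, assuming the settings are read from a properties file on the classpath (the file name, keys, and paths below are illustrative only):

package com.huawei.dms.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper; the real Config class is not part of this documentation.
public class Config {
    private static final String CONFIG_FILE = "dms.kafka.properties"; // assumed file name

    // Loads consumer settings (bootstrap.servers, group.id, deserializers, topic, ...)
    public static Properties getConsumerConfig() throws IOException {
        Properties props = new Properties();
        try (InputStream in = Config.class.getClassLoader().getResourceAsStream(CONFIG_FILE)) {
            if (in == null) {
                throw new IOException("Missing " + CONFIG_FILE + " on the classpath");
            }
            props.load(in);
        }
        return props;
    }

    // Path to the SSL truststore used for the "ssl.truststore.location" property
    public static String getTrustStorePath() {
        return "/opt/dms/client.truststore.jks"; // placeholder path
    }

    // Path to the JAAS file set as "java.security.auth.login.config"
    public static String getSaslConfig() {
        return "/opt/dms/kafka_client_jaas.conf"; // placeholder path
    }
}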
Code Example Output
[2018-01-25 22:40:51,841] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:40:51,841] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:40:52,122] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:40:52,169] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:40:52,169] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:40:52,216] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:40:52,325] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:40:52,325] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:40:54,947] INFO Thread1 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:40:54,979] INFO Thread3 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:41:32,347] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:42,353] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:47,816] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:41:47,847] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:47,925] INFO Thread 3 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:41:47,925] INFO Thread1 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:41:47,925] INFO Thread3 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:47,957] INFO Thread2 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87)
[2018-01-25 22:41:48,472] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:41:48,503] INFO Thread3 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,518] INFO Thread1 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,550] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,597] INFO Thread1 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,659] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:41:48,659] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
[2018-01-25 22:41:48,675] INFO Thread3 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32)
[2018-01-25 22:41:48,675] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69)
[2018-01-25 22:41:48,706] INFO Thread 1 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119)
[2018-01-25 22:41:48,706] INFO Thread1 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)