更新时间:2024-08-03 GMT+08:00
使用多线程Consumer消费消息
功能介绍
在使用Consumer API订阅安全Topic并消费基础上,实现了多线程并发消费,可根据Topic的Partition数目启动相应个数的Consumer线程来对应消费每个Partition上的消息。
下面代码片段在com.huawei.bigdata.kafka.example.ConsumerMultThread类的run方法中,用于实现对指定Topic的并发消费。
代码样例
/** * 启动多线程并发消费Consumer。 */ public void run() { LOG.info("Consumer: start."); Properties props = Consumer.initProperties(); // 启动指定个数Consuemr线程来消费 // 注意:当该参数大于待消费Topic的Partition个数时,多出的线程将无法消费到数据 for (int threadNum = 0; threadNum < CONCURRENCY_THREAD_NUM; threadNum++) { new ConsumerThread(threadNum, topic, props).start(); LOG.info("Consumer Thread " + threadNum + " Start."); } } private class ConsumerThread extends ShutdownableThread { private int threadNum = 0; private String topic; private Properties props; private KafkaConsumer<String, String> consumer = null; /** * 消费者线程类构造方法 * * @param threadNum 线程号 * @param topic topic */ public ConsumerThread(int threadNum, String topic, Properties props) { super("ConsumerThread" + threadNum, true); this.threadNum = threadNum; this.topic = topic; this.props = props; this.consumer = new KafkaConsumer<String, String>(props); } public void doWork() { consumer.subscribe(Collections.singleton(this.topic)); ConsumerRecords<String, String> records = consumer.poll(waitTime); for (ConsumerRecord<String, String> record : records) { LOG.info("Consumer Thread-" + this.threadNum + " partitions:" + record.partition() + " record: " + record.value() + " offsets: " + record.offset()); } } }
父主题: 开发Kafka应用