Kafka 多线程Consumer API 使用样例
功能介绍
在Kafka Consumer API使用样例基础上,实现了多线程并发消费,可根据Topic的Partition数目起相应个数的Consumer线程来对应消费消息。
下面代码片段在com.huawei.bigdata.kafka.example.ConsumerMultThread类中,用于实现对指定Topic的并发消费。
Kafka不支持无缝集成SpringBoot项目。
代码样例
单个消费者线程的doWork()方法逻辑(run方法重写)。
/** * 订阅Topic的消息处理函数 */ public void doWork() { // 订阅 consumer.subscribe(Collections.singletonList(this.topic)); // 消息消费请求 ConsumerRecords<Integer, String> records = consumer.poll(waitTime); // 消息处理 for (ConsumerRecord<Integer, String> record : records) { LOG.info(receivedThreadId+"Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } }
ConsumerMultThread主类的线程启动逻辑。
public void run() { LOG.info("Consumer: start."); for (int threadNum = 0; threadNum < CONCURRENCY_THREAD_NUM; threadNum++) { Consumer consumerThread = new Consumer(KafkaProperties.TOPIC,threadNum); consumerThread.start(); } }