Kafka 多线程Consumer API 使用样例
功能介绍
在Kafka Consumer API使用样例基础上,实现了多线程并发消费,可根据Topic的Partition数目起相应个数的Consumer线程来对应消费消息。
下面代码片段在com.huawei.bigdata.kafka.example.ConsumerMultThread类中,用于实现对指定Topic的并发消费。
Kafka不支持无缝集成SpringBoot项目。
代码样例
单个消费者线程的doWork()方法逻辑(run方法重写)。
/**
* 订阅Topic的消息处理函数
*/
public void doWork() {
// 订阅
consumer.subscribe(Collections.singletonList(this.topic));
// 消息消费请求
ConsumerRecords<Integer, String> records = consumer.poll(waitTime);
// 消息处理
for (ConsumerRecord<Integer, String> record : records) {
LOG.info(receivedThreadId+"Received message: (" + record.key() + ", " + record.value()
+ ") at offset " + record.offset());
}
}
ConsumerMultThread主类的线程启动逻辑。
public void run()
{
LOG.info("Consumer: start.");
for (int threadNum = 0; threadNum < CONCURRENCY_THREAD_NUM; threadNum++)
{
Consumer consumerThread = new Consumer(KafkaProperties.TOPIC,threadNum);
consumerThread.start();
}
}