使用多线程Consumer消费消息
功能介绍
在使用Consumer API订阅安全Topic并消费基础上,实现了多线程并发消费,可根据Topic的Partition数目启动相应个数的Consumer线程来对应消费每个Partition上的消息。
下面代码片段在com.huawei.bigdata.kafka.example.ConsumerMultThread类的run方法中,用于实现对指定Topic的并发消费。
代码样例
/**
* 启动多线程并发消费Consumer。
*/
public void run() {
LOG.info("Consumer: start.");
Properties props = Consumer.initProperties();
// 启动指定个数Consuemr线程来消费
// 注意:当该参数大于待消费Topic的Partition个数时,多出的线程将无法消费到数据
for (int threadNum = 0; threadNum < CONCURRENCY_THREAD_NUM; threadNum++) {
new ConsumerThread(threadNum, topic, props).start();
LOG.info("Consumer Thread " + threadNum + " Start.");
}
}
private class ConsumerThread extends ShutdownableThread {
private int threadNum = 0;
private String topic;
private Properties props;
private KafkaConsumer<String, String> consumer = null;
/**
* 消费者线程类构造方法
*
* @param threadNum 线程号
* @param topic topic
*/
public ConsumerThread(int threadNum, String topic, Properties props) {
super("ConsumerThread" + threadNum, true);
this.threadNum = threadNum;
this.topic = topic;
this.props = props;
this.consumer = new KafkaConsumer<String, String>(props);
}
public void doWork() {
consumer.subscribe(Collections.singleton(this.topic));
ConsumerRecords<String, String> records = consumer.poll(waitTime);
for (ConsumerRecord<String, String> record : records) {
LOG.info("Consumer Thread-" + this.threadNum + " partitions:" + record.partition() + " record: "
+ record.value() + " offsets: " + record.offset());
}
}
}