Kafka Consumer API使用样例
功能介绍
下面代码片段在com.huawei.bigdata.kafka.example.Consumer类中,用于消费订阅的Topic消息。
代码样例
Consumer线程的dowork方法逻辑,该方法是run方法的重写。
样例代码获取方式请参考获取MRS应用开发样例工程。
代码样例:
/**
* 订阅Topic的消息处理函数
*/
public void doWork()
{
// 订阅
consumer.subscribe(Collections.singletonList(this.topic));
// 消息消费请求
ConsumerRecords<Integer, String> records = consumer.poll(waitTime);
// 消息处理
for (ConsumerRecord<Integer, String> record : records)
{
LOG.info("[NewConsumerExample], Received message: (" + record.key() + ", " + record.value()
+ ") at offset " + record.offset());
}
}