使用Consumer API订阅安全Topic并消费
功能介绍
用于实现使用Consumer API订阅安全Topic,并进行消息消费。
代码样例
以下为用于实现使用Consumer API订阅安全Topic,并进行消息消费的代码片段。
详细内容在com.huawei.bigdata.kafka.example.Consumer类中。
/**
* Consumer构造函数。
* @param topic 订阅的Topic名称。
*/
public Consumer(String topic) {
super("KafkaConsumerExample", false);
// 初始化consumer启动所需的配置参数,详见代码。
Properties props = initProperties();
consumer = new KafkaConsumer<Integer, String>(props);
this.topic = topic;
}
public void doWork() {
// 订阅
consumer.subscribe(Collections.singletonList(this.topic));
// 消息消费请求
ConsumerRecords<Integer, String> records = consumer.poll(waitTime);
// 消息处理
for (ConsumerRecord<Integer, String> record : records) {
LOG.info("[ConsumerExample], Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}