更新时间:2024-10-31 GMT+08:00

使用Consumer API订阅安全Topic并消费

功能介绍

用于实现使用Consumer API订阅安全Topic,并进行消息消费。

代码样例

以下为用于实现使用Consumer API订阅安全Topic,并进行消息消费的代码片段。

详细内容在com.huawei.bigdata.kafka.example.Consumer类中。

/**     
  * Consumer构造函数。 
  * @param topic 订阅的Topic名称。     
  */

public Consumer(String topic) {
    super("KafkaConsumerExample", false);
    // 初始化consumer启动所需的配置参数,详见代码。
    Properties props = initProperties();
    consumer = new KafkaConsumer<Integer, String>(props);
    this.topic = topic;
}

public void doWork() {
    // 订阅
    consumer.subscribe(Collections.singletonList(this.topic));
    // 消息消费请求
    ConsumerRecords<Integer, String> records = consumer.poll(waitTime);
    // 消息处理
    for (ConsumerRecord<Integer, String> record : records) {
        LOG.info("[ConsumerExample], Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
    }
}