更新时间:2024-12-10 GMT+08:00

Kafka Consumer API使用样例

功能介绍

下面代码片段在com.huawei.bigdata.kafka.example.Consumer类中,用于消费订阅的Topic消息。

代码样例

Consumer线程的dowork方法逻辑,该方法是run方法的重写。

样例代码获取方式请参考获取MRS应用开发样例工程

代码样例:

    /**
     * 订阅Topic的消息处理函数
     */
    public void doWork()
    {
        // 订阅
        consumer.subscribe(Collections.singletonList(this.topic));
        // 消息消费请求
        ConsumerRecords<Integer, String> records = consumer.poll(waitTime);
        // 消息处理
        for (ConsumerRecord<Integer, String> record : records)
        {
            LOG.info("[NewConsumerExample], Received message: (" + record.key() + ", " + record.value()
                + ") at offset " + record.offset());
        }
    }