文档首页 > > 开发指南> Kafka应用开发> 开发程序> 多线程Consumer API 使用样例

多线程Consumer API 使用样例

分享
更新时间: 2020/01/11 GMT+08:00

功能介绍

Consumer API使用样例基础上,实现了多线程并发消费,可根据Topic的Partition数目起相应个数的Consumer线程来对应消费消息。

下面代码片段在com.huawei.bigdata.kafka.example.ConsumerMultThread类中,用于实现对指定Topic的并发消费。

Kafka不支持无缝集成SpringBoot项目。

代码样例

单个消费者线程的doWork()方法逻辑(run方法重写)

        /**
         * 订阅Topic的消息处理函数
         */
        public void doWork() {
            // 订阅
            consumer.subscribe(Collections.singletonList(this.topic));
            // 消息消费请求
            ConsumerRecords<Integer, String> records = consumer.poll(waitTime);
            // 消息处理
            for (ConsumerRecord<Integer, String> record : records) {
                LOG.info(receivedThreadId+"Received message: (" + record.key() + ", " + record.value()
                        + ") at offset " + record.offset());
            }
        }

ConsumerMultThread主类的线程启动逻辑

    public void run()
    {
        LOG.info("Consumer: start.");

        for (int threadNum = 0; threadNum < CONCURRENCY_THREAD_NUM; threadNum++)
        {
            Consumer consumerThread = new Consumer(KafkaProperties.TOPIC,threadNum);
            consumerThread.start();
        }
    }
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

智能客服提问云社区提问