更新时间:2024-12-10 GMT+08:00

Kafka 多线程Consumer API 使用样例

功能介绍

Kafka Consumer API使用样例基础上,实现了多线程并发消费,可根据Topic的Partition数目起相应个数的Consumer线程来对应消费消息。

下面代码片段在com.huawei.bigdata.kafka.example.ConsumerMultThread类中,用于实现对指定Topic的并发消费。

Kafka不支持无缝集成SpringBoot项目。

代码样例

单个消费者线程的doWork()方法逻辑(run方法重写)。

        /**
         * 订阅Topic的消息处理函数
         */
        public void doWork() {
            // 订阅
            consumer.subscribe(Collections.singletonList(this.topic));
            // 消息消费请求
            ConsumerRecords<Integer, String> records = consumer.poll(waitTime);
            // 消息处理
            for (ConsumerRecord<Integer, String> record : records) {
                LOG.info(receivedThreadId+"Received message: (" + record.key() + ", " + record.value()
                        + ") at offset " + record.offset());
            }
        }

ConsumerMultThread主类的线程启动逻辑。

    public void run()
    {
        LOG.info("Consumer: start.");

        for (int threadNum = 0; threadNum < CONCURRENCY_THREAD_NUM; threadNum++)
        {
            Consumer consumerThread = new Consumer(KafkaProperties.TOPIC,threadNum);
            consumerThread.start();
        }
    }