更新时间:2024-08-03 GMT+08:00

使用多线程Consumer消费消息

功能简介

使用Consumer API订阅安全Topic并消费基础上,实现了多线程并发消费,可根据Topic的Partition数目启动相应个数的Consumer线程来对应消费每个Partition上的消息。

下面代码片段在com.huawei.bigdata.kafka.example.ConsumerMultThread类的run方法中,用于实现对指定Topic的并发消费。

代码样例

/**
 * 启动多线程并发消费Consumer。
  */
public void run() {
    LOG.info("Consumer: start.");
    Properties props = Consumer.initProperties();
    // 启动指定个数Consuemr线程来消费
    // 注意:当该参数大于待消费Topic的Partition个数时,多出的线程将无法消费到数据
    for (int threadNum = 0; threadNum < CONCURRENCY_THREAD_NUM; threadNum++) {
        new ConsumerThread(threadNum, topic, props).start();
        LOG.info("Consumer Thread " + threadNum + " Start.");
    }
}

private class ConsumerThread extends ShutdownableThread {
    private int threadNum = 0;
    private String topic;
    private Properties props;
    private KafkaConsumer<String, String> consumer = null;

    /**
     * 消费者线程类构造方法
     *
     * @param threadNum 线程号
     * @param topic     topic
     */
    public ConsumerThread(int threadNum, String topic, Properties props) {
        super("ConsumerThread" + threadNum, true);
        this.threadNum = threadNum;
        this.topic = topic;
        this.props = props;
        this.consumer = new KafkaConsumer<String, String>(props);
    }

    public void doWork() {
        consumer.subscribe(Collections.singleton(this.topic));
        ConsumerRecords<String, String> records = consumer.poll(waitTime);
        for (ConsumerRecord<String, String> record : records) {
            LOG.info("Consumer Thread-" + this.threadNum + " partitions:" + record.partition() + " record: "
                + record.value() + " offsets: " + record.offset());
        }
    }
}