Multi-Thread Consumer API Usage Sample
Function Description
The multi-thread consumer function is implemented based on the sample codes described in Consumer API Usage Sample. The number of consumer threads that can be started to consume the messages in partitions is the same as the number of partitions in the topic.
The following code snippets belong to the com.huawei.bigdata.kafka.example.ConsumerMultThread class. They are used to implement concurrent consumption of messages in a specified topic.
Kafka does not support seamless integration of the SpringBoot project.
Sample Code
DoWork() method logic of a single consumer thread (rewrite of the run method)
/**
* Message processing function for subscribing to topics
*/
public void doWork() {
// Subscribe.
consumer.subscribe(Collections.singletonList(this.topic));
// Message consumption request
ConsumerRecords<Integer, String> records = consumer.poll(waitTime);
// Message processing
for (ConsumerRecord<Integer, String> record : records) {
LOG.info(receivedThreadId+"Received message: (" + record.key() + ", " + record.value()
+ ") at offset " + record.offset());
}
} Thread startup logic of the ConsumerMultThread main class
public void run()
{
LOG.info("Consumer: start.");
for (int threadNum = 0; threadNum < CONCURRENCY_THREAD_NUM; threadNum++)
{
Consumer consumerThread = new Consumer(KafkaProperties.TOPIC,threadNum);
consumerThread.start();
}
} Last Article: Multi-Thread Producer API Usage Sample
Next Article: SimpleConsumer API Usage Sample
Did this article solve your problem?
Thank you for your score!Your feedback would help us improve the website.