Multi-thread Consumer Sample
Function Description
The multi-thread consumer function builds on the code sample described in section Consumer API Usage Sample. The number of consumer threads that can usefully be started is at most the number of partitions in the topic; any threads beyond that number receive no data.
The following code snippet is taken from the com.huawei.bigdata.kafka.example.ConsumerMultThread class. Its run method and the ConsumerThread inner class together implement concurrent consumption of messages in a specified topic.
Code Sample
/**
 * Start the concurrent multi-thread consumer.
 */
public void run() {
    LOG.info("Consumer: start.");
    Properties props = Consumer.initProperties();

    // Start a specified number of consumer threads to consume.
    // Note: When this parameter is larger than the number of partitions of the
    // topic to be consumed, the extra threads fail to consume data.
    for (int threadNum = 0; threadNum < CONCURRENCY_THREAD_NUM; threadNum++) {
        new ConsumerThread(threadNum, topic, props).start();
        LOG.info("Consumer Thread " + threadNum + " Start.");
    }
}

private class ConsumerThread extends ShutdownableThread {
    private int threadNum = 0;
    private String topic;
    private Properties props;
    private KafkaConsumer<String, String> consumer = null;

    /**
     * Construction method of the consumer thread class
     *
     * @param threadNum Thread number
     * @param topic topic
     * @param props Consumer configuration
     */
    public ConsumerThread(int threadNum, String topic, Properties props) {
        super("ConsumerThread" + threadNum, true);
        this.threadNum = threadNum;
        this.topic = topic;
        this.props = props;
        // Each thread owns its own KafkaConsumer instance.
        this.consumer = new KafkaConsumer<String, String>(props);
    }

    public void doWork() {
        consumer.subscribe(Collections.singleton(this.topic));
        ConsumerRecords<String, String> records = consumer.poll(waitTime);
        for (ConsumerRecord<String, String> record : records) {
            LOG.info("Consumer Thread-" + this.threadNum + " partitions:" + record.partition()
                + " record: " + record.value() + " offsets: " + record.offset());
        }
    }
}
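The relationship between the thread count and the partition count can be illustrated with a small standalone model. The sketch below is not Kafka code and does not reproduce any real Kafka assignment strategy; it assumes a simple round-robin distribution purely for illustration. The class name PartitionAssignmentSketch, the methods assign and usefulThreadCount, and the numbers used in main are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model of how the partitions of one topic are spread over consumer threads.
 * This is NOT Kafka's assignor code; it only illustrates the thread/partition ratio.
 */
public class PartitionAssignmentSketch {

    /** Distributes partition ids 0..partitionCount-1 over the threads in round-robin order. */
    public static Map<Integer, List<Integer>> assign(int partitionCount, int threadCount) {
        if (threadCount <= 0) {
            throw new IllegalArgumentException("threadCount must be positive");
        }
        Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
        for (int thread = 0; thread < threadCount; thread++) {
            assignment.put(thread, new ArrayList<>());
        }
        // Each partition goes to exactly one thread.
        for (int partition = 0; partition < partitionCount; partition++) {
            assignment.get(partition % threadCount).add(partition);
        }
        return assignment;
    }

    /** Caps the configured thread count at the partition count so that no thread sits idle. */
    public static int usefulThreadCount(int configuredThreads, int partitionCount) {
        return Math.min(configuredThreads, partitionCount);
    }

    public static void main(String[] args) {
        // Hypothetical numbers: a topic with 3 partitions and 5 configured threads.
        Map<Integer, List<Integer>> assignment = assign(3, 5);
        for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) {
            System.out.println("thread " + entry.getKey() + " -> partitions " + entry.getValue());
        }
        System.out.println("useful threads: " + usefulThreadCount(5, 3));
    }
}
```

With 3 partitions and 5 threads, threads 3 and 4 end up with an empty partition list in this model, which matches the note in the sample that extra threads fail to consume data. Capping the configured value at the partition count, as usefulThreadCount does, is one possible way to avoid starting idle threads.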