Multi-thread Consumer Sample
Function
The multi-thread consumer function is implemented based on the code sample described in Consumer API Sample. The number of consumer threads that can consume messages concurrently is at most the number of partitions in the topic. Threads started beyond that number cannot consume data.
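The relationship between thread count and partition count can be illustrated with a small simulation. This is a minimal sketch under stated assumptions: it assumes all threads belong to one consumer group and hands out partitions round-robin, which is a simplified model and not Kafka's actual assignment strategy. The class name PartitionAssignmentSketch and its methods are hypothetical and are not part of the sample project.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (NOT Kafka's real assignor): partitions of one topic are
 * handed out round-robin to the consumer threads of a single consumer group.
 */
class PartitionAssignmentSketch {

    /** Returns, for each thread index, the partition numbers that thread would own. */
    static List<List<Integer>> assign(int partitionCount, int threadCount) {
        if (threadCount <= 0) {
            throw new IllegalArgumentException("threadCount must be positive");
        }
        List<List<Integer>> owned = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            owned.add(new ArrayList<>());
        }
        // Each partition is given to exactly one thread in the group.
        for (int p = 0; p < partitionCount; p++) {
            owned.get(p % threadCount).add(p);
        }
        return owned;
    }

    /** Counts the threads that received no partition and therefore stay idle. */
    static int idleThreads(int partitionCount, int threadCount) {
        int idle = 0;
        for (List<Integer> partitions : assign(partitionCount, threadCount)) {
            if (partitions.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // Hypothetical topic with 3 partitions and 5 consumer threads.
        List<List<Integer>> owned = assign(3, 5);
        for (int t = 0; t < owned.size(); t++) {
            System.out.println("thread " + t + " -> partitions " + owned.get(t));
        }
        System.out.println("idle threads: " + idleThreads(3, 5));
    }
}
```

With 3 partitions and 5 threads, two threads end up without a partition in this model, which matches the note in the sample code that excess threads cannot consume data.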
The following code snippet belongs to the run method in the com.huawei.bigdata.kafka.example.ConsumerMultThread class. It is used to implement concurrent consumption of messages in a specified topic.
Sample Code
/**
 * Start the multi-thread consumer function.
 */
public void run() {
    LOG.info("Consumer: start.");
    Properties props = Consumer.initProperties();

    // Start a specified number of consumer threads to consume data.
    // Note: If the value of this parameter is greater than the number of partitions of the topic to be consumed, the excess threads cannot consume data.
    for (int threadNum = 0; threadNum < CONCURRENCY_THREAD_NUM; threadNum++) {
        new ConsumerThread(threadNum, topic, props).start();
        LOG.info("Consumer Thread " + threadNum + " Start.");
    }
}

private class ConsumerThread extends ShutdownableThread {
    private int threadNum = 0;
    private String topic;
    private Properties props;
    private KafkaConsumer<String, String> consumer = null;

    /**
     * Constructor of the consumer thread class
     *
     * @param threadNum Thread ID
     * @param topic topic
     */
    public ConsumerThread(int threadNum, String topic, Properties props) {
        super("ConsumerThread" + threadNum, true);
        this.threadNum = threadNum;
        this.topic = topic;
        this.props = props;
        this.consumer = new KafkaConsumer<String, String>(props);
    }

    public void doWork() {
        consumer.subscribe(Collections.singleton(this.topic));
        ConsumerRecords<String, String> records = consumer.poll(waitTime);
        for (ConsumerRecord<String, String> record : records) {
            LOG.info("Consumer Thread-" + this.threadNum + " partitions:" + record.partition() + " record: "
                + record.value() + " offsets: " + record.offset());
        }
    }
}
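The sample passes the same Properties object, obtained from Consumer.initProperties(), to every thread, so all threads share one configuration. The sketch below shows what such a configuration might minimally contain. It is an illustration only: the real initProperties method is described in Consumer API Sample and may set other values (for example, security settings). The class name ConsumerPropertiesSketch, the broker address, and the group name are hypothetical placeholders. The keys are standard Kafka consumer configuration names, written as plain strings so that the sketch compiles without the Kafka client library.

```java
import java.util.Properties;

/**
 * Hypothetical stand-in for Consumer.initProperties(); the values are
 * placeholders, not the settings used by the actual sample project.
 */
class ConsumerPropertiesSketch {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Broker list used for the initial connection to the cluster.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // All threads that share this group ID split the topic's partitions among them.
        props.setProperty("group.id", groupId);
        // Commit consumed offsets automatically in the background.
        props.setProperty("enable.auto.commit", "true");
        // String deserializers match the KafkaConsumer<String, String> type in the sample.
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group name (assumptions for illustration).
        Properties props = build("broker1:9092", "example-group");
        props.list(System.out);
    }
}
```

Because every ConsumerThread receives the same group.id, the threads act as members of one consumer group, which is why threads beyond the partition count receive no data.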