Old Consumer API Usage Sample
Function Description
Each Consumer instance belongs to a Consumer group. Within a Consumer group, each message is consumed by only one Consumer instance. Different Consumer groups can consume the same message at the same time.
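The delivery rule above can be illustrated with a small plain-Java simulation. This is a minimal sketch and does not use any Kafka API: the class name ConsumerGroupSketch, the deliver method, and the group and instance names are all hypothetical. The round-robin choice inside a group is an assumption made only for illustration (Kafka itself assigns work by partition), and every group is assumed to have at least one instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of consumer-group delivery semantics (hypothetical, no Kafka API involved).
class ConsumerGroupSketch {

    /**
     * Delivers every message once to each group. Inside a group, exactly one
     * instance receives a given message (picked round-robin, for illustration only).
     *
     * @param groups   group name -> names of the instances in that group (each list non-empty)
     * @param messages messages to deliver
     * @return instance name -> messages received by that instance
     */
    static Map<String, List<String>> deliver(Map<String, List<String>> groups, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (List<String> instances : groups.values()) {
            for (String instance : instances) {
                received.put(instance, new ArrayList<>());
            }
        }
        for (int i = 0; i < messages.size(); i++) {
            // Every group sees the message, but only one instance per group gets it.
            for (List<String> instances : groups.values()) {
                String chosen = instances.get(i % instances.size());
                received.get(chosen).add(messages.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        groups.put("groupA", Arrays.asList("a1", "a2"));
        groups.put("groupB", Arrays.asList("b1"));
        // prints {a1=[m0, m2], a2=[m1], b1=[m0, m1, m2]}
        System.out.println(deliver(groups, Arrays.asList("m0", "m1", "m2")));
    }
}
```

In this run, groupA splits the three messages between its two instances, while the single instance in groupB receives all three, so each message is consumed once per group.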
The following code snippet is from the run method of the com.huawei.bigdata.kafka.example.Old_Consumer class. It subscribes to messages of a specified topic. (Note: The old Consumer APIs can access only topics that have no ACL restrictions. For details, see Security APIs.)
Sample Code
Consumption logic in the run method of the old Consumer API threads
/**
 * Run Consumer to subscribe to specified topic messages on Kafka.
 */
public void run()
{
    LOG.info("Consumer: start.");

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    LOG.info("Consumerstreams size is : " + streams.size());

    for (KafkaStream<byte[], byte[]> stream : streams)
    {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        while (it.hasNext())
        {
            LOG.info("Consumer: receive " + new String(it.next().message()) + " from " + topic);
        }
    }

    LOG.info("Consumer End.");
}
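The run method relies on a consumer connector and a topic that are initialized elsewhere in the class, and that initialization is not shown in this section. As a rough sketch of what it might involve, the old consumer is configured through a java.util.Properties object that uses ZooKeeper-based settings. The class name OldConsumerConfigSketch, the buildProps helper, the address localhost:2181/kafka, the group name, and all timeout values below are assumptions for illustration, not values taken from the sample project. With the old Kafka client on the classpath, properties like these are typically wrapped in kafka.consumer.ConsumerConfig and passed to kafka.consumer.Consumer.createJavaConsumerConnector to obtain the connector.

```java
import java.util.Properties;

// Hypothetical helper that assembles old (ZooKeeper-based) consumer settings.
class OldConsumerConfigSketch {

    /**
     * Builds the properties for an old-API consumer.
     *
     * @param zkConnect ZooKeeper connection string (assumed format: host:port/chroot)
     * @param groupId   name of the Consumer group this instance joins
     */
    static Properties buildProps(String zkConnect, String groupId) {
        Properties props = new Properties();
        // The old consumer finds brokers and stores offsets through ZooKeeper.
        props.setProperty("zookeeper.connect", zkConnect);
        // Instances that share a group.id share the messages of a topic.
        props.setProperty("group.id", groupId);
        // Illustrative timeout and commit settings; tune them for the cluster.
        props.setProperty("zookeeper.session.timeout.ms", "8000");
        props.setProperty("zookeeper.sync.time.ms", "4000");
        props.setProperty("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps("localhost:2181/kafka", "example-group");
        System.out.println(props.getProperty("group.id")); // prints example-group
    }
}
```

The value stored for each topic in topicCountMap is the number of streams requested for that topic, so the sample above asks for a single stream and iterates over it in one thread.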