Updated on 2022-06-01 GMT+08:00

Old Consumer API Usage Sample

Function Description

Each Consumer instance belongs to a Consumer group. Within a Consumer group, each message is consumed by only one Consumer instance. Multiple Consumer groups can consume the same message at the same time.
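The delivery rule described above can be illustrated with a small in-memory model. This is only a sketch and does not use Kafka: the class name GroupDeliverySketch, the group and instance names, and the modulo-based choice of instance are all assumptions made for illustration. In Kafka itself, the instance that receives a message is determined by how partitions are assigned within the group.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of consumer-group delivery; hypothetical, not Kafka code. */
public class GroupDeliverySketch {

    /**
     * Returns, for each group, the single instance that receives the message.
     * groups maps a (hypothetical) group name to its instance names.
     */
    public static Map<String, String> deliver(Map<String, List<String>> groups, int messageKey) {
        Map<String, String> receivers = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            List<String> instances = group.getValue();
            if (instances.isEmpty()) {
                continue; // a group without instances receives nothing
            }
            // Assumption: pick one instance by modulo. Exactly one instance
            // per group is chosen, which is the rule being illustrated.
            int index = Math.floorMod(messageKey, instances.size());
            receivers.put(group.getKey(), instances.get(index));
        }
        return receivers;
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        groups.put("group-a", Arrays.asList("a-1", "a-2"));
        groups.put("group-b", Arrays.asList("b-1"));

        // Every group gets the message once; only one instance per group sees it.
        System.out.println(deliver(groups, 3)); // prints {group-a=a-2, group-b=b-1}
    }
}
```

Both groups receive the message, but inside group-a only one of the two instances does.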

The following code snippet belongs to the run method in the com.huawei.bigdata.kafka.example.Old_Consumer class. It is used to subscribe to messages of a specific topic. (Note: The old Consumer APIs can access only topics that have no ACL restrictions. For details, see Security APIs.)

Sample Code

Consumption logic in the run method of the old Consumer API threads

 /**
  * Run the Consumer to subscribe to messages of the specified topic on Kafka.
  */
 public void run()
 {
   LOG.info("Consumer: start.");

   // Request one stream (one consuming thread) for the topic.
   Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
   topicCountMap.put(topic, Integer.valueOf(1));
   Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
   List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

   LOG.info("Consumer streams size is : " + streams.size());

   for (KafkaStream<byte[], byte[]> stream : streams)
   {
     ConsumerIterator<byte[], byte[]> it = stream.iterator();

     // hasNext() blocks until a message arrives unless consumer.timeout.ms is set.
     while (it.hasNext())
     {
       // Assumption: message payloads are UTF-8 text.
       String message = new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8);
       LOG.info("Consumer: receive " + message + " from " + topic);
     }
   }

   LOG.info("Consumer End.");
 }
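
The run method relies on a consumer connector that is created elsewhere in the class and is not shown in this section. As a rough sketch, the properties for an old (ZooKeeper-based) Consumer might be assembled as follows. The property keys are standard old Consumer settings, but the class name OldConsumerPropsSketch, the host name, the group name, and the timeout values are placeholders assumed for illustration, not the values used by the sample project. With the Kafka client library on the classpath, such properties would typically be wrapped in a kafka.consumer.ConsumerConfig and passed to kafka.consumer.Consumer.createJavaConsumerConnector.

```java
import java.util.Properties;

/** Builds old Consumer properties; all values here are illustrative placeholders. */
public class OldConsumerPropsSketch {

    public static Properties buildProps(String zkConnect, String groupId) {
        Properties props = new Properties();
        // ZooKeeper address list used by the old Consumer for coordination.
        props.put("zookeeper.connect", zkConnect);
        // Consumer group that this instance joins.
        props.put("group.id", groupId);
        // Assumed timeouts and commit interval; tune them for the real cluster.
        props.put("zookeeper.session.timeout.ms", "8000");
        props.put("zookeeper.sync.time.ms", "4000");
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        // "zk-host:2181/kafka" and "example-group" are hypothetical values.
        Properties props = buildProps("zk-host:2181/kafka", "example-group");
        System.out.println(props.getProperty("group.id")); // prints example-group
    }
}
```

Instances started with the same group.id share the messages of a topic, whereas instances with different group.id values each receive every message.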