文档首页 > > 开发指南(适用于2.x及之前)> Kafka应用开发> 开发程序> Old Consumer API使用样例

Old Consumer API使用样例

分享
更新时间:2020/01/11 GMT+08:00

功能介绍

每一个Consumer实例都属于一个Consumer group,每一条消息只会被同一个Consumer group里的一个Consumer实例消费(不同的Consumer group可以同时消费同一条消息)。

下面代码片段在com.huawei.bigdata.kafka.example.Old_Consumer类中,作用在于订阅指定Topic的消息。(注意:旧Consumer API仅支持访问未设置ACL的Topic,安全接口说明见安全接口说明

样例代码

Old Consumer API线程run方法中的消费逻辑

/**  *启动执行Consumer,订阅Kafka上指定topic消息。  */  
 public void run() 
 { 
   LOG.info("Consumer: start."); 

   Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
   topicCountMap.put(topic, new Integer(1)); 
   Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
   List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

   LOG.info("Consumerstreams size is : " + streams.size()); 

   for (KafkaStream<byte[], byte[]> stream : streams) 
   { 
     ConsumerIterator<byte[], byte[]> it = stream.iterator(); 

     while (it.hasNext()) 
     { 
       LOG.info("Consumer: receive " + new String(it.next().message()) + " from " + topic); 
     } 
   } 

   LOG.info("Consumer End."); 
 }

分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!非常感谢您的反馈,我们会继续努力做到更好!
反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

智能客服提问云社区提问