Kafka Old Consumer API使用样例
功能介绍
每一个Consumer实例都属于一个Consumer group,每一条消息只会被同一个Consumer group里的一个Consumer实例消费(不同的Consumer group可以同时消费同一条消息)。
下面代码片段在com.huawei.bigdata.kafka.example.Old_Consumer类中,作用在于订阅指定Topic的消息。(注意:旧Consumer API仅支持访问未设置ACL的Topic,安全接口说明见Kafka安全接口介绍)
样例代码
Old Consumer API线程run方法中的消费逻辑
/** *启动执行Consumer,订阅Kafka上指定topic消息。 */ public void run() { LOG.info("Consumer: start."); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); LOG.info("Consumerstreams size is : " + streams.size()); for (KafkaStream<byte[], byte[]> stream : streams) { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { LOG.info("Consumer: receive " + new String(it.next().message()) + " from " + topic); } } LOG.info("Consumer End."); }