Kafka SimpleConsumer API使用样例
功能介绍
下面代码片段在com.huawei.bigdata.kafka.example.SimpleConsumerDemo类中,用于实现使用新SimpleConsumer API订阅Topic,并进行消息消费。(注意:SimpleConsumer API仅支持访问未设置ACL的Topic,安全接口说明见Kafka安全接口介绍)
SimpleConsumer API属于lowlevel的Consumer API需要访问zookeeper元数据,管理消费Topic队列的offset,一般情况不推荐使用。
代码样例
SimpleConsumer API主方法需要传入三个参数,最大消费数量、消费Topic、消费的Topic分区
public static void main(String args[]) { // 允许读取的最大消息数 long maxReads = 0; try { maxReads = Long.valueOf(args[0]); } catch (Exception e) { log.error("args[0] should be a number for maxReads.\n" + "args[1] should be a string for topic. \n" + "args[2] should be a number for partition."); return; } if (null == args[1]) { log.error("args[0] should be a number for maxReads.\n" + "args[1] should be a string for topic. \n" + "args[2] should be a number for partition."); return; } // 消费的消息主题 // String topic = KafkaProperties.TOPIC; String topic = args[1]; // 消息的消息分区 int partition = 0; try { partition = Integer.parseInt(args[2]); } catch (Exception e) { log.error("args[0] should be a number for maxReads.\n" + "args[1] should be a string for topic. \n" + "args[2] should be a number for partition."); } // Broker List String bkList = KafkaProperties.getInstance().getValues("metadata.broker.list", "localhost:9092"); Map<String, Integer> ipPort = getIpPortMap(bkList); SimpleConsumerDemo example = new SimpleConsumerDemo(); try { example.run(maxReads, topic, partition, ipPort); } catch (Exception e) { log.info("Oops:" + e); e.printStackTrace(); } }