更新时间:2024-12-10 GMT+08:00
Kafka SimpleConsumer API使用样例
功能介绍
下面代码片段在com.huawei.bigdata.kafka.example.SimpleConsumerDemo类中,用于实现使用新SimpleConsumer API订阅Topic,并进行消息消费。(注意:SimpleConsumer API仅支持访问未设置ACL的Topic,安全接口说明见Kafka安全接口介绍)
SimpleConsumer API属于lowlevel的Consumer API需要访问zookeeper元数据,管理消费Topic队列的offset,一般情况不推荐使用。
代码样例
SimpleConsumer API主方法需要传入三个参数,最大消费数量、消费Topic、消费的Topic分区。
public static void main(String args[])
{
// 允许读取的最大消息数
long maxReads = 0;
try
{
maxReads = Long.valueOf(args[0]);
}
catch (Exception e)
{
log.error("args[0] should be a number for maxReads.\n" +
"args[1] should be a string for topic. \n" +
"args[2] should be a number for partition.");
return;
}
if (null == args[1])
{
log.error("args[0] should be a number for maxReads.\n" +
"args[1] should be a string for topic. \n" +
"args[2] should be a number for partition.");
return;
}
// 消费的消息主题
// String topic = KafkaProperties.TOPIC;
String topic = args[1];
// 消息的消息分区
int partition = 0;
try
{
partition = Integer.parseInt(args[2]);
}
catch (Exception e)
{
log.error("args[0] should be a number for maxReads.\n" +
"args[1] should be a string for topic. \n" +
"args[2] should be a number for partition.");
}
// Broker List
String bkList = KafkaProperties.getInstance().getValues("metadata.broker.list", "localhost:9092");
Map<String, Integer> ipPort = getIpPortMap(bkList);
SimpleConsumerDemo example = new SimpleConsumerDemo();
try
{
example.run(maxReads, topic, partition, ipPort);
}
catch (Exception e)
{
log.info("Oops:" + e);
e.printStackTrace();
}
}
父主题: 开发Kafka应用