更新时间:2024-06-14 GMT+08:00
分享

Kafka SimpleConsumer API使用样例

功能介绍

下面代码片段在com.huawei.bigdata.kafka.example.SimpleConsumerDemo类中,用于实现使用新SimpleConsumer API订阅Topic,并进行消息消费。(注意:SimpleConsumer API仅支持访问未设置ACL的Topic,安全接口说明见Kafka安全接口介绍

SimpleConsumer API属于lowlevel的Consumer API需要访问zookeeper元数据,管理消费Topic队列的offset,一般情况不推荐使用。

代码样例

SimpleConsumer API主方法需要传入三个参数,最大消费数量、消费Topic、消费的Topic分区

public static void main(String args[])
    {
        // 允许读取的最大消息数
        long maxReads = 0;
        
        try
        {
            maxReads = Long.valueOf(args[0]);
        }
        catch (Exception e)
        {
            log.error("args[0] should be a number for maxReads.\n" + 
                "args[1] should be a string for topic. \n" + 
                "args[2] should be a number for partition.");
            return;
        }
        
        if (null == args[1])
        {
            log.error("args[0] should be a number for maxReads.\n" + 
                "args[1] should be a string for topic. \n" + 
                "args[2] should be a number for partition.");
            return;
        }
        
        // 消费的消息主题
        // String topic = KafkaProperties.TOPIC;
        String topic = args[1];
        
        // 消息的消息分区
        int partition = 0;
        try 
        {
            partition = Integer.parseInt(args[2]);
        }
        catch (Exception e)
        {
            log.error("args[0] should be a number for maxReads.\n" + 
                "args[1] should be a string for topic. \n" + 
                "args[2] should be a number for partition.");
        }
        
        // Broker List
        String bkList = KafkaProperties.getInstance().getValues("metadata.broker.list", "localhost:9092");
        
        Map<String, Integer> ipPort = getIpPortMap(bkList);
        
        SimpleConsumerDemo example = new SimpleConsumerDemo();
        try
        {
            example.run(maxReads, topic, partition, ipPort);
        }
        catch (Exception e)
        {
            log.info("Oops:" + e);
            e.printStackTrace();
        }
    }

相关文档