文档首页 > > 开发指南> Kafka应用开发> 开发程序> SimpleConsumer API使用样例

SimpleConsumer API使用样例

分享
更新时间: 2020/01/11 GMT+08:00

功能介绍

下面代码片段在com.huawei.bigdata.kafka.example.SimpleConsumerDemo类中,用于实现使用新SimpleConsumer API订阅Topic,并进行消息消费。(注意:SimpleConsumer API仅支持访问未设置ACL的Topic,安全接口说明见安全接口说明

SimpleConsumer API属于lowlevel的Consumer API需要访问zookeeper元数据,管理消费Topic队列的offset,一般情况不推荐使用。

代码样例

SimpleConsumer API主方法需要传入三个参数,最大消费数量、消费Topic、消费的Topic分区

public static void main(String args[])
    {
        // 允许读取的最大消息数
        long maxReads = 0;
        
        try
        {
            maxReads = Long.valueOf(args[0]);
        }
        catch (Exception e)
        {
            log.error("args[0] should be a number for maxReads.\n" + 
                "args[1] should be a string for topic. \n" + 
                "args[2] should be a number for partition.");
            return;
        }
        
        if (null == args[1])
        {
            log.error("args[0] should be a number for maxReads.\n" + 
                "args[1] should be a string for topic. \n" + 
                "args[2] should be a number for partition.");
            return;
        }
        
        // 消费的消息主题
        // String topic = KafkaProperties.TOPIC;
        String topic = args[1];
        
        // 消息的消息分区
        int partition = 0;
        try 
        {
            partition = Integer.parseInt(args[2]);
        }
        catch (Exception e)
        {
            log.error("args[0] should be a number for maxReads.\n" + 
                "args[1] should be a string for topic. \n" + 
                "args[2] should be a number for partition.");
        }
        
        // Broker List
        String bkList = KafkaProperties.getInstance().getValues("metadata.broker.list", "localhost:9092");
        
        Map<String, Integer> ipPort = getIpPortMap(bkList);
        
        SimpleConsumerDemo example = new SimpleConsumerDemo();
        try
        {
            example.run(maxReads, topic, partition, ipPort);
        }
        catch (Exception e)
        {
            log.info("Oops:" + e);
            e.printStackTrace();
        }
    }

分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

智能客服提问云社区提问