Updated on 2024-08-16 GMT+08:00

Kafka SimpleConsumer API Usage Sample

Function Description

The following code snippet belongs to the com.huawei.bigdata.kafka.example.SimpleConsumerDemo class. It uses the SimpleConsumer APIs to subscribe to a topic and consume messages. (Note: The SimpleConsumer APIs can access only topics that have no ACL restrictions. For details, see Kafka Security APIs.)

SimpleConsumer APIs are low-level Consumer APIs. Using them requires the application itself to access ZooKeeper metadata and manage the offset of the consumed topic queue, so they are generally not recommended.

Sample Code

The main method of the SimpleConsumer sample requires three arguments: the maximum number of messages to consume, the topic to consume, and the topic partition to consume.

    public static void main(String[] args)
    {
        // Maximum number of messages that can be read
        long maxReads = 0;
        
        try
        {
            maxReads = Long.parseLong(args[0]);
        }
        catch (Exception e)
        {
            log.error("args[0] should be a number for maxReads.\n" + 
                "args[1] should be a string for topic. \n" + 
                "args[2] should be a number for partition.");
            return;
        }
        
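        // Check the argument count first; reading args[1] directly would throw if it is missing
        if (args.length < 2 || null == args[1])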
        {
            log.error("args[0] should be a number for maxReads.\n" + 
                "args[1] should be a string for topic. \n" + 
                "args[2] should be a number for partition.");
            return;
        }
        
        // Topic of messages that are consumed
        // String topic = KafkaProperties.TOPIC;
        String topic = args[1];
        
        // Partition of messages that are consumed
        int partition = 0;
        try 
        {
            partition = Integer.parseInt(args[2]);
        }
        catch (Exception e)
        {
            log.error("args[0] should be a number for maxReads.\n" + 
                "args[1] should be a string for topic. \n" + 
                "args[2] should be a number for partition.");
            // Stop here, as the other argument checks do, instead of silently using partition 0
            return;
        }
        
        // Broker List
        String bkList = KafkaProperties.getInstance().getValues("metadata.broker.list", "localhost:9092");
        
        Map<String, Integer> ipPort = getIpPortMap(bkList);
        
        SimpleConsumerDemo example = new SimpleConsumerDemo();
        try
        {
            example.run(maxReads, topic, partition, ipPort);
        }
        catch (Exception e)
        {
            // Log the failure at error level, including the stack trace
            log.error("Failed to consume messages.", e);
        }
    }
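
The sample calls getIpPortMap(bkList) to turn the broker list into a host-to-port map, but that helper is not shown here. The following is a minimal sketch of what such a helper might look like, assuming the broker list is a comma-separated string of host:port entries such as the default "localhost:9092". The class name BrokerListParser and the method name parseBrokerList are hypothetical and are not part of the sample project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; the real getIpPortMap in the sample project may differ.
public class BrokerListParser {

    // Parses "host1:port1,host2:port2" into an insertion-ordered host -> port map.
    static Map<String, Integer> parseBrokerList(String brokerList) {
        Map<String, Integer> hostPorts = new LinkedHashMap<>();
        if (brokerList == null || brokerList.trim().isEmpty()) {
            return hostPorts;
        }
        for (String entry : brokerList.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // Tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            // Integer.parseInt throws NumberFormatException for a non-numeric port
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            hostPorts.put(host, port);
        }
        return hostPorts;
    }

    public static void main(String[] args) {
        Map<String, Integer> brokers = parseBrokerList("localhost:9092, broker2:9093");
        System.out.println(brokers); // prints {localhost=9092, broker2=9093}
    }
}
```

Because the map is keyed by host, two brokers that share a host but listen on different ports would overwrite each other. A list of host and port pairs would avoid that if such a deployment needs to be supported.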