Kafka SimpleConsumer API Usage Sample
Function Description
The following code snippet belongs to the com.huawei.bigdata.kafka.example.SimpleConsumerDemo class. It is used to enable the new SimpleConsumer APIs to subscribe a topic and consume messages. (Note: The SimpleConsumer APIs support only access to topics without ACL restrictions. For details, see Kafka Security APIs.)
SimpleConsumer APIs belong to low-level Consumer APIs, which are not recommended for accessing ZooKeeper metadata and managing the offset of the consumption topic queue.
Sample Code
The main method of SimpleConsumer APIs requires three parameters: Maximum consumption amount, consumption topic, and consumption topic partition.
public static void main(String args[]) { // Maximum number of messages that can be read long maxReads = 0; try { maxReads = Long.valueOf(args[0]); } catch (Exception e) { log.error("args[0] should be a number for maxReads.\n" + "args[1] should be a string for topic. \n" + "args[2] should be a number for partition."); return; } if (null == args[1]) { log.error("args[0] should be a number for maxReads.\n" + "args[1] should be a string for topic. \n" + "args[2] should be a number for partition."); return; } // Topic of messages that are consumed // String topic = KafkaProperties.TOPIC; String topic = args[1]; // Partition of messages that are consumed int partition = 0; try { partition = Integer.parseInt(args[2]); } catch (Exception e) { log.error("args[0] should be a number for maxReads.\n" + "args[1] should be a string for topic. \n" + "args[2] should be a number for partition."); } // Broker List String bkList = KafkaProperties.getInstance().getValues("metadata.broker.list", "localhost:9092"); Map<String, Integer> ipPort = getIpPortMap(bkList); SimpleConsumerDemo example = new SimpleConsumerDemo(); try { example.run(maxReads, topic, partition, ipPort); } catch (Exception e) { log.info("Oops:" + e); e.printStackTrace(); } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot