Updated on 2024-04-02 GMT+08:00

Consumer API Usage Sample

Function Description

The following code sample belongs to the com.huawei.bigdata.kafka.example.Consumer class. It uses the Consumer API to subscribe to a secure topic and consume messages.

Code Sample

/**
 * Builds a consumer bound to a single topic.
 *
 * <p>Forwards the name {@code "KafkaConsumerExample"} and a {@code false} flag to the
 * superclass constructor, creates the underlying {@code KafkaConsumer} from the
 * properties returned by {@code initProperties()}, and stores the topic name.
 *
 * @param topic name of the topic to subscribe to
 */
public Consumer(String topic) {
    super("KafkaConsumerExample", false);
    // The configuration needed to start the consumer comes from initProperties().
    consumer = new KafkaConsumer<Integer, String>(initProperties());
    this.topic = topic;
}

/**
 * Subscribes to the configured topic, issues one poll request, and logs the
 * key, value and offset of every record returned by that poll.
 */
public void doWork() {
    // Register interest in the single topic held by this consumer.
    consumer.subscribe(Collections.singletonList(this.topic));

    // Request messages, passing waitTime to the poll call.
    ConsumerRecords<Integer, String> fetched = consumer.poll(waitTime);

    // Report each received record.
    for (ConsumerRecord<Integer, String> message : fetched) {
        String received = "[ConsumerExample], Received message: (" + message.key() + ", " + message.value()
            + ") at offset " + message.offset();
        LOG.info(received);
    }
}