Updated on 2023-08-31 GMT+08:00

Consumer API Sample

Function

The following code snippet belongs to the com.huawei.bigdata.kafka.example.Consumer class. It shows how to use the Consumer API to subscribe to a secure topic and consume its messages.

Sample Code

/**
 * Creates a consumer bound to a single topic.
 *
 * <p>The instance is registered with its superclass under the name
 * {@code "KafkaConsumerExample"}, the underlying {@code KafkaConsumer} is built
 * from the settings returned by {@code initProperties()}, and the topic name is
 * stored for later subscription.
 *
 * @param topic name of the topic to subscribe to
 */
public Consumer(String topic) {
    super("KafkaConsumerExample", false);
    // All client settings come from initProperties(); see that method for details.
    consumer = new KafkaConsumer<Integer, String>(initProperties());
    this.topic = topic;
}

/**
 * Performs one consume cycle: subscribes to the configured topic, polls the
 * consumer once using {@code waitTime}, and logs the key, value and offset of
 * every record returned by that poll.
 *
 * <p>The subscription call is issued on each invocation of this method.
 */
public void doWork() {
    // Subscribe to the single topic this consumer was created for.
    consumer.subscribe(Collections.singletonList(this.topic));

    // Fetch whatever records are available for this cycle.
    ConsumerRecords<Integer, String> polled = consumer.poll(waitTime);

    // Log each received message together with its offset.
    for (ConsumerRecord<Integer, String> message : polled) {
        Integer key = message.key();
        String value = message.value();
        long offset = message.offset();
        LOG.info("[ConsumerExample], Received message: (" + key + ", " + value + ") at offset " + offset);
    }
}