Consumer API Usage Sample
Function Description
The following code sample belongs to the com.huawei.bigdata.kafka.example.Consumer class. It is used to enable the Consumer API to subscribe a secure topic and consume messages.
Code Sample
/**
* Consumer constructor.
* @param topic Name of the subscribed topic
*/
public Consumer(String topic) {
super("KafkaConsumerExample", false);
// Initializes the configuration parameters required for starting the consumer. For details, see the code.
Properties props = initProperties();
consumer = new KafkaConsumer<Integer, String>(props);
this.topic = topic;
}
public void doWork() {
// Subscribe
consumer.subscribe(Collections.singletonList(this.topic));
// Message consumption request
ConsumerRecords<Integer, String> records = consumer.poll(waitTime);
// Message Processing
for (ConsumerRecord<Integer, String> record : records) {
LOG.info("[ConsumerExample], Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.