Consumer API Sample
Function
The following code snippet belongs to the com.huawei.bigdata.kafka.example.Consumer class. It is used to enable the Consumer APIs to subscribe to secure topics and consume messages.
Sample Code
/**
* Consumer create a function.
* @param topic Subscribed topic name
*/
public Consumer(String topic) {
super("KafkaConsumerExample", false);
// Initialize the configuration parameters required for starting the consumer. For details, see the code.
Properties props = initProperties();
consumer = new KafkaConsumer<Integer, String>(props);
this.topic = topic;
}
public void doWork() {
// Subscribe.
consumer.subscribe(Collections.singletonList(this.topic));
// Submit the message consumption request.
ConsumerRecords<Integer, String> records = consumer.poll(waitTime);
// Process the message.
for (ConsumerRecord<Integer, String> record : records) {
LOG.info("[ConsumerExample], Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.