更新时间:2023-08-03 GMT+08:00
Consumer API样例
功能简介
下面代码片段在com.huawei.bigdata.kafka.example.Consumer类中,用于实现使用Consumer API订阅安全Topic,并进行消息消费。
代码样例
/** * Consumer构造函数。 * @param topic 订阅的Topic名称。 */ public Consumer(String topic) { super("KafkaConsumerExample", false); // 初始化consumer启动所需的配置参数,详见代码。 Properties props = initProperties(); consumer = new KafkaConsumer<Integer, String>(props); this.topic = topic; } public void doWork() { // 订阅 consumer.subscribe(Collections.singletonList(this.topic)); // 消息消费请求 ConsumerRecords<Integer, String> records = consumer.poll(waitTime); // 消息处理 for (ConsumerRecord<Integer, String> record : records) { LOG.info("[ConsumerExample], Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } }
父主题: 典型场景样例代码说明