更新时间:2023-05-17 GMT+08:00
分享

KafkaConsumer类说明

路径

com.roma.apic.livedata.client.v1.KafkaConsumer

说明

消费Kafka消息。

使用示例

importClass(com.roma.apic.livedata.client.v1.KafkaConsumer);
importClass(com.roma.apic.livedata.config.v1.KafkaConfig);

var kafka_brokers = '1.1.1.1:26330,2.2.2.2:26330'
var topic = 'YourKafkaTopic'
var group = 'YourKafkaGroupId'

function execute(data) {
    var config = KafkaConfig.getConfig(kafka_brokers, group)
    var consumer = new KafkaConsumer(config)
    var records = consumer.consume(topic, 5000, 10);
    var res = []
    var iter = records.iterator()
    while (iter.hasNext()) {
        res.push(iter.next())
    }
    return JSON.stringify(res);
}

构造器详情

public KafkaConsumer(Map configs)

构造一个Kafka消息消费者

参数:configs表示Kafka的配置信息

方法列表

返回类型

方法和说明

List<String>

consume(String topic, long timeout, long maxItems)

消费消息

方法详情

public List<String> consume(String topic, long timeout, long maxItems)

消费消息

输入参数

  • topic:消息队列
  • timeout:读取超时时间,最大值为30000毫秒,建议timeout设置值小于前端API的“后端超时”时间值
  • maxItems:读取消息的最大数量

返回信息

Kafka已消费的消息数组,消息数组即将多条消息的内容组成一个数组

相关文档