更新时间:2023-04-23 GMT+08:00

KafkaConsumer类说明

路径

com.roma.apic.livedata.client.v1.KafkaConsumer

说明

消费Kafka消息。

使用示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
importClass(com.roma.apic.livedata.client.v1.KafkaConsumer);
importClass(com.roma.apic.livedata.config.v1.KafkaConfig);

var kafka_brokers = '1.1.1.1:26330,2.2.2.2:26330'
var topic = 'YourKafkaTopic'
var group = 'YourKafkaGroupId'

function execute(data) {
    var config = KafkaConfig.getConfig(kafka_brokers, group)
    var consumer = new KafkaConsumer(config)
    var records = consumer.consume(topic, 5000, 10);
    var res = []
    var iter = records.iterator()
    while (iter.hasNext()) {
        res.push(iter.next())
    }
    return JSON.stringify(res);
}

构造器详情

public KafkaConsumer(Map configs)

构造一个Kafka消息消费者

参数:configs表示Kafka的配置信息

方法列表

返回类型

方法和说明

List<String>

consume(String topic, long timeout, long maxItems)

消费消息

方法详情

  • public List<String> consume(String topic, long timeout, long maxItems)

    消费消息

    输入参数

    • topic:消息队列
    • timeout:读取超时时间
    • maxItems:读取消息的最大数量

    返回信息

    Kafka已消费的消息数组,消息数组即将多条消息的内容组成一个数组