更新时间:2025-03-25 GMT+08:00
分享

KafkaConsumer类说明

路径

com.roma.apic.livedata.client.v1.KafkaConsumer

说明

消费Kafka消息。

使用示例

  • 未启用安全协议或安全协议使用PLAINTEXT的场景:
    importClass(com.roma.apic.livedata.client.v1.KafkaConsumer);
    importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
    
    var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
    var topic = 'YourKafkaTopic'
    var group = 'YourKafkaGroupId'
    
    function execute(data) {
        var config = KafkaConfig.getConfig(kafka_brokers, group)
        var consumer = new KafkaConsumer(config)
        var records = consumer.consume(topic, 2000, 10);
        var res = []
        var iter = records.iterator()
        while (iter.hasNext()) {
            res.push(iter.next())
        }
        return JSON.stringify(res);
    }
  • 安全协议使用SASL_SSL且SASL认证机制使用PLAIN的场景:
    importClass(com.roma.apic.livedata.client.v1.KafkaConsumer);
    importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
    
    var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
    var topic = 'YourKafkaTopic'
    var group = 'YourKafkaGroupId'
    //AppKey和AppSecret为SASL_SSL认证的用户名和密码,如果对接的为MQS,则为Topic所属集成应用的Key和Secret
    var app_key = 'AppKey'
    var app_secret = 'AppSecret'
    
    function execute(data) {
        var config = KafkaConfig.getSaslConfig(kafka_brokers, group, app_key, app_secret)
        var consumer = new KafkaConsumer(config)
        var records = consumer.consume(topic, 2000, 10);
        var res = []
        var iter = records.iterator()
        while (iter.hasNext()) {
            res.push(iter.next())
        }
        return JSON.stringify(res);
    }
  • 安全协议使用SASL_SSL或SASL_PLAINTEXT,且SASL认证机制使用SCRAM-SHA-512的场景:
    importClass(com.roma.apic.livedata.client.v1.KafkaConsumer);
    importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
    
    var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
    var topic = 'YourKafkaTopic'
    var group = 'YourKafkaGroupId'
    
    function execute(data) {
        var config = KafkaConfig.getConfig(kafka_brokers, group)
        //根据实际使用的安全协议设置具体的值,SASL_SSL或SASL_PLAINTEXT
        config.put("security.protocol", "SASL_SSL|SASL_PLAINTEXT");
        config.put("sasl.mechanism", "SCRAM-SHA-512"); 
        //AppKey和AppSecret为SASL_SSL或SASL_PLAINTEXT认证的用户名和密码,如果对接的为MQS,则为Topic所属集成应用的Key和Secret    
        config.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"AppKey\" password=\"AppSecret\";");
        
        var consumer = new KafkaConsumer(config)
        var records = consumer.consume(topic, 2000, 10);
        var res = []
        var iter = records.iterator()
        while (iter.hasNext()) {
            res.push(iter.next())
        }
        return JSON.stringify(res);
    }

构造器详情

public KafkaConsumer(Map configs)

构造一个Kafka消息消费者

参数:configs表示Kafka的配置信息

方法列表

返回类型

方法和说明

List<String>

consume(String topic, long timeout, long maxItems)

消费消息

方法详情

public List<String> consume(String topic, long timeout, long maxItems)

消费消息

输入参数

  • topic:消息队列
  • timeout:读取超时时间,最大值为30000毫秒,建议timeout设置值小于前端API的“后端超时”时间值
  • maxItems:读取消息的最大数量

返回信息

Kafka已消费的消息数组,消息数组即将多条消息的内容组成一个数组

相关文档