Updated on 2025-03-25 GMT+08:00

KafkaConsumer

Path

com.roma.apic.livedata.client.v1.KafkaConsumer

Description

This class is used to consume Kafka messages.

Example

  • Security protocol is not enabled or the security protocol is PLAINTEXT:
    importClass(com.roma.apic.livedata.client.v1.KafkaConsumer);
    importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
    
    // Placeholder connection settings: replace with your own broker list, topic, and consumer group ID.
    var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330';
    var topic = 'YourKafkaTopic';
    var group = 'YourKafkaGroupId';

    /**
     * Consumes messages from the topic and returns them as a JSON array string.
     * The data argument is not used in this example.
     */
    function execute(data) {
        // No security settings are added: the configuration holds only the brokers and the group ID.
        var consumer = new KafkaConsumer(KafkaConfig.getConfig(kafka_brokers, group));
        var messages = [];
        // Read timeout of 2000 ms, at most 10 messages per call.
        for (var it = consumer.consume(topic, 2000, 10).iterator(); it.hasNext();) {
            messages.push(it.next());
        }
        return JSON.stringify(messages);
    }
  • Security protocol is SASL_SSL and the SASL mechanism is PLAIN:
    importClass(com.roma.apic.livedata.client.v1.KafkaConsumer);
    importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
    
    // Placeholder connection settings: replace with your own broker list, topic, and consumer group ID.
    var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330';
    var topic = 'YourKafkaTopic';
    var group = 'YourKafkaGroupId';
    // app_key and app_secret are the username and password used for SASL_SSL authentication.
    // If MQS is connected, use the key and secret of the integration application to which the topic belongs.
    var app_key = 'AppKey';
    var app_secret = 'AppSecret';

    /**
     * Consumes messages from the topic over an authenticated connection and
     * returns them as a JSON array string. The data argument is not used in this example.
     */
    function execute(data) {
        var saslConfig = KafkaConfig.getSaslConfig(kafka_brokers, group, app_key, app_secret);
        var consumer = new KafkaConsumer(saslConfig);
        var messages = [];
        // Read timeout of 2000 ms, at most 10 messages per call.
        for (var it = consumer.consume(topic, 2000, 10).iterator(); it.hasNext();) {
            messages.push(it.next());
        }
        return JSON.stringify(messages);
    }
  • Security protocol is SASL_SSL or SASL_PLAINTEXT and the SASL mechanism is SCRAM-SHA-512:
    importClass(com.roma.apic.livedata.client.v1.KafkaConsumer);
    importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
    
    // Placeholder connection settings: replace with your own broker list, topic, and consumer group ID.
    var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
    var topic = 'YourKafkaTopic'
    var group = 'YourKafkaGroupId'

    /**
     * Consumes messages from the topic using SCRAM-SHA-512 authentication and
     * returns them as a JSON array string. The data argument is not used in this example.
     */
    function execute(data) {
        var config = KafkaConfig.getConfig(kafka_brokers, group)
        // Set exactly one protocol value: "SASL_SSL" (used here) or "SASL_PLAINTEXT",
        // whichever the Kafka cluster actually uses. The two names must not be combined in one string.
        config.put("security.protocol", "SASL_SSL");
        config.put("sasl.mechanism", "SCRAM-SHA-512");
        // AppKey and AppSecret are the username and password for SASL_SSL or SASL_PLAINTEXT authentication.
        // If MQS is connected, the values are the key and secret of the integration application to which the topic belongs.
        config.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"AppKey\" password=\"AppSecret\";");

        var consumer = new KafkaConsumer(config)
        // Read timeout of 2000 ms, at most 10 messages per call.
        var records = consumer.consume(topic, 2000, 10);
        var res = []
        var iter = records.iterator()
        while (iter.hasNext()) {
            res.push(iter.next())
        }
        return JSON.stringify(res);
    }

Constructor Details

public KafkaConsumer(Map configs)

Constructs a Kafka message consumer.

Parameter: configs indicates the Kafka configuration information.

Method List

Returned Type

Method and Description

List<String>

consume(String topic, long timeout, long maxItems)

Consumes messages.

Method Details

public List<String> consume(String topic, long timeout, long maxItems)

Consumes messages.

Input Parameter

  • topic indicates the Kafka topic (message queue) from which messages are consumed.
  • timeout indicates the read timeout interval, in milliseconds (maximum value: 30,000 ms). Set this parameter to a value less than the backend timeout of the frontend API.
  • maxItems indicates the maximum number of messages that can be read.

Returns

List of messages that have been consumed from Kafka. The content of each message forms one element of the list.