Updated on 2025-03-25 GMT+08:00

KafkaProducer

Path

com.roma.apic.livedata.client.v1.KafkaProducer

Description

This class is used to produce Kafka messages.

Example

  • Security protocol is not enabled or the security protocol is PLAINTEXT:
    importClass(com.roma.apic.livedata.client.v1.KafkaProducer);
    importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
    
    var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
    var topic = 'YourKafkaTopic'
    
    function execute(data) {
        var config = KafkaConfig.getConfig(kafka_brokers, null)
        var producer = new KafkaProducer(config)
        var record = producer.produce(topic, "hello, kafka.")
        return {
            offset: record.offset(),
            partition: record.partition(),
            code: 0,
            message: "OK"
        }
    }
  • Security protocol is SASL_SSL and the SASL mechanism is PLAIN:
    importClass(com.roma.apic.livedata.client.v1.KafkaProducer);
    importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
    
    var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
    var topic = 'YourKafkaTopic'
    //AppKey and AppSecret are the username and password for SASL_SSL authentication. If MQS is connected, the values are the key and secret of the integration application to which the topic belongs.
    var app_key = 'AppKey'
    var app_secret = 'AppSecret'
    
    function execute(data) {
        var config = KafkaConfig.getSaslConfig(kafka_brokers, null, app_key, app_secret)
        var producer = new KafkaProducer(config)
        var record = producer.produce(topic, "hello, kafka.")
        return {
            offset: record.offset(),
            partition: record.partition(),
            code: 0,
            message: "OK"
        }
    }
  • Security protocol is SASL_SSL or SASL_PLAINTEXT and the SASL mechanism is SCRAM-SHA-512:
    importClass(com.roma.apic.livedata.client.v1.KafkaProducer);
    importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
    
    var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
    var topic = 'YourKafkaTopic'
    
    function execute(data) {
        var config = KafkaConfig.getConfig(kafka_brokers, null);
       //Set the values according to the actually used  security protocol (SASL_SSL or SASL_PLAINTEXT).
        config.put("security.protocol", "SASL_SSL|SASL_PLAINTEXT");
        config.put("sasl.mechanism", "SCRAM-SHA-512"); 
    //AppKey and AppSecret are the username and password for SASL_SSL or SASL_PLAINTEXT authentication. If MQS is connected, the values are the key and secret of the integration application to which the topic belongs.
        config.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"AppKey\" password=\"AppSecret\";");
            
        var producer = new KafkaProducer(config)
        var record = producer.produce(topic, "hello, kafka.")
        return {
            offset: record.offset(),
            partition: record.partition(),
            code: 0,
            message: "OK"
        }
    }

Constructor Details

public KafkaProducer(Map configs)

Constructs a Kafka message producer.

Parameter: configs indicates configuration information of the Kafka.

Method List

Returned Type

Method and Description

org.apache.kafka.clients.producer.RecordMetadata

produce(String topic, String message)

Produce messages.

The produce(String topic, String message) method cannot be directly returned. Otherwise, the returned information is empty. For example, do not use the return record statement directly in the preceding example. Otherwise, the returned information is empty.

Method Details

public org.apache.kafka.clients.producer.RecordMetadata produce(String topic, String message)

Produce messages.

Input Parameter

  • topic indicates a message queue.
  • message indicates the message content.

Returns

Message record.