更新时间:2025-05-28 GMT+08:00
分享

KafkaProducer类说明

路径

com.roma.apic.livedata.client.v1.KafkaProducer

说明

生产Kafka消息。

使用示例

  • 未启用安全协议或安全协议使用PLAINTEXT的场景:
    importClass(com.roma.apic.livedata.client.v1.KafkaProducer);
    importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
    
    var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
    var topic = 'YourKafkaTopic'
    
    function execute(data) {
        var config = KafkaConfig.getConfig(kafka_brokers, null)
        var producer = new KafkaProducer(config)
        var record = producer.produce(topic, "hello, kafka.")
        return {
            offset: record.offset(),
            partition: record.partition(),
            code: 0,
            message: "OK"
        }
    }
  • 安全协议使用SASL_SSL且SASL认证机制使用PLAIN的场景:
    importClass(com.roma.apic.livedata.client.v1.KafkaProducer);
    importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
    
    var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
    var topic = 'YourKafkaTopic'
    //AppKey和AppSecret为SASL_SSL认证的用户名和密码,如果对接的为MQS,则为Topic所属集成应用的Key和Secret
    var app_key = 'AppKey'
    var app_secret = 'AppSecret'
    
    function execute(data) {
        var config = KafkaConfig.getSaslConfig(kafka_brokers, null, app_key, app_secret)
        var producer = new KafkaProducer(config)
        var record = producer.produce(topic, "hello, kafka.")
        return {
            offset: record.offset(),
            partition: record.partition(),
            code: 0,
            message: "OK"
        }
    }
  • 安全协议使用SASL_SSL或SASL_PLAINTEXT,且SASL认证机制使用SCRAM-SHA-512的场景:
    importClass(com.roma.apic.livedata.client.v1.KafkaProducer);
    importClass(com.roma.apic.livedata.config.v1.KafkaConfig);
    
    var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330'
    var topic = 'YourKafkaTopic'
    
    function execute(data) {
        var config = KafkaConfig.getConfig(kafka_brokers, null);
        //根据实际使用的安全协议设置具体的值,SASL_SSL或SASL_PLAINTEXT
        config.put("security.protocol", "SASL_SSL|SASL_PLAINTEXT");
        config.put("sasl.mechanism", "SCRAM-SHA-512"); 
        //AppKey和AppSecret为SASL_SSL或SASL_PLAINTEXT认证的用户名和密码,如果对接的为MQS,则为Topic所属集成应用的Key和Secret    
        config.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"AppKey\" password=\"AppSecret\";");
            
        var producer = new KafkaProducer(config)
        var record = producer.produce(topic, "hello, kafka.")
        return {
            offset: record.offset(),
            partition: record.partition(),
            code: 0,
            message: "OK"
        }
    }

构造器详情

public KafkaProducer(Map configs)

构造一个Kafka消息生产者

参数:configs表示Kafka的配置信息

方法列表

返回类型

方法和说明

org.apache.kafka.clients.producer.RecordMetadata

produce(String topic, String message)

生产消息

不能直接返回方法produce(String topic, String message),否则会导致返回信息为空。例如在使用示例中,不能直接使用“return record”句式,否则返回的信息为空。

方法详情

public org.apache.kafka.clients.producer.RecordMetadata produce(String topic, String message)

生产消息

输入参数

  • topic:消息队列
  • message:消息内容

返回信息

消息记录

相关文档