KafkaProducer类说明
路径
com.roma.apic.livedata.client.v1.KafkaProducer
说明
生产Kafka消息。
使用示例
- 未启用安全协议或安全协议使用PLAINTEXT的场景:
importClass(com.roma.apic.livedata.client.v1.KafkaProducer); importClass(com.roma.apic.livedata.config.v1.KafkaConfig); var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330' var topic = 'YourKafkaTopic' function execute(data) { var config = KafkaConfig.getConfig(kafka_brokers, null) var producer = new KafkaProducer(config) var record = producer.produce(topic, "hello, kafka.") return { offset: record.offset(), partition: record.partition(), code: 0, message: "OK" } }
- 安全协议使用SASL_SSL且SASL认证机制使用PLAIN的场景:
importClass(com.roma.apic.livedata.client.v1.KafkaProducer); importClass(com.roma.apic.livedata.config.v1.KafkaConfig); var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330' var topic = 'YourKafkaTopic' //AppKey和AppSecret为SASL_SSL认证的用户名和密码,如果对接的为MQS,则为Topic所属集成应用的Key和Secret var app_key = 'AppKey' var app_secret = 'AppSecret' function execute(data) { var config = KafkaConfig.getSaslConfig(kafka_brokers, null, app_key, app_secret) var producer = new KafkaProducer(config) var record = producer.produce(topic, "hello, kafka.") return { offset: record.offset(), partition: record.partition(), code: 0, message: "OK" } }
- 安全协议使用SASL_SSL或SASL_PLAINTEXT,且SASL认证机制使用SCRAM-SHA-512的场景:
importClass(com.roma.apic.livedata.client.v1.KafkaProducer); importClass(com.roma.apic.livedata.config.v1.KafkaConfig); var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330' var topic = 'YourKafkaTopic' function execute(data) { var config = KafkaConfig.getConfig(kafka_brokers, null); //根据实际使用的安全协议设置具体的值,SASL_SSL或SASL_PLAINTEXT config.put("security.protocol", "SASL_SSL|SASL_PLAINTEXT"); config.put("sasl.mechanism", "SCRAM-SHA-512"); //AppKey和AppSecret为SASL_SSL或SASL_PLAINTEXT认证的用户名和密码,如果对接的为MQS,则为Topic所属集成应用的Key和Secret config.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"AppKey\" password=\"AppSecret\";"); var producer = new KafkaProducer(config) var record = producer.produce(topic, "hello, kafka.") return { offset: record.offset(), partition: record.partition(), code: 0, message: "OK" } }
构造器详情
public KafkaProducer(Map configs)
构造一个Kafka消息生产者
参数:configs表示Kafka的配置信息
方法列表
返回类型 |
方法和说明 |
---|---|
org.apache.kafka.clients.producer.RecordMetadata |
produce(String topic, String message) 生产消息 |

不能直接返回方法produce(String topic, String message),否则会导致返回信息为空。例如在使用示例中,不能直接使用“return record”句式,否则返回的信息为空。