KafkaConsumer类说明
路径
com.roma.apic.livedata.client.v1.KafkaConsumer
说明
消费Kafka消息。
使用示例
- 未启用安全协议或安全协议使用PLAINTEXT的场景:
importClass(com.roma.apic.livedata.client.v1.KafkaConsumer);
importClass(com.roma.apic.livedata.config.v1.KafkaConfig);

// Comma-separated broker addresses, plus the topic and consumer group to use.
var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330';
var topic = 'YourKafkaTopic';
var group = 'YourKafkaGroupId';

/**
 * Consumes messages from `topic` without any security protocol (or with
 * PLAINTEXT) and returns them serialized as a JSON array.
 *
 * @param data - Input passed in by the caller; not used by this example.
 * @returns {string} JSON array string containing the consumed messages.
 */
function execute(data) {
  var config = KafkaConfig.getConfig(kafka_brokers, group);
  var consumer = new KafkaConsumer(config);

  // consume(topic, timeout, maxItems): timeout = 2000 and maxItems = 10.
  // NOTE(review): timeout is presumably in milliseconds - confirm.
  var records = consumer.consume(topic, 2000, 10);

  // Copy the returned list into a JavaScript array so it can be serialized.
  var res = [];
  var iter = records.iterator();
  while (iter.hasNext()) {
    res.push(iter.next());
  }
  return JSON.stringify(res);
}
- 安全协议使用SASL_SSL且SASL认证机制使用PLAIN的场景:
importClass(com.roma.apic.livedata.client.v1.KafkaConsumer);
importClass(com.roma.apic.livedata.config.v1.KafkaConfig);

// Comma-separated broker addresses, plus the topic and consumer group to use.
var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330';
var topic = 'YourKafkaTopic';
var group = 'YourKafkaGroupId';

// AppKey and AppSecret are the user name and password for SASL_SSL
// authentication. When connecting to MQS, use the Key and Secret of the
// integration application that owns the topic.
var app_key = 'AppKey';
var app_secret = 'AppSecret';

/**
 * Consumes messages from `topic` using the SASL_SSL security protocol with the
 * PLAIN SASL mechanism, and returns them serialized as a JSON array.
 *
 * @param data - Input passed in by the caller; not used by this example.
 * @returns {string} JSON array string containing the consumed messages.
 */
function execute(data) {
  var config = KafkaConfig.getSaslConfig(kafka_brokers, group, app_key, app_secret);
  var consumer = new KafkaConsumer(config);

  // consume(topic, timeout, maxItems): timeout = 2000 and maxItems = 10.
  // NOTE(review): timeout is presumably in milliseconds - confirm.
  var records = consumer.consume(topic, 2000, 10);

  // Copy the returned list into a JavaScript array so it can be serialized.
  var res = [];
  var iter = records.iterator();
  while (iter.hasNext()) {
    res.push(iter.next());
  }
  return JSON.stringify(res);
}
- 安全协议使用SASL_SSL或SASL_PLAINTEXT,且SASL认证机制使用SCRAM-SHA-512的场景:
importClass(com.roma.apic.livedata.client.v1.KafkaConsumer);
importClass(com.roma.apic.livedata.config.v1.KafkaConfig);

// Comma-separated broker addresses, plus the topic and consumer group to use.
var kafka_brokers = '192.168.0.10:26330,192.168.0.11:26330,192.168.0.12:26330';
var topic = 'YourKafkaTopic';
var group = 'YourKafkaGroupId';

// Set this to the security protocol actually in use:
// 'SASL_SSL' or 'SASL_PLAINTEXT'.
var security_protocol = 'SASL_SSL';

// AppKey and AppSecret are the user name and password for SASL_SSL or
// SASL_PLAINTEXT authentication. When connecting to MQS, use the Key and
// Secret of the integration application that owns the topic.
var app_key = 'AppKey';
var app_secret = 'AppSecret';

/**
 * Consumes messages from `topic` using SASL_SSL or SASL_PLAINTEXT with the
 * SCRAM-SHA-512 SASL mechanism, and returns them serialized as a JSON array.
 *
 * @param data - Input passed in by the caller; not used by this example.
 * @returns {string} JSON array string containing the consumed messages.
 */
function execute(data) {
  // Start from the basic configuration, then add the SASL/SCRAM settings.
  var config = KafkaConfig.getConfig(kafka_brokers, group);
  config.put("security.protocol", security_protocol);
  config.put("sasl.mechanism", "SCRAM-SHA-512");
  config.put(
    "sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" +
      app_key + "\" password=\"" + app_secret + "\";"
  );

  var consumer = new KafkaConsumer(config);

  // consume(topic, timeout, maxItems): timeout = 2000 and maxItems = 10.
  // NOTE(review): timeout is presumably in milliseconds - confirm.
  var records = consumer.consume(topic, 2000, 10);

  // Copy the returned list into a JavaScript array so it can be serialized.
  var res = [];
  var iter = records.iterator();
  while (iter.hasNext()) {
    res.push(iter.next());
  }
  return JSON.stringify(res);
}
构造器详情
public KafkaConsumer(Map configs)
构造一个Kafka消息消费者
参数:configs表示Kafka的配置信息
方法列表
| 返回类型 | 方法和说明 |
|---|---|
| List<String> | consume(String topic, long timeout, long maxItems) 消费消息 |