更新时间:2024-07-27 GMT+08:00
使用SpringBoot生产消费Kafka集群数据
本章节适用于MRS 3.3.0及之后版本。
功能简介
通过SpringBoot实现对Kafka集群生产消费的功能。
代码样例
通过SpringBoot实现Kafka生产消费的样例代码如下:
@RestController public class MessageController { private final static Logger LOG = LoggerFactory.getLogger(MessageController.class); @Autowired private KafkaProperties kafkaProperties; @GetMapping("/produce") public String produce() { Producer producerThread = new Producer(); producerThread.init(this.kafkaProperties); producerThread.start(); String message = "Start to produce messages"; LOG.info(message); return message; } @GetMapping("/consume") public String consume() { Consumer consumerThread = new Consumer(); consumerThread.init(this.kafkaProperties); consumerThread.start(); LOG.info("Start to consume messages"); // 等到180s后将consumer关闭,实际执行过程中可修改 try { Thread.sleep(consumerThread.getThreadAliveTime()); } catch (InterruptedException e) { LOG.info("Occurred InterruptedException: ", e); } finally { consumerThread.close(); } return String.format("Finished consume messages"); } }
Produce为生产消息接口,consume为消费消息接口,KafkaProperties为客户端参数,参数需要根据实际业务修改。
上述样例代码中的KafkaProperties属性可以在“springboot > kafka-examples > src > main > resources > application.properties”中配置,也可以在样例运行环境上手动编写application.properties文件。没有指定默认值的配置为必选项。
- bootstrap.servers:Kafka集群Broker地址列表,格式为ip:port,ip:port,ip:port。
- security.protocol:Kafka客户端使用的认证协议,默认值“SASL_PLAINTEXT”,不支持使用SASL_SSL协议。
- sasl.mechanism:客户端使用的认证机制,默认值“PLAIN”。
- manager_username:集群的用户。
- manager_password:集群用户对应的密码(密码明文存储存在安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全)。
- topic:生产消费的topic名称,默认值“example-metric1”。
- isAsync:是否使用异步生产,默认值“false”。
- consumer.alive.time:消费线程存活时间,默认值“180000”,单位ms。
- server.port:访问SpringBoot服务端的端口,默认值为“8080”,支持自定义修改。
- server.address: SpringBoot服务端启动时绑定的IP地址,默认值为“0.0.0.0”,需要修改为SpringBoot服务部署节点的IP地址。
- is.security.mode: 客户端是否使用安全模式连接集群,默认值为“true”。
父主题: 开发Kafka应用