更新时间:2025-04-11 GMT+08:00
分享

Kafka对接SpringBoot样例代码

本章节适用于MRS 3.3.0及之后版本。

功能简介

通过SpringBoot实现对Kafka集群生产消费的功能。

代码样例

通过SpringBoot实现Kafka生产消费的样例代码如下:

@RestController
public class MessageController {
    private final static Logger LOG = LoggerFactory.getLogger(MessageController.class);
    @Autowired
    private KafkaProperties kafkaProperties;
    @GetMapping("/produce")
    public String produce() {
        Producer producerThread = new Producer();
        producerThread.init(this.kafkaProperties);
        producerThread.start();
        String message = "Start to produce messages";
        LOG.info(message);
        return message;
    }
    @GetMapping("/consume")
    public String consume() {
        Consumer consumerThread = new Consumer();
        consumerThread.init(this.kafkaProperties);
        consumerThread.start();
        LOG.info("Start to consume messages");
        // 等到180s后将consumer关闭,实际执行过程中可修改
        try {
            Thread.sleep(consumerThread.getThreadAliveTime());
        } catch (InterruptedException e) {
            LOG.info("Occurred InterruptedException: ", e);
        } finally {
            consumerThread.close();
        }
        return String.format("Finished consume messages");
    }
}

Producer为生产消息接口,Consumer为消费消息接口,KafkaProperties为客户端参数,参数需要根据实际业务修改。

上述样例代码中的KafkaProperties属性可以在样例代码工程“springboot > kafka-examples > src > main > resources > application.properties”中配置,也可以在样例运行环境上手动编写application.properties文件,没有指定默认值的配置为必选项。
  • bootstrap.servers:Kafka集群Broker地址列表,格式为ip:port,ip:port,ip:port。IPv6环境下,需要给IP地址添加 [],例如:[::1]:21007。
  • security.protocol:Kafka客户端使用的认证协议,默认值“PLAINTEXT”。
  • topic:生产消费的topic名称,默认值“example-metric1”。
  • isAsync:是否使用异步生产,默认值“false”。
  • consumer.alive.time:消费线程存活时间,默认值“180000”单位ms。
  • server.port:访问SpringBoot服务端的端口,默认值为“8080”,支持自定义修改。
  • server.address:SpringBoot服务端启动时绑定的IP地址,默认值为“0.0.0.0”,需要修改为SpringBoot服务部署节点的IP地址。
  • is.security.mode: 客户端是否使用安全模式连接集群,默认值为“false”。

相关文档