更新时间:2025-09-11 GMT+08:00
spring-kafka的使用
本文介绍如何使用spring-kafka连接华为云Kafka实例进行消息的生产和消费。相关代码您可以从kafka-springboot-demo中获取。
下文所有Kafka的配置信息,如实例连接地址、Topic名称、用户信息等,请参考收集连接信息获取。
在pom.xml文件中引入spring-kafka依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
在application.properties文件中填写配置
#=============== Kafka ==========================
## Kafka实例的broker信息,ip:port为实例的连接地址和端口
spring.kafka.bootstrap-servers=ip1:port1,ip2:port2,ip3:port3
#=============== 生产者配置 =======================
spring.kafka.producer.retries=10
spring.kafka.producer.retry.backoff.ms=1000
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== 消费者配置 =======================
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#======== SASL配置(不开启SASL时将以下配置删除) =======
## 设置SASL认证机制、账号和密码。
## spring.kafka.properties.sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。
## SASL认证机制为“PLAIN”时,配置信息如下。
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="password";
## SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-512
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="username" \
password="password";
## 设置Kafka安全协议。spring.kafka.security.protocol为安全协议。
## 安全协议为“SASL_SSL”时,配置信息如下。
spring.kafka.security.protocol=SASL_SSL
## spring.kafka.ssl.trust-store-location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。
spring.kafka.ssl.trust-store-location=file:D:\\temp\\client.jks
## spring.kafka.ssl.trust-store-password为Kafka客户端证书密码,如果使用Kafka控制台提供的SSL证书,默认为dms@kafka,不可更改。如果使用您自制的客户端证书,请根据实际情况配置。配置此密码是为了访问Java生成的jks文件。
spring.kafka.ssl.trust-store-password=dms@kafka
## spring.kafka.properties.ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。
spring.kafka.properties.ssl.endpoint.identification.algorithm=
## 安全协议为“SASL_PLAINTEXT”时,配置信息如下。
spring.kafka.security.protocol=SASL_PLAINTEXT
生产消息
package com.huaweicloud.dms.example.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @author huaweicloud DMS
*/
@Component
public class DmsKafkaProducer {
/**
* Topic名称,根据实际情况修改
*/
public static final String TOPIC = "test_topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 定时任务每5秒钟生产一条消息
*/
@Scheduled(cron = "*/5 * * * * ?")
public void send() {
String message = String.format("{id:%s,timestamp:%s}", UUID.randomUUID().toString(), System.currentTimeMillis());
kafkaTemplate.send(TOPIC, message);
System.out.println("send finished, message = " + message);
}
}
消费消息
package com.huaweicloud.dms.example.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @author huaweicloud DMS
*/
@Component
public class DmsKafkaConsumer {
/**
* Topic名称,根据实际情况修改
*/
private static final String TOPIC = "test_topic";
@KafkaListener(topics = {TOPIC})
public void listen(ConsumerRecord<String, String> record) {
Optional<String> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
System.out.println("consume finished, message = " + message.get());
}
}
}