分布式消息服务Kafka版
分布式消息服务Kafka版
- 最新动态
- 功能总览
- 服务公告
- 产品介绍
- 计费说明
- 快速入门
- 用户指南
- 最佳实践
- 开发指南
- API参考
- SDK参考
-
常见问题
-
实例问题
- 为什么可用区不能选择2个?
- 创建实例时为什么无法查看子网和安全组等信息?
- 如何选择Kafka实例的存储空间?
- Kafka实例的超高IO和高IO如何选择?
- 如何选择Kafka实例存储容量阈值策略?
- Kafka服务端支持版本是多少?
- Kafka实例的ZK地址是什么?
- 创建的Kafka实例是集群模式么?
- Kafka实例是否支持修改访问端口?
- Kafka实例的SSL证书有效期多长?
- 如何将Kafka实例中的数据同步到另一个Kafka实例中?
- Kafka实例的SASL_SSL开关如何修改?
- SASL认证机制如何修改?
- 如何修改安全协议?
- 修改企业项目,是否会导致Kafka重启?
- 100MB/s的带宽怎样开启公网访问?
- Kafka服务和ZK是部署在相同的虚拟机中,还是分开部署?
- Kafka包周期实例支持删除吗?
- Kafka支持哪些加密套件?
- 购买实例时选择的单AZ,怎样可以扩展为多AZ?
- Kafka是否支持跨AZ容灾?已经购买的实例在哪里查看是否为跨AZ?
- Kafka支持磁盘加密吗?
- Kafka实例创建后,能修改VPC和子网吗?
- 有没有Kafka Stream的案例?
- Kafka实例版本可以升级吗?
- 怎样重新绑定公网IP?
- 实例规格变更问题
-
连接问题
- 选择和配置安全组
- Kafka实例是否支持公网访问?
- Kafka实例的连接地址默认有多少个?
- 是否支持跨Region访问?
- Kafka实例是否支持跨VPC访问?
- Kafka实例是否支持不同的子网?
- Kafka是否支持Kerberos认证,如何开启认证?
- Kafka实例是否支持无密码访问?
- 开启公网访问后,在哪查看公网IP地址?
- Kafka支持服务端认证客户端吗?
- 连接开启SASL_SSL的Kafka实例时,ssl truststore文件可以用PEM格式的吗?
- 下载的证书JKS和CRT有什么区别?
- Kafka支持哪个版本的TLS?
- Kafka实例连接数有限制吗?
- 客户端单IP连接的个数为多少?
- Kafka实例的内网连接地址可以修改吗?
- 不同实例中,使用的SSL证书是否一样?
- 为什么不建议使用Sarama客户端收发消息?
- Topic和分区问题
- 消费组问题
- 消息问题
-
Kafka Manager问题
- 登录Kafka Manager的账号是否可以设置为只读账号?
- 登录到Kafka Manager页面,为什么获取不到节点信息?
- Yikes! Insufficient partition balance when creating topic : projectman_project_enterprise_project Try again.
- Kafka Manager能否查询到消息的正文?
- Kafka Manager WebUI的端口能否修改?
- 在Kafka Manager上支持修改Topic的哪些属性?
- Kafka Manager和云监控显示的信息不一致
- Kafka Manager如何修改Topic的分区Leader?
- 实例版本在控制台和Kafka Manager上显示不一致?
- 为什么实例中存在默认名为__trace和__consumer_offsets的Topic?
- 客户端删除消费组后,在Kafka Manager中仍可以看到此消费组?
- 监控告警问题
- Kafka体验版使用说明
-
实例问题
- 故障排除
- 视频帮助
- 文档下载
- 通用参考
链接复制成功!
spring-kafka的使用
本文介绍如何使用spring-kafka连接华为云Kafka实例进行消息的生产和消费。相关代码您可以从kafka-springboot-demo中获取。
下文所有Kafka的配置信息,如实例连接地址、Topic名称、用户信息等,请参考收集连接信息获取。
在pom.xml文件中引入spring-kafka依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
在application.properties文件中填写配置
#=============== Kafka ========================== ## Kafka实例的broker信息,ip:port为实例的连接地址和端口 spring.kafka.bootstrap-servers=ip1:port1,ip2:port2,ip3:port3 #=============== 生产者配置 ======================= spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== 消费者配置 ======================= spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #======== SASL配置(不开启SASL时将以下配置删除) ======= ## 设置SASL认证机制、账号和密码。 ## spring.kafka.properties.sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。 ## SASL认证机制为“PLAIN”时,配置信息如下。 spring.kafka.properties.sasl.mechanism=PLAIN spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; ## SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 spring.kafka.properties.sasl.mechanism=SCRAM-SHA-512 spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="username" \ password="password"; ## 设置Kafka安全协议。spring.kafka.security.protocol为安全协议。 ## 安全协议为“SASL_SSL”时,配置信息如下。 spring.kafka.security.protocol=SASL_SSL ## spring.kafka.ssl.trust-store-location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。 spring.kafka.ssl.trust-store-location=file:D:\\temp\\client.jks ## spring.kafka.ssl.trust-store-password为服务器证书密码,无需修改,配置此密码是为了访问Java生成的jks文件。 spring.kafka.ssl.trust-store-password=dms@kafka ## spring.kafka.properties.ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。 spring.kafka.properties.ssl.endpoint.identification.algorithm= ## 安全协议为“SASL_PLAINTEXT”时,配置信息如下。 spring.kafka.security.protocol=SASL_PLAINTEXT
生产消息
package com.huaweicloud.dms.example.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.UUID; /** * @author huaweicloud DMS */ @Component public class DmsKafkaProducer { /** * Topic名称,根据实际情况修改 */ public static final String TOPIC = "test_topic"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; /** * 定时任务每5秒钟生产一条消息 */ @Scheduled(cron = "*/5 * * * * ?") public void send() { String message = String.format("{id:%s,timestamp:%s}", UUID.randomUUID().toString(), System.currentTimeMillis()); kafkaTemplate.send(TOPIC, message); System.out.println("send finished, message = " + message); } }
消费消息
package com.huaweicloud.dms.example.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.Optional; /** * @author huaweicloud DMS */ @Component public class DmsKafkaConsumer { /** * Topic名称,根据实际情况修改 */ private static final String TOPIC = "test_topic"; @KafkaListener(topics = {TOPIC}) public void listen(ConsumerRecord<String, String> record) { Optional<String> message = Optional.ofNullable(record.value()); if (message.isPresent()) { System.out.println("consume finished, message = " + message.get()); } } }