Using spring-kafka
This section describes how to use spring-kafka to connect to a Huawei Cloud Kafka instance and produce and consume messages. Obtain the related code from kafka-springboot-demo.
The Kafka instance connection addresses, topic name, and user information used in the following examples are available in Collecting Connection Information.
Adding the spring-kafka Dependency to the pom.xml File
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
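When the project inherits from the Spring Boot parent (or imports the Spring Boot BOM), the spring-kafka version is managed automatically, so no <version> tag is needed. If your project does not use Spring Boot dependency management, pin a version explicitly; a sketch, where the version number is only an illustrative placeholder (choose one compatible with your Spring Boot release):

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <!-- Placeholder version; only needed outside Spring Boot dependency management -->
    <version>2.8.11</version>
</dependency>
```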
Configuring the application.properties File
#=============== Kafka ==========================
## Broker information of the Kafka instance. ip:port indicates the connection address and port number of the instance.
spring.kafka.bootstrap-servers=ip1:port1,ip2:port2,ip3:port3

#=============== Producer Configuration =======================
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== Consumer Configuration =======================
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#======== SASL Configuration (Delete the following configuration if SASL is disabled.) =======
## Set the SASL authentication mechanism, username, and password.
## spring.kafka.properties.sasl.mechanism is the SASL mechanism. username and password are the SASL username and password. Obtain them by referring to "Collecting Connection Information". For security purposes, you are advised to encrypt the username and password.

## If the SASL mechanism is PLAIN, the configuration is as follows:
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="password";

## If the SASL mechanism is SCRAM-SHA-512, the configuration is as follows:
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-512
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="username" \
password="password";

# Set spring.kafka.security.protocol.
## If the security protocol is SASL_SSL, the configuration is as follows:
spring.kafka.security.protocol=SASL_SSL
# spring.kafka.ssl.trust-store-location is the path for storing the SSL certificate. The following uses the Windows path format as an example. Change the path format based on the actual running environment.
spring.kafka.ssl.trust-store-location=file:D:\\temp\\client.jks
# spring.kafka.ssl.trust-store-password is the password of the server certificate. This password does not need to be modified. It is used for accessing the JKS file generated by Java.
spring.kafka.ssl.trust-store-password=dms@kafka
# spring.kafka.properties.ssl.endpoint.identification.algorithm indicates whether to verify the certificate domain name. This parameter must be left blank, which disables domain name verification.
spring.kafka.properties.ssl.endpoint.identification.algorithm=

## If the security protocol is SASL_PLAINTEXT, the configuration is as follows:
spring.kafka.security.protocol=SASL_PLAINTEXT
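For reference, the SASL_SSL settings above map one-to-one onto standard Kafka client configuration keys. A minimal sketch of the equivalent plain-client configuration, using only java.util.Properties (the class name is made up for this sketch; the truststore path and credentials are the same placeholders used above):

```java
import java.util.Properties;

public class SaslSslClientProps {
    /** Builds SASL_SSL (PLAIN) client settings equivalent to the Spring Boot properties above. */
    public static Properties build(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + username + "\" password=\"" + password + "\";");
        props.setProperty("ssl.truststore.location", "D:\\temp\\client.jks");
        props.setProperty("ssl.truststore.password", "dms@kafka");
        // Empty value disables certificate host name verification, matching the Spring config.
        props.setProperty("ssl.endpoint.identification.algorithm", "");
        return props;
    }

    public static void main(String[] args) {
        Properties p = build("ip1:port1", "username", "password");
        System.out.println(p.getProperty("security.protocol"));
    }
}
```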
Producing Messages
package com.huaweicloud.dms.example.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * @author huaweicloud DMS
 */
@Component
public class DmsKafkaProducer {
    /**
     * Topic name. Use the actual topic name.
     */
    public static final String TOPIC = "test_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * One message is produced every five seconds as scheduled.
     */
    @Scheduled(cron = "*/5 * * * * ?")
    public void send() {
        String message = String.format("{id:%s,timestamp:%s}",
                UUID.randomUUID().toString(), System.currentTimeMillis());
        kafkaTemplate.send(TOPIC, message);
        System.out.println("send finished, message = " + message);
    }
}
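The producer builds a simple JSON-like payload with String.format before handing it to KafkaTemplate. As a standalone illustration of that payload construction (plain Java, no Spring or Kafka required; the helper class name is made up for this sketch):

```java
import java.util.UUID;

public class PayloadFormat {
    /** Builds the same {id:...,timestamp:...} payload the scheduled producer sends. */
    public static String build() {
        return String.format("{id:%s,timestamp:%s}",
                UUID.randomUUID().toString(), System.currentTimeMillis());
    }

    public static void main(String[] args) {
        System.out.println(PayloadFormat.build());
    }
}
```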
Consuming Messages
package com.huaweicloud.dms.example.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * @author huaweicloud DMS
 */
@Component
public class DmsKafkaConsumer {
    /**
     * Topic name. Use the actual topic name.
     */
    private static final String TOPIC = "test_topic";

    @KafkaListener(topics = {TOPIC})
    public void listen(ConsumerRecord<String, String> record) {
        Optional<String> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            System.out.println("consume finished, message = " + message.get());
        }
    }
}
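By default, one listener thread consumes the topic. If the topic has multiple partitions, throughput can be raised by running several listener containers via the standard Spring Boot property; a sketch, where the value 3 is only illustrative (keep it at or below the partition count, since extra threads stay idle):

```properties
# Number of concurrent listener containers for @KafkaListener methods.
spring.kafka.listener.concurrency=3
```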