Using spring-kafka
This section describes how to use spring-kafka to connect to a Huawei Cloud Kafka instance to produce and consume messages. Obtain the related code from kafka-springboot-demo.
The Kafka instance connection addresses, topic name, and user information used in the following examples are available in Collecting Connection Information.
Adding the spring-kafka Dependency to the pom.xml File
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Configuring the File
#=============== Kafka ==========================
## Broker information of the Kafka instance. ip:port indicates the connection address and port number of the instance.
spring.kafka.bootstrap-servers=ip1:port1,ip2:port2,ip3:port3

#=============== Producer Configuration =======================
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== Consumer Configuration =======================
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#======== SASL Configuration (Delete the following configuration if SASL is disabled.) =======
## Set the SASL authentication mechanism, username, and password.
# spring.kafka.properties.sasl.mechanism indicates the SASL authentication mechanism. username and password indicate the username and password of SASL_SSL. Obtain them by referring to "Collecting Connection Information."
## If the SASL mechanism is PLAIN, the configuration is as follows:
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="username" \
    password="password";
## If the SASL mechanism is SCRAM-SHA-512, the configuration is as follows:
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-512
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="username" \
    password="password";

# Set spring.kafka.security.protocol.
## If the security protocol is SASL_SSL, the configuration is as follows:
spring.kafka.security.protocol=SASL_SSL
# spring.kafka.ssl.trust-store-location is the path for storing the SSL certificate. The following code uses the path format in Windows as an example. Change the path format based on the actual running environment.
spring.kafka.ssl.trust-store-location=\\temp\\client.truststore.jks
# spring.kafka.ssl.trust-store-password is the password of the server certificate. This password does not need to be modified. It is used for accessing the JKS file generated by Java.
# spring.kafka.properties.ssl.endpoint.identification.algorithm indicates whether to verify the certificate domain name. This parameter must be left blank, which indicates disabling domain name verification.
spring.kafka.properties.ssl.endpoint.identification.algorithm=
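For readers who create a plain Kafka client instead of relying on Spring Boot auto-configuration, the SASL settings in this file correspond to standard Kafka client keys. The following is a minimal sketch that uses only java.util.Properties. The class name KafkaClientProps and the helpers jaasConfig and saslSslProps are hypothetical, the address and credentials are placeholders, and the truststore settings (ssl.truststore.location and ssl.truststore.password) are left out and would still need to be set for SASL_SSL.

```java
import java.util.Properties;

final class KafkaClientProps {

    /** Builds a sasl.jaas.config value for PLAIN or SCRAM-SHA-512 (hypothetical helper). */
    static String jaasConfig(String mechanism, String username, String password) {
        String module;
        if ("PLAIN".equals(mechanism)) {
            module = "org.apache.kafka.common.security.plain.PlainLoginModule";
        } else if ("SCRAM-SHA-512".equals(mechanism)) {
            module = "org.apache.kafka.common.security.scram.ScramLoginModule";
        } else {
            throw new IllegalArgumentException("Unsupported mechanism: " + mechanism);
        }
        return module + " required username=\"" + username + "\" password=\"" + password + "\";";
    }

    /** Collects the SASL_SSL client settings under their Kafka client key names. */
    static Properties saslSslProps(String servers, String mechanism, String user, String pass) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", servers);
        p.setProperty("security.protocol", "SASL_SSL");
        p.setProperty("sasl.mechanism", mechanism);
        p.setProperty("sasl.jaas.config", jaasConfig(mechanism, user, pass));
        // An empty value disables certificate domain name verification.
        p.setProperty("ssl.endpoint.identification.algorithm", "");
        return p;
    }

    public static void main(String[] args) {
        // Placeholder address and credentials; replace with the instance's real values.
        Properties p = saslSslProps("ip1:port1,ip2:port2,ip3:port3", "PLAIN", "username", "password");
        p.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In a Spring Boot application, the same keys are passed through by prefixing them with spring.kafka.properties., so this sketch is only relevant when the client is built by hand.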
Producing Messages
package com.huaweicloud.dms.example.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * @author huaweicloud DMS
 */
@Component
public class DmsKafkaProducer {
    /**
     * Topic name. Use the actual topic name.
     */
    public static final String TOPIC = "test_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * One message is produced every five seconds as scheduled.
     * Scheduling must be enabled in the application (for example, with @EnableScheduling).
     */
    @Scheduled(cron = "*/5 * * * * ?")
    public void send() {
        String message = String.format("{id:%s,timestamp:%s}", UUID.randomUUID().toString(), System.currentTimeMillis());
        // send() is asynchronous: it returns before the broker acknowledges the message.
        kafkaTemplate.send(TOPIC, message);
        System.out.println("send finished, message = " + message);
    }
}
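The format string used by the producer yields text such as {id:...,timestamp:...}, which is not valid JSON because the keys and the UUID are unquoted. If downstream consumers expect JSON, the payload could be built as in the following sketch. The class MessagePayload and its build method are hypothetical and not part of the demo project, and the sketch assumes the id contains no characters that need JSON escaping (true for a UUID).

```java
import java.util.UUID;

final class MessagePayload {

    /** Returns a JSON object with a quoted string id and a numeric timestamp. */
    static String build(String id, long timestampMillis) {
        // Assumption: id needs no JSON escaping (for example, a UUID string).
        return String.format("{\"id\":\"%s\",\"timestamp\":%d}", id, timestampMillis);
    }

    public static void main(String[] args) {
        System.out.println(build(UUID.randomUUID().toString(), System.currentTimeMillis()));
    }
}
```

The resulting string can be passed to kafkaTemplate.send(TOPIC, message) in place of the original message.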
Consuming Messages
package com.huaweicloud.dms.example.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * @author huaweicloud DMS
 */
@Component
public class DmsKafkaConsumer {
    /**
     * Topic name. Use the actual topic name.
     */
    private static final String TOPIC = "test_topic";

    @KafkaListener(topics = {TOPIC})
    public void listen(ConsumerRecord<String, String> record) {
        Optional<String> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            System.out.println("consume finished, message = " + message.get());
        }
    }
}
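A listener usually does more than print the record value. As an illustration, the following sketch extracts the id and timestamp from a value in the {id:%s,timestamp:%s} format written by DmsKafkaProducer. The class DemoMessageParser and its nested Parsed type are hypothetical and use only the Java standard library. Values that are null or do not match the format yield an empty Optional.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class DemoMessageParser {

    // Matches "{id:<text without commas>,timestamp:<up to 18 digits>}".
    private static final Pattern FORMAT =
            Pattern.compile("\\{id:([^,]+),timestamp:(\\d{1,18})\\}");

    /** Simple holder for the two parsed fields. */
    static final class Parsed {
        final String id;
        final long timestamp;

        Parsed(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    /** Parses a record value; returns empty for null or non-matching input. */
    static Optional<Parsed> parse(String value) {
        if (value == null) {
            return Optional.empty();
        }
        Matcher m = FORMAT.matcher(value);
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new Parsed(m.group(1), Long.parseLong(m.group(2))));
    }

    public static void main(String[] args) {
        parse("{id:1b4e28ba-2fa1-11d2-883f-0016d3cca427,timestamp:1700000000000}")
                .ifPresent(p -> System.out.println("id=" + p.id + ", timestamp=" + p.timestamp));
    }
}
```

Inside listen, a call such as DemoMessageParser.parse(record.value()) could replace the Optional.ofNullable check, since the parser already handles null values.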