Updated on 2023-11-29 GMT+08:00

Using spring-kafka

This section describes how to use spring-kafka to connect to a Huawei Cloud Kafka instance to produce and consume messages. Obtain the related code from kafka-springboot-demo.

The Kafka instance connection addresses, topic name, and user information used in the following examples are available in Collecting Connection Information.

Adding the spring-kafka Dependency to the pom.xml File

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configuring the application.properties File

#=============== Kafka ==========================
## Broker information of the Kafka instance. ip:port indicates the connection address and port number of the instance.
spring.kafka.bootstrap-servers=ip1:port1,ip2:port2,ip3:port3
#=============== Producer Configuration =======================
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== Consumer Configuration =======================
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#======== SASL Configuration (Delete the following configuration if SASL is disabled.) =======
## Set the SASL authentication mechanism, username, and password.
# spring.kafka.properties.sasl.mechanism indicates the SASL authentication mechanism. username and password indicate the username and password of SASL_SSL. Obtain them by referring to "Collecting Connection Information."
## If the SASL mechanism is PLAIN, the configuration is as follows:
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="username" \
    password="password";
## If the SASL mechanism is SCRAM-SHA-512, the configuration is as follows:
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-512
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="username" \
    password="password";

# Set spring.kafka.security.protocol.
## If the security protocol is SASL_SSL, the configuration is as follows:
spring.kafka.security.protocol=SASL_SSL
# spring.kafka.ssl.trust-store-location is the path for storing the SSL certificate. The following code uses the path format in Windows as an example. Change the path format based on the actual running environment.
spring.kafka.ssl.trust-store-location=E:\\temp\\client.truststore.jks
# spring.kafka.ssl.trust-store-password is the password of the server certificate. This password does not need to be modified. It is used for accessing the JKS file generated by Java.
spring.kafka.ssl.trust-store-password=dms@kafka
# spring.kafka.properties.ssl.endpoint.identification.algorithm indicates whether to verify the certificate domain name. This parameter must be left blank, which indicates disabling domain name verification.
spring.kafka.properties.ssl.endpoint.identification.algorithm=
## If the security protocol is SASL_PLAINTEXT, the configuration is as follows:
spring.kafka.security.protocol=SASL_PLAINTEXT

Producing Messages

package com.huaweicloud.dms.example.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * @author huaweicloud DMS
 */
@Component
public class DmsKafkaProducer {
    /**
     * Topic name. Use the actual topic name.
     */
    public static final String TOPIC = "test_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * One message is produced every five seconds as scheduled.
     */
    @Scheduled(cron = "*/5 * * * * ?")
    public void send() {
        String message = String.format("{id:%s,timestamp:%s}", UUID.randomUUID().toString(), System.currentTimeMillis());
        kafkaTemplate.send(TOPIC, message);
        System.out.println("send finished, message = " + message);
    }
}

Consuming Messages

package com.huaweicloud.dms.example.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * @author huaweicloud DMS
 */
@Component
public class DmsKafkaConsumer {
    /**
     * Topic name. Use the actual topic name.
     */
    private static final String TOPIC = "test_topic";

    @KafkaListener(topics = {TOPIC})
    public void listen(ConsumerRecord<String, String> record) {
        Optional<String> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            System.out.println("consume finished, message = " + message.get());
        }
    }
}