Using spring-kafka

This section describes how to use spring-kafka to connect to a Kafka instance to produce and consume messages. Obtain the related code from kafka-springboot-demo.

The Kafka instance connection addresses, topic name, and user information used in the following examples are available in Collecting Connection Information.

Adding the spring-kafka Dependency to the pom.xml File

<!-- Spring for Apache Kafka. No <version> element is given here, so the version must be
     supplied by dependency management (for example, the Spring Boot parent POM or BOM).
     NOTE(review): confirm that your project provides such dependency management. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configuring the application.properties File

#=============== Kafka ==========================
## Broker information of the Kafka instance. Each ip:port pair is the connection address and port number of one broker; separate multiple brokers with commas.
spring.kafka.bootstrap-servers=ip1:port1,ip2:port2,ip3:port3
#=============== Producer Configuration =======================
## Number of times the producer retries a failed send. 0 means a failed send is not retried.
spring.kafka.producer.retries=0
## Maximum size, in bytes, of a batch of records sent to the same partition.
spring.kafka.producer.batch-size=16384
## Total memory, in bytes, that the producer can use to buffer records waiting to be sent (33554432 bytes = 32 MB).
spring.kafka.producer.buffer-memory=33554432
## Message keys and values are serialized as strings.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== Consumer Configuration =======================
## Consumer group ID. Use the actual consumer group name.
spring.kafka.consumer.group-id=test-consumer-group
## Where to start consuming when the group has no committed offset: earliest = from the oldest available message.
spring.kafka.consumer.auto-offset-reset=earliest
## Commit consumed offsets automatically in the background.
spring.kafka.consumer.enable-auto-commit=true
## Interval between automatic offset commits (a plain number is interpreted as milliseconds).
spring.kafka.consumer.auto-commit-interval=100
## Message keys and values are deserialized as strings.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#======== SASL Configuration (Delete the following configuration if SASL is disabled.) =======
## SASL authentication mechanism
spring.kafka.properties.sasl.mechanism=PLAIN
## Security protocol used to communicate with the brokers. Currently, SASL_SSL is supported.
spring.kafka.security.protocol=SASL_SSL
## Set the JAAS account and password. The username and password are those you set when enabling SASL_SSL during Kafka instance creation or when creating a SASL_SSL user.
## Replace "user" and "password" with the actual values. The three lines below form one property value (joined by the trailing backslashes).
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="user" \
    password="password";
## Password of ssl.truststore. Do not change the value.
spring.kafka.ssl.trust-store-password=dms@kafka
## Location of ssl.truststore. Replace the path with the actual location of the client.jks file (backslashes must be doubled in a properties file).
spring.kafka.ssl.trust-store-location=file:D:\\temp\\client.jks
## Indicates whether to verify the certificate domain name. Leaving the value empty (as below) disables domain name verification; keep this line with an empty value rather than deleting it.
spring.kafka.properties.ssl.endpoint.identification.algorithm=

Producing Messages

package com.huaweicloud.dms.example.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Demo producer that periodically publishes a generated text message to {@link #TOPIC}.
 *
 * <p>NOTE(review): {@code @Scheduled} methods only run when scheduling is enabled in the
 * application (for example with {@code @EnableScheduling}); that configuration is not shown here.
 *
 * @author huaweicloud DMS
 */
@Component
public class DmsKafkaProducer {
    /**
     * Topic name. Use the actual topic name.
     */
    public static final String TOPIC = "test_topic";

    /**
     * Maximum time, in seconds, to wait for a send to complete before reporting it as failed.
     */
    private static final long SEND_TIMEOUT_SECONDS = 10L;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Produces one message every five seconds as scheduled.
     *
     * <p>The message has the form {@code {id:<random UUID>,timestamp:<current time in ms>}}.
     * The method waits (up to {@link #SEND_TIMEOUT_SECONDS} seconds) for the send to complete,
     * so that success is reported only after the send has actually finished and a failed send
     * is reported instead of being silently dropped.
     */
    @Scheduled(cron = "*/5 * * * * ?")
    public void send() {
        String message = String.format("{id:%s,timestamp:%s}", UUID.randomUUID(), System.currentTimeMillis());
        try {
            // KafkaTemplate.send() is asynchronous and returns a future; block on it (with a bound)
            // before claiming that the send has finished.
            kafkaTemplate.send(TOPIC, message).get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            System.out.println("send finished, message = " + message);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the scheduler thread can observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println("send interrupted, message = " + message);
        } catch (ExecutionException | TimeoutException e) {
            System.err.println("send failed, message = " + message + ", cause = " + e);
        }
    }
}

Consuming Messages

package com.huaweicloud.dms.example.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Demo consumer that prints the value of every record received from the configured topic.
 *
 * @author huaweicloud DMS
 */
@Component
public class DmsKafkaConsumer {
    /**
     * Topic name. Use the actual topic name.
     */
    private static final String TOPIC = "test_topic";

    /**
     * Handles one record delivered by the Kafka listener container.
     *
     * <p>Records whose value is {@code null} are skipped; for all others the value is printed
     * to standard output.
     *
     * @param record the consumed record, with string key and value
     */
    @KafkaListener(topics = {TOPIC})
    public void listen(ConsumerRecord<String, String> record) {
        final String payload = record.value();
        if (payload == null) {
            // Nothing to print for a record without a value.
            return;
        }
        System.out.println("consume finished, message = " + payload);
    }
}