更新时间:2025-12-16 GMT+08:00
分享

SpringBoot使用Spring-Kafka模块访问Kafka样例代码

本章节适用于MRS 3.6.0及之后版本。

功能简介

Spring-Kafka是Spring生态系统中用于与Apache Kafka集成的模块,其提供了简化的API和配置方式,能帮助开发者更容易地在Spring应用中实现Kafka的生产者和消费者功能,能够显著降低Kafka的使用门槛。无论是简单的消息队列场景,还是复杂的事件驱动架构,Spring-Kafka都能提供灵活且高效的解决方案。

以下提供SpringBoot使用Spring-Kafka模块访问MRS Kafka的样例。

代码样例

提供SpringBoot使用Spring-Kafka模块访问MRS Kafka的样例代码如下:

Producer为生产消息接口,Consumer为消费消息接口,KafkaProperties为客户端参数。其中参数值需要根据实际业务修改。

  • Producer:
    @RestController
    public class MessageController {
        private final static Logger LOG = LoggerFactory.getLogger(MessageController.class);
        @Autowired
        private ProducerThread producerThread;
        @Value("${topic:example-metric1}")
        protected String topic;
        @GetMapping("/produce")
        public String produce() {
            String message = "Start to produce message";
            producerThread.start();
            LOG.info(message);
            return message;
        }
    }
  • Consumer:
    @Service
    public class ConsumerService {
      private static final Logger LOG = LoggerFactory.getLogger(ConsumerService.class);
      @Value("${topic:example-metric1}")
      private String topic;
      @KafkaListener(
        containerFactory = "KafkaListenerContainerFactory",
        id = "id",
        idIsGroup = false,
        groupId = "groupid",
        topics = "${topic}"
      )
      public void listen(ConsumerRecord<?, ?> record) {
        LOG.info("The consumer poll 1 record from kafka, the topic is {}, the partition is {}, the offset is {}, the " +
          "key is {}, the value is {}", record.topic(), record.partition(), record.offset(), record.key(), record.value());
      }
    }
    Consumer配置介绍:
    • @KafkaListener:KafkaListener是Spring-Kafka提供的一个注解,用于在Spring应用中方便地消费Kafka消息。通过这个注解,开发者可以定义一个方法作为Kafka消息的监听器,当指定主题有消息时,该方法会自动被调用。总的来说,KafkaListener简化了Kafka消息消费的集成,使开发者能够专注于业务逻辑,而不必过多关注底层实现。
    • containerFactory:指定用于创建MessageListenerContainer的工厂bean的名称。
    • MessageListenerContainer:MessageListenerContainer是Spring-Kafka中的一个核心接口,用于管理和控制Kafka消息消费者的生命周期。它负责创建、启动、停止和管理Kafka消费者实例,并与Kafka服务器进行交互以消费消息,无论是单线程还是并发消费场景,都可以通过它来实现高效的消息处理。
    • id:每个Listener实例的唯一标识符。如不指定groupId,则“id”将直接作为groupId。在多监听器的应用中,可以使用不同的“id”来区分不同的监听器容器。
    • idIsGroup:此属性用来控制id属性与消费者组(groupId)之间的关系,用于指定id属性是否直接作为消费者组(groupId)使用,默认为“false”。
    • groupId:指定Kafka的消费者组的ID,每个消费者都有自己所属的组,一个组中可以有多个消费者,它们共同处理消息。
    • topics:生产消费的topic名称。
  • KafkaProperties:
    // KafkaProperties 
    @Configuration
    public class KafkaProperties {
        // Common Client Config
        @Value("${bootstrap.servers:}")
        private String bootstrapServers;
        @Value("${security.protocol:SASL_PLAINTEXT}")
        private String securityProtocol;
        @Value("${sasl.mechanism:PLAIN}")
        private String saslMechanism;
        @Value("${manager_username:}")
        private String username;
        @Value("${manager_password:}")
        private String password;
        @Value("${topic:example-metric1}")
        private String topic;
        @Value("${is.security.mode:true}")
        private boolean isSecurityMode;
        // producer config
        @Value("${isAsync:false}")
        private String isAsync;
        // consumer config
        @Value("${consumer.alive.time:180000}")
        private String consumerAliveTime;
        public KafkaProperties() {
        }
        /**
         * 生产者配置
         */
        @Bean(name = "kafkaProducerTemplate")
        public KafkaTemplate kafkaProducerTemplate() {
            Map<String, Object> props = new HashMap<>();
            this.initPropertiesByResources(props);
            this.initProducerProperties(props);
            return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
        }
        /**
         * 消费者配置
         */
        @Bean(name = "KafkaListenerContainerFactory")
        public KafkaListenerContainerFactory integratedEnergyKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
            Map<String, Object> props = new HashMap<>();
            this.initConsumerProperties(props);
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
            // 设置并发数量
            factory.setConcurrency(3);
            return factory;
        }
        // 通过resources中的application.properties来设置部分参数
        public void initPropertiesByResources(Map<String, Object> properties) {
            // Broker连接地址
            if (isEmpty(this.bootstrapServers)) {
                throw new IllegalArgumentException("The bootstrap.servers is null or empty.");
            }
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
            // 安全协议类型
            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, this.securityProtocol);
            // 安全协议下使用的认证机制
            properties.put(SaslConfigs.SASL_MECHANISM, this.saslMechanism);
            // 动态jaas config
            if (this.isSecurityMode) {
                if (isEmpty(this.username)|| isEmpty(this.password)) {
                    throw new IllegalArgumentException("The properties manager_username or manager_password is null or empty.");
                }
                String jaasConfig = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=%s password=%s;", this.username, this.password);
                properties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
            }
            properties.put("topic", this.topic);
            properties.put("isAsync", this.isAsync);
            properties.put("consumer.alive.time", this.consumerAliveTime);
        }
        // 设置生产者的配置
        public void initProducerProperties(Map<String, Object> properties) {
            // 重试次数
            properties.put(ProducerConfig.RETRIES_CONFIG, 3);
            // acks=0 把消息发送到Kafka就认为发送成功
            // acks=1 把消息发送到Kafka leader分区并且写入磁盘就认为发送成功
            // acks=all 把消息发送到Kafka leader分区并且leader分区的副本follower对消息进行了同步就认为任务发送成功
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            // KafkaProducer.send()和 partitionsFor()方法的最长阻塞时间,单位为ms
            properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
            // 批量处理的最大大小,单位为byte
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
            // 当生产端积累的消息达到batch-size或接收到消息linger.ms后,producer会将消息发送给Kafka
            properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
            // 生产者可用缓冲区的最大值,单位为byte
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
            // 每条消息最大的大小
            properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
            // 客户端ID
            properties.put(ProducerConfig.CLIENT_ID_CONFIG, "client-1");
            // Key序列化方式
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Value序列化方式
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // 消息压缩 none、lz4、gzip、snappy
            properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
        }
        // 设置消费者的配置
        private Map initConsumerProperties(Map<String, Object> properties) {
            this.initPropertiesByResources(properties);
            // 是否自动提交offset
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            // 自动提交offset的时间间隔
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
            // 批量消费最大数量
            properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
            // 消费者组
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
            // session超时,超过这个时间consumer没有发送心跳, 触发rebalance
            properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
            // 请求超时,单位为s
            properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000);
            // Key反序列化类
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Value反序列化类
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            return properties;
        }
        public static boolean isEmpty(final CharSequence cs) {
            return cs == null || cs.length() == 0;
        }
    }
上述样例代码中的KafkaProperties属性可以在“springboot > kafka-examples > module-spring-kafka-examples > src > main > resources > application.properties”中配置,也可以在样例运行环境上手动编写application.properties文件。没有指定默认值的配置为必选项。
  • server.port:服务器监听的端口。
  • server.address:服务器的IP地址。
  • bootstrap.servers:Kafka集群Broker地址列表,格式为ip:port,ip:port,ip:port。IPv6环境下,需要给IP地址添加 [],例如:[::1]:21007。
  • security.protocol:Kafka客户端使用的认证协议,默认值“PLAINTEXT”。
  • sasl.mechanism:客户端使用的认证机制,默认值“PLAIN”。
  • topic:生产消费的topic名称,默认值“example-metric1”。
  • isAsync:是否使用异步生产,默认值“false”。
  • consumer.alive.time:消费线程存活时间,默认值“180000”,单位ms。
  • is.security.mode: 客户端是否使用安全模式连接集群,默认值为“false”

相关文档