SpringBoot使用Spring-Kafka模块访问Kafka样例代码
本章节适用于MRS 3.6.0及之后版本。
功能简介
Spring-Kafka是Spring生态系统中用于与Apache Kafka集成的模块,其提供了简化的API和配置方式,能帮助开发者更容易地在Spring应用中实现Kafka的生产者和消费者功能,能够显著降低Kafka的使用门槛。无论是简单的消息队列场景,还是复杂的事件驱动架构,Spring-Kafka都能提供灵活且高效的解决方案。
以下提供SpringBoot使用Spring-Kafka模块访问MRS Kafka的样例。
代码样例
提供SpringBoot使用Spring-Kafka模块访问MRS Kafka的样例代码如下:
Producer为生产消息接口,Consumer为消费消息接口,KafkaProperties为客户端参数。其中参数值需要根据实际业务修改。
- Producer:
@RestController public class MessageController { private final static Logger LOG = LoggerFactory.getLogger(MessageController.class); @Autowired private ProducerThread producerThread; @Value("${topic:example-metric1}") protected String topic; @GetMapping("/produce") public String produce() { String message = "Start to produce message"; producerThread.start(); LOG.info(message); return message; } }
- Consumer:
@Service public class ConsumerService { private static final Logger LOG = LoggerFactory.getLogger(ConsumerService.class); @Value("${topic:example-metric1}") private String topic; @KafkaListener( containerFactory = "KafkaListenerContainerFactory", id = "id", idIsGroup = false, groupId = "groupid", topics = "${topic}" ) public void listen(ConsumerRecord<?, ?> record) { LOG.info("The consumer poll 1 record from kafka, the topic is {}, the partition is {}, the offset is {}, the " + "key is {}, the value is {}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } }
Consumer配置介绍:- @KafkaListener:KafkaListener是Spring-Kafka提供的一个注解,用于在Spring应用中方便地消费Kafka消息。通过这个注解,开发者可以定义一个方法作为Kafka消息的监听器,当指定主题有消息时,该方法会自动被调用。总的来说,KafkaListener简化了Kafka消息消费的集成,使开发者能够专注于业务逻辑,而不必过多关注底层实现。
- containerFactory:指定用于创建MessageListenerContainer的工厂bean的名称。
- MessageListenerContainer:MessageListenerContainer是Spring-Kafka中的一个核心接口,用于管理和控制Kafka消息消费者的生命周期。它负责创建、启动、停止和管理Kafka消费者实例,并与Kafka服务器进行交互以消费消息,无论是单线程还是并发消费场景,都可以通过它来实现高效的消息处理。
- id:每个Listener实例的唯一标识符。如不指定groupId,则“id”将直接作为groupId。在多监听器的应用中,可以使用不同的“id”来区分不同的监听器容器。
- idIsGroup:此属性用来控制id属性与消费者组(groupId)之间的关系,用于指定id属性是否直接作为消费者组(groupId)使用,默认为“false”。
- groupId:指定Kafka的消费者组的ID,每个消费者都有自己所属的组,一个组中可以有多个消费者,它们共同处理消息。
- topics:生产消费的topic名称。
- KafkaProperties:
// KafkaProperties @Configuration public class KafkaProperties { // Common Client Config @Value("${bootstrap.servers:}") private String bootstrapServers; @Value("${security.protocol:SASL_PLAINTEXT}") private String securityProtocol; @Value("${sasl.mechanism:PLAIN}") private String saslMechanism; @Value("${manager_username:}") private String username; @Value("${manager_password:}") private String password; @Value("${topic:example-metric1}") private String topic; @Value("${is.security.mode:true}") private boolean isSecurityMode; // producer config @Value("${isAsync:false}") private String isAsync; // consumer config @Value("${consumer.alive.time:180000}") private String consumerAliveTime; public KafkaProperties() { } /** * 生产者配置 */ @Bean(name = "kafkaProducerTemplate") public KafkaTemplate kafkaProducerTemplate() { Map<String, Object> props = new HashMap<>(); this.initPropertiesByResources(props); this.initProducerProperties(props); return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props)); } /** * 消费者配置 */ @Bean(name = "KafkaListenerContainerFactory") public KafkaListenerContainerFactory integratedEnergyKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); Map<String, Object> props = new HashMap<>(); this.initConsumerProperties(props); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props)); // 设置并发数量 factory.setConcurrency(3); return factory; } // 通过resources中的application.properties来设置部分参数 public void initPropertiesByResources(Map<String, Object> properties) { // Broker连接地址 if (isEmpty(this.bootstrapServers)) { throw new IllegalArgumentException("The bootstrap.servers is null or empty."); } properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); // 安全协议类型 properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, this.securityProtocol); // 安全协议下使用的认证机制 properties.put(SaslConfigs.SASL_MECHANISM, this.saslMechanism); // 动态jaas config if (this.isSecurityMode) { if (isEmpty(this.username)|| isEmpty(this.password)) { throw new IllegalArgumentException("The properties manager_username or manager_password is null or empty."); } String jaasConfig = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=%s password=%s;", this.username, this.password); properties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); } properties.put("topic", this.topic); properties.put("isAsync", this.isAsync); properties.put("consumer.alive.time", this.consumerAliveTime); } // 设置生产者的配置 public void initProducerProperties(Map<String, Object> properties) { // 重试次数 properties.put(ProducerConfig.RETRIES_CONFIG, 3); // acks=0 把消息发送到Kafka就认为发送成功 // acks=1 把消息发送到Kafka leader分区并且写入磁盘就认为发送成功 // acks=all 把消息发送到Kafka leader分区并且leader分区的副本follower对消息进行了同步就认为任务发送成功 properties.put(ProducerConfig.ACKS_CONFIG, "all"); // KafkaProducer.send()和 partitionsFor()方法的最长阻塞时间,单位为ms properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000); // 批量处理的最大大小,单位为byte properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); // 当生产端积累的消息达到batch-size或接收到消息linger.ms后,producer会将消息发送给Kafka properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000); // 生产者可用缓冲区的最大值,单位为byte properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 每条消息最大的大小 properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576); // 客户端ID properties.put(ProducerConfig.CLIENT_ID_CONFIG, "client-1"); // Key序列化方式 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Value序列化方式 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 消息压缩 none、lz4、gzip、snappy properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); } // 设置消费者的配置 private Map initConsumerProperties(Map<String, Object> properties) { this.initPropertiesByResources(properties); // 是否自动提交offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交offset的时间间隔 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); // 批量消费最大数量 properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup"); // session超时,超过这个时间consumer没有发送心跳, 触发rebalance properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000); // 请求超时,单位为s properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000); // Key反序列化类 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Value反序列化类 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); return properties; } public static boolean isEmpty(final CharSequence cs) { return cs == null || cs.length() == 0; } }
- server.port:服务器监听的端口。
- server.address:服务器的IP地址。
- bootstrap.servers:Kafka集群Broker地址列表,格式为ip:port,ip:port,ip:port。IPv6环境下,需要给IP地址添加 [],例如:[::1]:21007。
- security.protocol:Kafka客户端使用的认证协议,默认值“PLAINTEXT”。
- sasl.mechanism:客户端使用的认证机制,默认值“PLAIN”。
- topic:生产消费的topic名称,默认值“example-metric1”。
- isAsync:是否使用异步生产,默认值“false”。
- consumer.alive.time:消费线程存活时间,默认值“180000”,单位ms。
- is.security.mode: 客户端是否使用安全模式连接集群,默认值为“false”