更新时间:2024-12-19 GMT+08:00
RocketMQ-Spring的使用
本文介绍如何使用RocketMQ-Spring连接华为云RocketMQ实例进行消息的生产和消费。相关代码您可以从rocketmq-springboot-demo中获取。
下文所有RocketMQ的配置信息,如实例连接地址、Topic名称、用户信息等,请参考收集连接信息获取。
在pom.xml文件中引入RocketMQ-Spring依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${RELEASE.VERSION}</version> </dependency>
在application.properties文件中填写配置
#=============== 生产者配置 ======================= ## 替换成真实RocketMQ的NameServer地址与端口 rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group ## 是否开启SSL rocketmq.producer.tls-enable=true #=============== 消费者配置 ======================= ## 替换成真实RocketMQ的NameServer地址与端口 rocketmq.name-server=127.0.0.1:9876
生产消息
生产消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
@SpringBootApplication public class ProduceDemoApplication implements CommandLineRunner { @Resource private RocketMQTemplate rocketMQTemplate; public static void main(String[] args) { SpringApplication.run(ProduceDemoApplication.class, args); } @Override public void run(String... args) throws Exception { // send message synchronously rocketMQTemplate.convertAndSend("topic", "Hello, World!"); // send spring message rocketMQTemplate.send( "topic", MessageBuilder.withPayload("Hello, World! I'm from spring message").build()); // send messgae asynchronously rocketMQTemplate.asyncSend( "topic", MessageBuilder.withPayload("Hello, World! I'm from spring message").build(), new SendCallback() { @Override public void onSuccess(SendResult var1) { System.out.printf("async onSucess SendResult=%s %n", var1); } @Override public void onException(Throwable var1) { System.out.printf("async onException Throwable=%s %n", var1); } }); // Send messages orderly rocketMQTemplate.syncSendOrderly( "topic", MessageBuilder.withPayload("Hello, World").build(), "hashkey"); } }
消费消息
消费消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
@SpringBootApplication public class ConsumeDemoApplication { public static void main(String[] args) { SpringApplication.run(ConsumeDemoApplication.class, args); } @Service @RocketMQMessageListener(topic = "topic", consumerGroup = "group", tlsEnable = "true") public static class MyConsumer implements RocketMQListener<String> { public void onMessage(String message) { System.out.printf("received message: %s", message); } } }
父主题: Java(TCP协议)