更新时间:2025-04-22 GMT+08:00
RocketMQ-Spring的使用
本文介绍如何使用RocketMQ-Spring连接华为云RocketMQ实例进行消息的生产和消费。相关代码您可以从rocketmq-springboot-demo中获取。
下文所有RocketMQ的配置信息,如实例连接地址、Topic名称、用户信息等,请参考收集连接信息获取。
约束与限制
RocketMQ-Spring客户端2.2.2以下版本不支持开启SSL,推荐使用2.3.0及以上版本。
在pom.xml文件中引入RocketMQ-Spring依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${RELEASE.VERSION}</version>
</dependency>
在application.properties文件中填写配置
#=============== 生产者配置 ======================= ## 替换成真实RocketMQ的NameServer地址与端口 rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group ## 是否开启SSL rocketmq.producer.tls-enable=true #=============== 消费者配置 ======================= ## 替换成真实RocketMQ的NameServer地址与端口 rocketmq.name-server=127.0.0.1:9876
生产消息
生产消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
@SpringBootApplication
public class ProduceDemoApplication implements CommandLineRunner {
@Resource
private RocketMQTemplate rocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(ProduceDemoApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
// send message synchronously
rocketMQTemplate.convertAndSend("topic", "Hello, World!");
// send spring message
rocketMQTemplate.send(
"topic", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
// send messgae asynchronously
rocketMQTemplate.asyncSend(
"topic",
MessageBuilder.withPayload("Hello, World! I'm from spring message").build(),
new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
System.out.printf("async onSucess SendResult=%s %n", var1);
}
@Override
public void onException(Throwable var1) {
System.out.printf("async onException Throwable=%s %n", var1);
}
});
// Send messages orderly
rocketMQTemplate.syncSendOrderly(
"topic", MessageBuilder.withPayload("Hello, World").build(), "hashkey");
}
}
消费消息
消费消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
@SpringBootApplication
public class ConsumeDemoApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumeDemoApplication.class, args);
}
@Service
@RocketMQMessageListener(topic = "topic", consumerGroup = "group", tlsEnable = "true")
public static class MyConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
System.out.printf("received message: %s", message);
}
}
}
父主题: Java(TCP协议)