更新时间:2024-12-19 GMT+08:00

RocketMQ-Spring的使用

本文介绍如何使用RocketMQ-Spring连接华为云RocketMQ实例进行消息的生产和消费。相关代码您可以从rocketmq-springboot-demo中获取。

下文所有RocketMQ的配置信息,如实例连接地址、Topic名称、用户信息等,请参考收集连接信息获取。

在pom.xml文件中引入RocketMQ-Spring依赖

<dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-spring-boot-starter</artifactId>
     <version>${RELEASE.VERSION}</version>
</dependency>

在application.properties文件中填写配置

#=============== 生产者配置 =======================
## 替换成真实RocketMQ的NameServer地址与端口
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
## 是否开启SSL
rocketmq.producer.tls-enable=true
#=============== 消费者配置 =======================
## 替换成真实RocketMQ的NameServer地址与端口
rocketmq.name-server=127.0.0.1:9876

生产消息

生产消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
@SpringBootApplication
public class ProduceDemoApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ProduceDemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // send message synchronously
        rocketMQTemplate.convertAndSend("topic", "Hello, World!");
        // send spring message
        rocketMQTemplate.send(
                "topic", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
        // send messgae asynchronously
        rocketMQTemplate.asyncSend(
                "topic",
                MessageBuilder.withPayload("Hello, World! I'm from spring message").build(),
                new SendCallback() {
                    @Override
                    public void onSuccess(SendResult var1) {
                        System.out.printf("async onSucess SendResult=%s %n", var1);
                    }

                    @Override
                    public void onException(Throwable var1) {
                        System.out.printf("async onException Throwable=%s %n", var1);
                    }
                });
        // Send messages orderly
        rocketMQTemplate.syncSendOrderly(
                "topic", MessageBuilder.withPayload("Hello, World").build(), "hashkey");
    }
}

消费消息

消费消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
@SpringBootApplication
public class ConsumeDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumeDemoApplication.class, args);
    }

    @Service
    @RocketMQMessageListener(topic = "topic", consumerGroup = "group", tlsEnable = "true")
    public static class MyConsumer implements RocketMQListener<String> {
        public void onMessage(String message) {
            System.out.printf("received message: %s", message);
        }
    }
}