更新时间:2024-12-10 GMT+08:00
分享

收发普通消息

本章节介绍普通消息的收发方法和示例代码。其中,普通消息发送方式分为同步发送和异步发送。RocketMQ提供PushConsumer和SimpleConsumer类型的消费者,PushConsumer消费者订阅普通消息时,代码不区分同步订阅和异步订阅。SimpleConsumer消费者订阅普通消息时,代码需要区分同步订阅和异步订阅。

普通消息发送方式分为同步发送和异步发送。

  • 同步发送:消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息。
  • 异步发送:消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息。
表1 普通消息收发方式

消息类型

发送消息

订阅消息(PushConsumer)

订阅消息(SimpleConsumer)

普通消息

  • 同步发送
  • 异步发送

不分区同步订阅和异步订阅

  • 同步订阅
  • 异步订阅

收发消息前,请参考收集连接信息收集RocketMQ所需的连接信息。

约束与限制

  • 仅RocketMQ实例5.x版本支持gRPC协议,4.8.0版本不支持。
  • 客户端连接RocketMQ实例5.x版本收发普通消息前,需要确保Topic的消息类型为“普通”。

准备环境

开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为5.0.5

使用Maven方式引入依赖。
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.5</version>
</dependency>

同步发送

同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。

参考如下示例代码,或者通过ProducerNormalMessageExample.java获取更多示例代码。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ProducerNormalMessageExample {
    private static final Logger log = LoggerFactory.getLogger(ProducerNormalMessageExample.class);

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String topic = "yourNormalTopics";
        // 填入grpc连接地址/grpc公网连接地址
        String endpoints = "yourEndpoints";
        // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。
                // .setCredentialProvider(sessionCredentialsProvider)  // 创建实例时,如果开启了ACL,请添加此行代码。
                .build();

        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();
        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setTag(tag)
                .setKeys("yourMessageKey")
                .setBody(body)
                .build();
        try {
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }
        // 不再使用后,手动关闭producer。
        producer.close();
    }
}

异步发送

异步发送是指消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。

参考如下示例代码,或者通过AsyncProducerExample.java获取更多示例代码。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncProducerExample {
    private static final Logger log = LoggerFactory.getLogger(AsyncProducerExample.class);

    private AsyncProducerExample() {
    }

    public static void main(String[] args) throws ClientException, InterruptedException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String topic = "yourNormalTopics";
        // 填入grpc连接地址/grpc公网连接地址
        String endpoints = "yourEndpoints";
        // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。
                // .setCredentialProvider(sessionCredentialsProvider)  // 创建实例时,如果开启了ACL,请添加此行代码。
                .build();
        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();

        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setTag(tag)
                .setKeys("yourMessageKey")
                .setBody(body)
                .build();

        final CompletableFuture<SendReceipt> future = producer.sendAsync(message);
        // 使用线程池去处理异步发送回调
        ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
        future.whenCompleteAsync((sendReceipt, throwable) -> {
            if (null != throwable) {
                log.error("Failed to send message", throwable);
                return;
            }
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        }, sendCallbackExecutor);
        // 阻塞主线程,不需要在生产环境中使用。
        Thread.sleep(Long.MAX_VALUE);
        // 不再使用后,手动关闭producer。
        producer.close();
    }
}

订阅普通消息(PushConsumer)

参考如下示例代码,或者通过PushConsumerExample.java获取更多示例代码。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger log = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, InterruptedException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // 填入grpc连接地址/grpc公网连接地址
        String endpoints = "yourEndpoints";
        // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。
                // .setCredentialProvider(sessionCredentialsProvider)  // 创建实例时,如果开启了ACL,请添加此行代码。
                .build();
        String tag = "yourMessageTagA";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "yourConsumerGroup";
        String topic = "yourTopic";
        // 在多数场景下,推荐使用单例模式创建consumer。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                // 设置订阅关系
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 设置监听器,用于处理接收到的消息,并返回消费结果。
                .setMessageListener(messageView -> {
                    log.info("Consume message={}", messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        // 阻塞主线程,不需要在生产环境中使用。
        Thread.sleep(Long.MAX_VALUE);
        // 不再使用后,手动关闭producer。
        pushConsumer.close();
    }
}

同步订阅普通消息(SimpleConsumer)

参考如下示例代码,或者通过SimpleConsumerExample.java获取更多示例代码。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger log = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    @SuppressWarnings({"resource", "InfiniteLoopStatement"})
    public static void main(String[] args) throws ClientException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // 填入grpc连接地址/grpc公网连接地址
        String endpoints = "yourEndpoints";
        // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。
                // .setCredentialProvider(sessionCredentialsProvider)  // 创建实例时,如果开启了ACL,请添加此行代码。
                .build();
        String consumerGroup = "yourConsumerGroup";
        Duration awaitDuration = Duration.ofSeconds(30);
        String tag = "yourMessageTagA";
        String topic = "yourTopic";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // 在多数场景下,推荐使用单例模式创建consumer。
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                // 设置长轮询接收消息请求(long-polling receive requests)的最大等待时间
                .setAwaitDuration(awaitDuration)
                // 设置订阅关系
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        // 设置每次长轮询能接收的最大消息数
        int maxMessageNum = 16;
        // 设置消息不可见时间,在消息被接收后对其他消费者不可见,直到超时。
        Duration invisibleDuration = Duration.ofSeconds(15);
        // 接收消息,推荐使用多线程的方式。
        while (true) {
            // 如果存在可用的消息会立即返回,否则在等待超时后,返回空。
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            log.info("Received {} message(s)", messages.size());
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    // 处理接收的消息,消费成功后提交消息。
                    consumer.ack(message);
                    log.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        }
        // 不再使用后,手动关闭consumer。
        // consumer.close();
    }
}

异步订阅普通消息(SimpleConsumer)

参考如下示例代码,或者通过AsyncSimpleConsumerExample.java获取更多示例代码。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;

public class AsyncSimpleConsumerExample {
    private static final Logger log = LoggerFactory.getLogger(AsyncSimpleConsumerExample.class);

    private AsyncSimpleConsumerExample() {
    }

    @SuppressWarnings({"resource", "InfiniteLoopStatement"})
    public static void main(String[] args) throws ClientException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // 填入grpc连接地址/grpc公网连接地址
        String endpoints = "yourEndpoints";
        // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。
                // .setCredentialProvider(sessionCredentialsProvider)  // 创建实例时,如果开启了ACL,请添加此行代码。
                .build();
        String consumerGroup = "yourConsumerGroup";
        Duration awaitDuration = Duration.ofSeconds(30);
        String tag = "yourMessageTagA";
        String topic = "yourTopic";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // 在多数场景下,推荐使用单例模式创建consumer。
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                // 设置长轮询接收消息请求(long-polling receive requests)的最大等待时间
                .setAwaitDuration(awaitDuration)
                // 设置订阅关系
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        // 设置每次长轮询能接收的最大消息数
        int maxMessageNum = 16;
        // 设置消息不可见时间,在消息被接收后对其他消费者不可见,直到超时。
        Duration invisibleDuration = Duration.ofSeconds(15);
        // 设置允许的最大长轮询请求数
        int maxLongPollingSize = 32;
        Semaphore semaphore = new Semaphore(maxLongPollingSize);
        // 使用线程池处理接收消息回调
        ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool();
        // 使用线程池处理提交消息回调
        ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
        // 接收消息
        while (true) {
            semaphore.acquire();
            // 异步提交消息,如果存在可用的消息会立即返回触发回调,否则在等待超时后,返回空。
            final CompletableFuture<List<MessageView>> future0 = consumer.receiveAsync(maxMessageNum,
                    invisibleDuration);
            future0.whenCompleteAsync(((messages, throwable) -> {
                // 处理接收到的消息
                semaphore.release();
                if (null != throwable) {
                    log.error("Failed to receive message from remote", throwable);
                    return;
                }
                log.info("Received {} message(s)", messages.size());
                // 异步提交消息,并使用messageView作为键,因为message id可能重复。
                final Map<MessageView, CompletableFuture<Void>> map =
                        messages.stream().collect(Collectors.toMap(message -> message, consumer::ackAsync));
                for (Map.Entry<MessageView, CompletableFuture<Void>> entry : map.entrySet()) {
                    final MessageId messageId = entry.getKey().getMessageId();
                    final CompletableFuture<Void> future = entry.getValue();
                    future.whenCompleteAsync((v, t) -> {
                        // 处理提交消息回调
                        if (null != t) {
                            log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                            return;
                        }
                        log.info("Message is acknowledged successfully, messageId={}", messageId);
                    }, ackCallbackExecutor);
                }

            }), receiveCallbackExecutor);
        }
        // 不再使用后,手动关闭consumer。
        // consumer.close();
    }
}

相关文档