收发普通消息
本章节介绍普通消息的收发方法和示例代码。其中,普通消息发送方式分为同步发送和异步发送。RocketMQ提供PushConsumer和SimpleConsumer类型的消费者,PushConsumer消费者订阅普通消息时,代码不区分同步订阅和异步订阅。SimpleConsumer消费者订阅普通消息时,代码需要区分同步订阅和异步订阅。
普通消息发送方式分为同步发送和异步发送。
- 同步发送:消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息。
- 异步发送:消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息。
消息类型 |
发送消息 |
订阅消息(PushConsumer) |
订阅消息(SimpleConsumer) |
---|---|---|---|
普通消息 |
|
不分区同步订阅和异步订阅 |
|
收发消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
- RocketMQ实例5.x版本支持gRPC协议,4.8.0版本不支持。
- 客户端连接RocketMQ实例5.x版本收发普通消息前,需要确保Topic的消息类型为“普通”。
准备环境
开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为5.0.5。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.5</version> </dependency>
同步发送
同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。
参考如下示例代码,或者通过ProducerNormalMessageExample.java获取更多示例代码。
import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; public class ProducerNormalMessageExample { private static final Logger log = LoggerFactory.getLogger(ProducerNormalMessageExample.class); public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String topic = "yourNormalTopics"; // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = System.getenv("ROCKETMQ_AK"); String secretKey = System.getenv("ROCKETMQ_SK"); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey") .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message", t); } // 不再使用后,手动关闭producer。 producer.close(); } }
异步发送
异步发送是指消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
参考如下示例代码,或者通过AsyncProducerExample.java获取更多示例代码。
import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class AsyncProducerExample { private static final Logger log = LoggerFactory.getLogger(AsyncProducerExample.class); private AsyncProducerExample() { } public static void main(String[] args) throws ClientException, InterruptedException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String topic = "yourNormalTopics"; // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = System.getenv("ROCKETMQ_AK"); String secretKey = System.getenv("ROCKETMQ_SK"); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey") .setBody(body) .build(); final CompletableFuture<SendReceipt> future = producer.sendAsync(message); // 使用线程池去处理异步发送回调 ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool(); future.whenCompleteAsync((sendReceipt, throwable) -> { if (null != throwable) { log.error("Failed to send message", throwable); return; } log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); }, sendCallbackExecutor); // 阻塞主线程,不需要在生产环境中使用。 Thread.sleep(Long.MAX_VALUE); // 不再使用后,手动关闭producer。 producer.close(); } }
订阅普通消息(PushConsumer)
参考如下示例代码,或者通过PushConsumerExample.java获取更多示例代码。
import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; public class PushConsumerExample { private static final Logger log = LoggerFactory.getLogger(PushConsumerExample.class); private PushConsumerExample() { } public static void main(String[] args) throws ClientException, InterruptedException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = System.getenv("ROCKETMQ_AK"); String secretKey = System.getenv("ROCKETMQ_SK"); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); String tag = "yourMessageTagA"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); String consumerGroup = "yourConsumerGroup"; String topic = "yourTopic"; // 在多数场景下,推荐使用单例模式创建consumer。 PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) // 设置订阅关系 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) // 设置监听器,用于处理接收到的消息,并返回消费结果。 .setMessageListener(messageView -> { log.info("Consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); // 阻塞主线程,不需要在生产环境中使用。 Thread.sleep(Long.MAX_VALUE); // 不再使用后,手动关闭producer。 pushConsumer.close(); } }
同步订阅普通消息(SimpleConsumer)
参考如下示例代码,或者通过SimpleConsumerExample.java获取更多示例代码。
import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.SimpleConsumer; import org.apache.rocketmq.client.apis.message.MessageId; import org.apache.rocketmq.client.apis.message.MessageView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Collections; import java.util.List; public class SimpleConsumerExample { private static final Logger log = LoggerFactory.getLogger(SimpleConsumerExample.class); private SimpleConsumerExample() { } @SuppressWarnings({"resource", "InfiniteLoopStatement"}) public static void main(String[] args) throws ClientException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = System.getenv("ROCKETMQ_AK"); String secretKey = System.getenv("ROCKETMQ_SK"); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); String consumerGroup = "yourConsumerGroup"; Duration awaitDuration = Duration.ofSeconds(30); String tag = "yourMessageTagA"; String topic = "yourTopic"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); // 在多数场景下,推荐使用单例模式创建consumer。 SimpleConsumer consumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) // 设置长轮询接收消息请求(long-polling receive requests)的最大等待时间 .setAwaitDuration(awaitDuration) // 设置订阅关系 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); // 设置每次长轮询能接收的最大消息数 int maxMessageNum = 16; // 设置消息不可见时间,在消息被接收后对其他消费者不可见,直到超时。 Duration invisibleDuration = Duration.ofSeconds(15); // 接收消息,推荐使用多线程的方式。 while (true) { // 如果存在可用的消息会立即返回,否则在等待超时后,返回空。 final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration); log.info("Received {} message(s)", messages.size()); for (MessageView message : messages) { final MessageId messageId = message.getMessageId(); try { // 处理接收的消息,消费成功后提交消息。 consumer.ack(message); log.info("Message is acknowledged successfully, messageId={}", messageId); } catch (Throwable t) { log.error("Message is failed to be acknowledged, messageId={}", messageId, t); } } } // 不再使用后,手动关闭consumer。 // consumer.close(); } }
异步订阅普通消息(SimpleConsumer)
参考如下示例代码,或者通过AsyncSimpleConsumerExample.java获取更多示例代码。
import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.SimpleConsumer; import org.apache.rocketmq.client.apis.message.MessageId; import org.apache.rocketmq.client.apis.message.MessageView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.stream.Collectors; public class AsyncSimpleConsumerExample { private static final Logger log = LoggerFactory.getLogger(AsyncSimpleConsumerExample.class); private AsyncSimpleConsumerExample() { } @SuppressWarnings({"resource", "InfiniteLoopStatement"}) public static void main(String[] args) throws ClientException, InterruptedException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = System.getenv("ROCKETMQ_AK"); String secretKey = System.getenv("ROCKETMQ_SK"); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); String consumerGroup = "yourConsumerGroup"; Duration awaitDuration = Duration.ofSeconds(30); String tag = "yourMessageTagA"; String topic = "yourTopic"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); // 在多数场景下,推荐使用单例模式创建consumer。 SimpleConsumer consumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) // 设置长轮询接收消息请求(long-polling receive requests)的最大等待时间 .setAwaitDuration(awaitDuration) // 设置订阅关系 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); // 设置每次长轮询能接收的最大消息数 int maxMessageNum = 16; // 设置消息不可见时间,在消息被接收后对其他消费者不可见,直到超时。 Duration invisibleDuration = Duration.ofSeconds(15); // 设置允许的最大长轮询请求数 int maxLongPollingSize = 32; Semaphore semaphore = new Semaphore(maxLongPollingSize); // 使用线程池处理接收消息回调 ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool(); // 使用线程池处理提交消息回调 ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool(); // 接收消息 while (true) { semaphore.acquire(); // 异步提交消息,如果存在可用的消息会立即返回触发回调,否则在等待超时后,返回空。 final CompletableFuture<List<MessageView>> future0 = consumer.receiveAsync(maxMessageNum, invisibleDuration); future0.whenCompleteAsync(((messages, throwable) -> { // 处理接收到的消息 semaphore.release(); if (null != throwable) { log.error("Failed to receive message from remote", throwable); return; } log.info("Received {} message(s)", messages.size()); // 异步提交消息,并使用messageView作为键,因为message id可能重复。 final Map<MessageView, CompletableFuture<Void>> map = messages.stream().collect(Collectors.toMap(message -> message, consumer::ackAsync)); for (Map.Entry<MessageView, CompletableFuture<Void>> entry : map.entrySet()) { final MessageId messageId = entry.getKey().getMessageId(); final CompletableFuture<Void> future = entry.getValue(); future.whenCompleteAsync((v, t) -> { // 处理提交消息回调 if (null != t) { log.error("Message is failed to be acknowledged, messageId={}", messageId, t); return; } log.info("Message is acknowledged successfully, messageId={}", messageId); }, ackCallbackExecutor); } }), receiveCallbackExecutor); } // 不再使用后,手动关闭consumer。 // consumer.close(); } }