收发普通消息
本章节介绍普通消息的收发方法和示例代码。其中,普通消息发送方式分为同步发送和异步发送。RocketMQ提供PushConsumer和SimpleConsumer类型的消费者,PushConsumer消费者订阅普通消息时,代码不区分同步订阅和异步订阅。SimpleConsumer消费者订阅普通消息时,代码需要区分同步订阅和异步订阅。
普通消息发送方式分为同步发送和异步发送。
- 同步发送:消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息。
- 异步发送:消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息。
|
消息类型 |
发送消息 |
订阅消息(PushConsumer) |
订阅消息(SimpleConsumer) |
|---|---|---|---|
|
普通消息 |
|
不分区同步订阅和异步订阅 |
|
收发消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
约束与限制
- 仅RocketMQ实例5.x版本支持gRPC协议,4.8.0版本不支持。
- 客户端连接RocketMQ实例5.x版本收发普通消息前,需要确保Topic的消息类型为“普通”。
准备环境
开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为5.0.5。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.5</version>
</dependency>
同步发送
同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。
参考如下示例代码,或者通过ProducerNormalMessageExample.java获取更多示例代码。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class ProducerNormalMessageExample {
private static final Logger log = LoggerFactory.getLogger(ProducerNormalMessageExample.class);
public static void main(String[] args) throws ClientException, IOException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "yourNormalTopics";
// 填入grpc连接地址/grpc公网连接地址
String endpoints = "yourEndpoints";
// 创建实例时,如果开启了ACL才需要添加以下代码。
String accessKey = System.getenv("ACL_User_Name");
String secretKey = System.getenv("ACL_Secret_Key");
//ACL_User_Name为用户名,ACL_Secret_Key为用户的密钥。创建用户的步骤,请参见创建用户。用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
// .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。
// .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。
.build();
final Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.build();
byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "yourMessageTagA";
final Message message = provider.newMessageBuilder()
.setTopic(topic)
.setTag(tag)
.setKeys("yourMessageKey")
.setBody(body)
.build();
try {
final SendReceipt sendReceipt = producer.send(message);
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (Throwable t) {
log.error("Failed to send message", t);
}
// 不再使用后,手动关闭producer。
producer.close();
}
}
异步发送
异步发送是指消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
参考如下示例代码,或者通过AsyncProducerExample.java获取更多示例代码。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AsyncProducerExample {
private static final Logger log = LoggerFactory.getLogger(AsyncProducerExample.class);
private AsyncProducerExample() {
}
public static void main(String[] args) throws ClientException, InterruptedException, IOException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "yourNormalTopics";
// 填入grpc连接地址/grpc公网连接地址
String endpoints = "yourEndpoints";
// 创建实例时,如果开启了ACL才需要添加以下代码。
String accessKey = System.getenv("ACL_User_Name");
String secretKey = System.getenv("ACL_Secret_Key");
//ACL_User_Name为用户名,ACL_Secret_Key为用户的密钥。创建用户的步骤,请参见创建用户。用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
// .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。
// .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。
.build();
final Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.build();
byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "yourMessageTagA";
final Message message = provider.newMessageBuilder()
.setTopic(topic)
.setTag(tag)
.setKeys("yourMessageKey")
.setBody(body)
.build();
final CompletableFuture<SendReceipt> future = producer.sendAsync(message);
// 使用线程池去处理异步发送回调
ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
future.whenCompleteAsync((sendReceipt, throwable) -> {
if (null != throwable) {
log.error("Failed to send message", throwable);
return;
}
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
}, sendCallbackExecutor);
// 阻塞主线程,不需要在生产环境中使用。
Thread.sleep(Long.MAX_VALUE);
// 不再使用后,手动关闭producer。
producer.close();
}
}
订阅普通消息(PushConsumer)
参考如下示例代码,或者通过PushConsumerExample.java获取更多示例代码。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger log = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, InterruptedException, IOException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 填入grpc连接地址/grpc公网连接地址
String endpoints = "yourEndpoints";
// 创建实例时,如果开启了ACL才需要添加以下代码。
String accessKey = System.getenv("ACL_User_Name");
String secretKey = System.getenv("ACL_Secret_Key");
//ACL_User_Name为用户名,ACL_Secret_Key为用户的密钥。创建用户的步骤,请参见创建用户。用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
// .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。
// .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。
.build();
String tag = "yourMessageTagA";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
String consumerGroup = "yourConsumerGroup";
String topic = "yourTopic";
// 在多数场景下,推荐使用单例模式创建consumer。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
// 设置订阅关系
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置监听器,用于处理接收到的消息,并返回消费结果。
.setMessageListener(messageView -> {
log.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
// 阻塞主线程,不需要在生产环境中使用。
Thread.sleep(Long.MAX_VALUE);
// 不再使用后,手动关闭producer。
pushConsumer.close();
}
}
同步订阅普通消息(SimpleConsumer)
参考如下示例代码,或者通过SimpleConsumerExample.java获取更多示例代码。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
public class SimpleConsumerExample {
private static final Logger log = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
@SuppressWarnings({"resource", "InfiniteLoopStatement"})
public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 填入grpc连接地址/grpc公网连接地址
String endpoints = "yourEndpoints";
// 创建实例时,如果开启了ACL才需要添加以下代码。
String accessKey = System.getenv("ACL_User_Name");
String secretKey = System.getenv("ACL_Secret_Key");
//ACL_User_Name为用户名,ACL_Secret_Key为用户的密钥。创建用户的步骤,请参见创建用户。用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
// .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。
// .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。
.build();
String consumerGroup = "yourConsumerGroup";
Duration awaitDuration = Duration.ofSeconds(30);
String tag = "yourMessageTagA";
String topic = "yourTopic";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 在多数场景下,推荐使用单例模式创建consumer。
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
// 设置长轮询接收消息请求(long-polling receive requests)的最大等待时间
.setAwaitDuration(awaitDuration)
// 设置订阅关系
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// 设置每次长轮询能接收的最大消息数
int maxMessageNum = 16;
// 设置消息不可见时间,在消息被接收后对其他消费者不可见,直到超时。
Duration invisibleDuration = Duration.ofSeconds(15);
// 接收消息,推荐使用多线程的方式。
while (true) {
// 如果存在可用的消息会立即返回,否则在等待超时后,返回空。
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
log.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
// 处理接收的消息,消费成功后提交消息。
consumer.ack(message);
log.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
}
// 不再使用后,手动关闭consumer。
// consumer.close();
}
}
异步订阅普通消息(SimpleConsumer)
参考如下示例代码,或者通过AsyncSimpleConsumerExample.java获取更多示例代码。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
public class AsyncSimpleConsumerExample {
private static final Logger log = LoggerFactory.getLogger(AsyncSimpleConsumerExample.class);
private AsyncSimpleConsumerExample() {
}
@SuppressWarnings({"resource", "InfiniteLoopStatement"})
public static void main(String[] args) throws ClientException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 填入grpc连接地址/grpc公网连接地址
String endpoints = "yourEndpoints";
// 创建实例时,如果开启了ACL才需要添加以下代码。
String accessKey = System.getenv("ACL_User_Name");
String secretKey = System.getenv("ACL_Secret_Key");
//ACL_User_Name为用户名,ACL_Secret_Key为用户的密钥。创建用户的步骤,请参见创建用户。用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
// .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。
// .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。
.build();
String consumerGroup = "yourConsumerGroup";
Duration awaitDuration = Duration.ofSeconds(30);
String tag = "yourMessageTagA";
String topic = "yourTopic";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 在多数场景下,推荐使用单例模式创建consumer。
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
// 设置长轮询接收消息请求(long-polling receive requests)的最大等待时间
.setAwaitDuration(awaitDuration)
// 设置订阅关系
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// 设置每次长轮询能接收的最大消息数
int maxMessageNum = 16;
// 设置消息不可见时间,在消息被接收后对其他消费者不可见,直到超时。
Duration invisibleDuration = Duration.ofSeconds(15);
// 设置允许的最大长轮询请求数
int maxLongPollingSize = 32;
Semaphore semaphore = new Semaphore(maxLongPollingSize);
// 使用线程池处理接收消息回调
ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool();
// 使用线程池处理提交消息回调
ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
// 接收消息
while (true) {
semaphore.acquire();
// 异步提交消息,如果存在可用的消息会立即返回触发回调,否则在等待超时后,返回空。
final CompletableFuture<List<MessageView>> future0 = consumer.receiveAsync(maxMessageNum,
invisibleDuration);
future0.whenCompleteAsync(((messages, throwable) -> {
// 处理接收到的消息
semaphore.release();
if (null != throwable) {
log.error("Failed to receive message from remote", throwable);
return;
}
log.info("Received {} message(s)", messages.size());
// 异步提交消息,并使用messageView作为键,因为message id可能重复。
final Map<MessageView, CompletableFuture<Void>> map =
messages.stream().collect(Collectors.toMap(message -> message, consumer::ackAsync));
for (Map.Entry<MessageView, CompletableFuture<Void>> entry : map.entrySet()) {
final MessageId messageId = entry.getKey().getMessageId();
final CompletableFuture<Void> future = entry.getValue();
future.whenCompleteAsync((v, t) -> {
// 处理提交消息回调
if (null != t) {
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
return;
}
log.info("Message is acknowledged successfully, messageId={}", messageId);
}, ackCallbackExecutor);
}
}), receiveCallbackExecutor);
}
// 不再使用后,手动关闭consumer。
// consumer.close();
}
}