Updated on 2024-05-15 GMT+08:00

Sending and Receiving Normal Messages

This section describes how to send and receive normal messages and provides sample code. Normal messages can be sent in the synchronous or asynchronous mode. RocketMQ provides the PushConsumer and SimpleConsumer consumer types. PushConsumer consumer subscriptions do not differentiate between synchronous and asynchronous for normal messages. SimpleConsumer consumers subscribe to normal messages in synchronous or asynchronous mode.

Normal messages can be sent in the synchronous or asynchronous mode.

  • Synchronous transmission: After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.
  • Asynchronous transmission: After sending a message, the sender sends the next message without waiting for a response from the server.
Table 1 Sending and receiving modes for normal messages

Message Type

Sending a Message

Subscription by PushConsumer

Subscription by SimpleConsumer

Normal

  • Synchronous
  • Asynchronous

No differentiation between synchronous and asynchronous

  • Synchronous
  • Asynchronous

Before sending and receiving messages, collect RocketMQ connection information by referring to Collecting Connection Information.

To receive and send normal messages, ensure the topic message type is Normal before connecting a client to a RocketMQ instance of version 5.x.

Preparing the Environment

You can connect open-source Java clients to DMS for RocketMQ. The recommended Java client version is 5.0.5.

Using Maven
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.5</version>
</dependency>

Synchronous Transmission

After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.

Refer to the following sample code or obtain more sample code from ProducerNormalMessageExample.java.

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ProducerNormalMessageExample {
    private static final Logger log = LoggerFactory.getLogger(ProducerNormalMessageExample.class);

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String topic = "yourNormalTopics";
        // Specify the gRPC address or gRPC public address
        String endpoints = "yourEndpoints";
        // Enter your username and key. Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable. The code below is required only if ACL was enabled during instance creation.
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // Disable SSL if has been enabled.
                // .setCredentialProvider(sessionCredentialsProvider)  // Set the credential provider if ACL has been enabled.
                .build();

        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();
        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setTag(tag)
                .setKeys("yourMessageKey")
                .setBody(body)
                .build();
        try {
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }
        // Close the producer when it is no longer needed.
        producer.close();
    }
}

Asynchronous Transmission

After sending a message, the sender sends the next message without waiting for a response from the server.

Refer to the following sample code or obtain more sample code from AsyncProducerExample.java.

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncProducerExample {
    private static final Logger log = LoggerFactory.getLogger(AsyncProducerExample.class);

    private AsyncProducerExample() {
    }

    public static void main(String[] args) throws ClientException, InterruptedException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String topic = "yourNormalTopics";
        // Specify the gRPC address or gRPC public address
        String endpoints = "yourEndpoints";
        // Enter your username and key. Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable. The code below is required only if ACL was enabled during instance creation.
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // Disable SSL if has been enabled.
                // .setCredentialProvider(sessionCredentialsProvider)  // Set the credential provider if ACL has been enabled.
                .build();
        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();

        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setTag(tag)
                .setKeys("yourMessageKey")
                .setBody(body)
                .build();

        final CompletableFuture<SendReceipt> future = producer.sendAsync(message);
        // Use the thread pool to execute asynchronous send callback.
        ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
        future.whenCompleteAsync((sendReceipt, throwable) -> {
            if (null != throwable) {
                log.error("Failed to send message", throwable);
                return;
            }
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        }, sendCallbackExecutor);
        // Disable the main thread in the production environment.
        Thread.sleep(Long.MAX_VALUE);
        // Close the producer when it is no longer needed.
        producer.close();
    }
}

Subscribing to Normal Messages (PushConsumer)

Refer to the following sample code or obtain more sample code from PushConsumerExample.java.

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger log = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, InterruptedException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Specify the gRPC address or gRPC public address
        String endpoints = "yourEndpoints";
        // Enter your username and key. Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable. The code below is required only if ACL was enabled during instance creation.
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // Disable SSL if has been enabled.
                // .setCredentialProvider(sessionCredentialsProvider)  // Set the credential provider if ACL has been enabled.
                .build();
        String tag = "yourMessageTagA";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "yourConsumerGroup";
        String topic = "yourTopic";
        // In most cases, you do not need to create too many consumers. The singleton pattern is recommended.
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                // Set the subscription relationship.
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // Set the message listener to process messages and return the consumption result.
                .setMessageListener(messageView -> {
                    log.info("Consume message={}", messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        // Disable the main thread in the production environment.
        Thread.sleep(Long.MAX_VALUE);
        // Close the producer when it is no longer needed.
        pushConsumer.close();
    }
}

Synchronously Subscribing to Normal Messages (SimpleConsumer)

Refer to the following sample code or obtain more sample code from SimpleConsumerExample.java.

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger log = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    @SuppressWarnings({"resource", "InfiniteLoopStatement"})
    public static void main(String[] args) throws ClientException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Specify the gRPC address or gRPC public address
        String endpoints = "yourEndpoints";
        // Enter your username and key. Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable. The code below is required only if ACL was enabled during instance creation.
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // Disable SSL if has been enabled.
                // .setCredentialProvider(sessionCredentialsProvider)  // Set the credential provider if ACL has been enabled.
                .build();
        String consumerGroup = "yourConsumerGroup";
        Duration awaitDuration = Duration.ofSeconds(30);
        String tag = "yourMessageTagA";
        String topic = "yourTopic";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // In most cases, you do not need to create too many consumers. The singleton pattern is recommended.
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                // Set the maximum await duration for receiving requests in long polling.
                .setAwaitDuration(awaitDuration)
                // Set the subscription relationship.
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        // Set the maximum number of messages in for each long polling.
        int maxMessageNum = 16;
        // Set the duration when received messages are invisible to other consumers.
        Duration invisibleDuration = Duration.ofSeconds(15);
        // Use multiple threads to receive messages.
        while (true) {
            // Return any consumable messages or null after the await duration expires.
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            log.info("Received {} message(s)", messages.size());
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    // Process the received messages and acknowledge successful consumption.
                    consumer.ack(message);
                    log.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        }
        // Close the consumer when it is no longer needed.
        // consumer.close();
    }
}

Asynchronously Subscribing to Normal Messages (SimpleConsumer)

Refer to the following sample code or obtain more sample code from AsyncSimpleConsumerExample.java.

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;

public class AsyncSimpleConsumerExample {
    private static final Logger log = LoggerFactory.getLogger(AsyncSimpleConsumerExample.class);

    private AsyncSimpleConsumerExample() {
    }

    @SuppressWarnings({"resource", "InfiniteLoopStatement"})
    public static void main(String[] args) throws ClientException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Specify the gRPC address or gRPC public address
        String endpoints = "yourEndpoints";
        // Enter your username and key. Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable. The code below is required only if ACL was enabled during instance creation.
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // Disable SSL if has been enabled.
                // .setCredentialProvider(sessionCredentialsProvider)  // Set the credential provider if ACL has been enabled.
                .build();
        String consumerGroup = "yourConsumerGroup";
        Duration awaitDuration = Duration.ofSeconds(30);
        String tag = "yourMessageTagA";
        String topic = "yourTopic";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // In most cases, you do not need to create too many consumers. The singleton pattern is recommended.
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                // Set the maximum await duration for receiving requests in long polling.
                .setAwaitDuration(awaitDuration)
                // Set the subscription relationship.
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        // Set the maximum number of messages in for each long polling.
        int maxMessageNum = 16;
        // Set the duration when received messages are invisible to other consumers.
        Duration invisibleDuration = Duration.ofSeconds(15);
        // Set max number of long-polling requests.
        int maxLongPollingSize = 32;
        Semaphore semaphore = new Semaphore(maxLongPollingSize);
        // Use thread pool to execute receive callback.
        ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool();
        // Use thread pool to execute acknowledge callback.
        ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
        // Receiving
        while (true) {
            semaphore.acquire();
            // Async throw messages. Return null or any callback if there is any consumable message in await duration.
            final CompletableFuture<List<MessageView>> future0 = consumer.receiveAsync(maxMessageNum,
                    invisibleDuration);
            future0.whenCompleteAsync(((messages, throwable) -> {
                // Process received messages.
                semaphore.release();
                if (null != throwable) {
                    log.error("Failed to receive message from remote", throwable);
                    return;
                }
                log.info("Received {} message(s)", messages.size());
                // Async throw messages with messageView as key (message ID may not be unique).
                final Map<MessageView, CompletableFuture<Void>> map =
                        messages.stream().collect(Collectors.toMap(message -> message, consumer::ackAsync));
                for (Map.Entry<MessageView, CompletableFuture<Void>> entry : map.entrySet()) {
                    final MessageId messageId = entry.getKey().getMessageId();
                    final CompletableFuture<Void> future = entry.getValue();
                    future.whenCompleteAsync((v, t) -> {
                        // Process callback.
                        if (null != t) {
                            log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                            return;
                        }
                        log.info("Message is acknowledged successfully, messageId={}", messageId);
                    }, ackCallbackExecutor);
                }

            }), receiveCallbackExecutor);
        }
        // Close the consumer when it is no longer needed.
        // consumer.close();
    }
}