Updated on 2024-05-15 GMT+08:00

Sending and Receiving Normal Messages

This section describes how to send and receive normal messages and provides sample code. Normal messages can be sent in the synchronous or asynchronous mode.

  • Synchronous transmission: After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.
  • Asynchronous transmission: After sending a message, the sender sends the next message without waiting for a response from the server.

Before sending and receiving normal messages, collect RocketMQ connection information by referring to Collecting Connection Information.

If the RocketMQ instance is of version 5.x, ensure that the message type of the topic is Normal before connecting a client to send or receive normal messages.

Preparing the Environment

You can connect open-source Java clients to DMS for RocketMQ. The recommended Java client version is 4.9.8.

Use either of the following methods to import a dependency:
  • Using Maven
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.8</version>
    </dependency>
  • Downloading the dependency.

Synchronous Transmission

After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.

Refer to the following sample code or obtain more sample code from Producer.java.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * Synchronous producer sample: starts a producer, sends one normal message,
 * prints the {@code SendResult} returned by the send call, and shuts the producer down.
 */
public class Main {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // Enter the address.
        producer.setNamesrvAddr("192.168.0.1:8100");
        //producer.setUseTLS(true); // Add this line if SSL has been enabled during instance creation.
        try {
            producer.start();
            // Arguments: topic, tag, key, message body.
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Synchronous send: the call returns only after the server has responded.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            // Sample-level handling only; replace with retry or logging as appropriate.
            e.printStackTrace();
        } finally {
            // Release the producer even if something other than an Exception escapes the try block.
            producer.shutdown();
        }
    }
}

Asynchronous Transmission

After sending a message, the sender sends the next message without waiting for a response from the server.

Asynchronous transmission requires the client to provide an implementation of the SendCallback interface. When the server's response arrives (or the send fails), the client invokes the callback's onSuccess or onException method, and the sender processes the result there.

Refer to the following sample code or obtain more sample code from AsyncProducer.java.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;


/**
 * Asynchronous producer sample: starts a producer, sends one normal message with a
 * {@code SendCallback}, waits (bounded) for the callback to report the outcome, and
 * then shuts the producer down.
 */
public class Main {
    public static void main(String[] args) throws InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // Enter the address. Separate multiple addresses with semicolons.
        producer.setNamesrvAddr("192.168.120.45:8100;192.168.123.150:8100");
        //producer.setUseTLS(true);    // Add this line if SSL has been enabled during instance creation.

        // Released by the callback (or by the catch block below) so that shutdown() is
        // not called while the asynchronous send is still in flight.
        final CountDownLatch sendFinished = new CountDownLatch(1);
        try {
            try {
                producer.start();
                // Arguments: topic, tag, key, message body.
                Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult result) {
                        // Message sent.
                        System.out.println("send message success. msgId= " + result.getMsgId());
                        sendFinished.countDown();
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        // If the message fails to be sent, you can resend the message or persist the data for compensation.
                        System.out.println("send message failed.");
                        throwable.printStackTrace();
                        sendFinished.countDown();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
                // The send was never issued, so no callback will arrive; do not wait for one.
                sendFinished.countDown();
            }

            // Wait for the callback instead of sleeping for a fixed time. The wait is bounded
            // so the sample still terminates if the callback is never invoked.
            if (!sendFinished.await(5, TimeUnit.SECONDS)) {
                System.out.println("send result not received within 5 seconds.");
            }
        } finally {
            producer.shutdown();
        }
    }
}

Subscribing to Normal Messages

Refer to the following sample code or obtain more sample code from PushConsumer.java.

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Push consumer sample: subscribes to a topic, registers a concurrent message
 * listener that prints each received batch, and starts the consumer.
 */
public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // Enter the address.
        pushConsumer.setNamesrvAddr("192.168.0.1:8100");
        //pushConsumer.setUseTLS(true);    // Add this line if SSL has been enabled during instance creation.
        pushConsumer.subscribe("TopicTest", "*");

        // Listener invoked with each batch of messages; it reports the batch as consumed.
        MessageListenerConcurrently printingListener =
            (List<MessageExt> batch, ConsumeConcurrentlyContext ctx) -> {
                String threadName = Thread.currentThread().getName();
                System.out.printf("%s Receive New Messages: %s %n", threadName, batch);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            };
        pushConsumer.registerMessageListener(printingListener);

        pushConsumer.start();
        System.out.printf("Consumer Started.%n");
    }
}