Sending and Receiving Normal Messages
This section describes how to send and receive normal messages and provides sample code. Normal messages can be sent in the synchronous or asynchronous mode.
- Synchronous transmission: After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.
- Asynchronous transmission: After sending a message, the sender sends the next message without waiting for a response from the server.
Before sending and receiving normal messages, collect RocketMQ connection information by referring to Collecting Connection Information.
To receive and send normal messages, ensure the topic message type is Normal before connecting a client to a RocketMQ instance of version 5.x.
Preparing the Environment
You can connect open-source Java clients to DMS for RocketMQ. The recommended Java client version is 4.9.8.
- Using Maven
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.8</version> </dependency>
- Downloading the dependency.
Synchronous Transmission
After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.
Refer to the following sample code or obtain more sample code from Producer.java.
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Main { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // Enter the address. producer.setNamesrvAddr("192.168.0.1:8100"); //producer.setUseTLS(true); // Add this line if SSL has been enabled during instance creation. try { producer.start(); Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
Asynchronous Transmission
After sending a message, the sender sends the next message without waiting for a response from the server.
Asynchronous transmission requires the SendCallback method to be supported on the client. After sending a message, the sender sends the next message without waiting for a server response. The sender calls the SendCallback method to receive the server's response and then processes the response.
Refer to the following sample code or obtain more sample code from AsyncProducer.java.
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Main { public static void main(String[] args) throws InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // Enter the address. producer.setNamesrvAddr("192.168.120.45:8100;192.168.123.150:8100"); //producer.setUseTLS(true); // Add this line if SSL has been enabled during instance creation. try { producer.start(); Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult result) { // Message sent. System.out.println("send message success. msgId= " + result.getMsgId()); } @Override public void onException(Throwable throwable) { // If the message fails to be sent, you can resend the message or persist the data for compensation. System.out.println("send message failed."); throwable.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } Thread.sleep(2000); producer.shutdown(); }}
Subscribing to Normal Messages
Refer to the following sample code or obtain more sample code from PushConsumer.java.
import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // Enter the address. consumer.setNamesrvAddr("192.168.0.1:8100"); //consumer.setUseTLS(true); // Add this line if SSL has been enabled during instance creation. consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot