Updated on 2024-12-25 GMT+08:00

Sending and Receiving Normal Messages

This section describes how to send and receive normal messages and provides sample code. Normal messages can be sent in synchronous or asynchronous mode.

  • Synchronous transmission: After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.
  • Asynchronous transmission: After sending a message, the sender sends the next message without waiting for a response from the server.
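
The difference between the two modes can be illustrated without the RocketMQ client. The following is a minimal sketch only: fake_send is a hypothetical stand-in for a network round trip, and the thread pool is just one way to avoid waiting for each response before issuing the next send. None of these names belong to rocketmq-client-python.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_send(body):
    """Hypothetical stand-in for a network send; not part of any RocketMQ client."""
    time.sleep(0.01)  # simulate the round trip to the server
    return 'ack:' + body


def send_all_sync(bodies):
    # Synchronous: each send blocks until its response arrives,
    # so the next message is not sent before the previous one is acknowledged.
    return [fake_send(body) for body in bodies]


def send_all_async(bodies):
    # Asynchronous: submit every send first, then collect the responses.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(fake_send, body) for body in bodies]
        return [future.result() for future in futures]


if __name__ == '__main__':
    print(send_all_sync(['a', 'b', 'c']))
    print(send_all_async(['a', 'b', 'c']))
```

Both functions return the responses in submission order; only the waiting pattern differs.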

The RocketMQ sample code below covers only synchronous transmission.

Before sending and receiving normal messages, collect RocketMQ connection information by referring to Collecting Connection Information.

Notes and Constraints

To send and receive normal messages on a RocketMQ 5.x instance, ensure that the message type of the topic is Normal before connecting a client to the instance.

Preparing the Environment

  1. Run the python command to check whether Python is installed. Output similar to the following indicates that it is:
    Python 3.7.1 (default, Jul  5 2020, 14:37:24) 
    [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>>

    If Python is not installed, run the following command to install it:

    yum install python
  2. Install the librocketmq library and rocketmq-client-python. For details, see rocketmq-client-python.

    Download rocketmq-client-cpp-2.2.0 to obtain the librocketmq library.

  3. Add librocketmq.so to the system's dynamic library search path.
    1. Find the path of librocketmq.so.
      find / -name librocketmq.so
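    2. Create a symbolic link to librocketmq.so in /usr/lib and refresh the dynamic linker cache. Replace /librocketmq.so_path with the directory found in the previous step.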
      ln -s /librocketmq.so_path/librocketmq.so /usr/lib
      sudo ldconfig
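
To confirm that the library is now on the search path, one option is to ask Python's standard ctypes.util.find_library to resolve it. This is a minimal sketch, assuming a Linux host; find_rocketmq_library is a hypothetical helper name, and find_library can return None on some systems even when the library is loadable, so treat a negative result as a hint rather than proof.

```python
import ctypes.util


def find_rocketmq_library():
    """Return the name the dynamic loader resolves for librocketmq, or None."""
    return ctypes.util.find_library('rocketmq')


if __name__ == '__main__':
    resolved = find_rocketmq_library()
    if resolved is None:
        print('librocketmq not found; check the symbolic link and rerun ldconfig')
    else:
        print('librocketmq resolved as ' + resolved)
```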

Synchronous Transmission

After sending a message, the sender waits for the server to receive and process the message, and does not send the next message until it receives a response from the server.

The following code is an example. Replace the values of topic, gid, and name_srv, as well as the XXX placeholders for the message keys and tags, with the actual values.

from rocketmq.client import Producer, Message

topic = 'TopicTest'
gid = 'test'
name_srv = '192.168.0.1:8100'


def create_message():
    msg = Message(topic)
    msg.set_keys('XXX')
    msg.set_tags('XXX')
    msg.set_property('property', 'test')
    msg.set_body('message body')
    return msg


def send_message_sync():
    producer = Producer(gid)
    producer.set_name_server_address(name_srv)
    producer.start()
    try:
        msg = create_message()
        # send_sync blocks until the server returns a result.
        ret = producer.send_sync(msg)
        print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
    finally:
        # Release client resources even if sending fails.
        producer.shutdown()


if __name__ == '__main__':
    send_message_sync()

The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

  • topic: topic name.
  • gid: producer group name. Enter a name as required.
  • name_srv: instance address and port.
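
A malformed address is an easy mistake to make when filling in name_srv. The following is a minimal sketch of a check that could run before the producer is created. split_address is a hypothetical helper, not part of rocketmq-client-python, and it assumes a single host:port pair rather than a list of addresses.

```python
def split_address(address):
    """Split 'host:port' into (host, port), raising ValueError if malformed."""
    host, sep, port_text = address.rpartition(':')
    if not sep or not host:
        raise ValueError('expected host:port, got ' + repr(address))
    port = int(port_text)  # raises ValueError for a non-numeric port
    if not 0 < port < 65536:
        raise ValueError('port out of range: ' + port_text)
    return host, port


if __name__ == '__main__':
    print(split_address('192.168.0.1:8100'))
```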

Subscribing to Normal Messages

The following code is an example. Replace the consumer group name, instance address, and topic name with the actual values.

import time

from rocketmq.client import PushConsumer, ConsumeStatus


def callback(msg):
    print(msg.id, msg.body, msg.get_property('property'))
    return ConsumeStatus.CONSUME_SUCCESS


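def start_consume_message():
    consumer = PushConsumer('consumer_group')
    consumer.set_name_server_address('192.168.0.1:8100')
    consumer.subscribe('TopicTest', callback)
    print('start consume message')
    consumer.start()

    try:
        # Keep the main thread alive so that the consumer keeps receiving messages.
        while True:
            time.sleep(3600)
    except KeyboardInterrupt:
        pass
    finally:
        # Release client resources when the process is interrupted.
        consumer.shutdown()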


if __name__ == '__main__':
    start_consume_message()

The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

  • consumer_group: consumer group name.
  • 192.168.0.1:8100: instance address and port.
  • TopicTest: topic name.
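
The callback in the example always reports success. If the processing logic can fail, one possible pattern is to wrap it so that an exception is turned into a retry status instead of propagating. The following is a sketch only: make_safe_callback is a hypothetical helper, and the success and retry arguments are assumed to be status values of the client in use, for example ConsumeStatus.CONSUME_SUCCESS and, if your client version provides it, ConsumeStatus.RECONSUME_LATER.

```python
def make_safe_callback(handler, success, retry):
    """Wrap handler so that an exception yields `retry` instead of propagating."""
    def callback(msg):
        try:
            handler(msg)
        except Exception as exc:
            # Report the failure and ask for the message to be delivered again.
            print('consume failed: ' + repr(exc))
            return retry
        return success
    return callback


if __name__ == '__main__':
    # Plain strings stand in for the client's status values in this demo.
    safe = make_safe_callback(lambda msg: print('got', msg), 'SUCCESS', 'RETRY')
    print(safe('hello'))
```

The wrapped function can then be passed wherever the original callback was passed, such as the second argument of consumer.subscribe.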