Updated on 2024-03-05 GMT+08:00

Controlling Access with ACL

After ACL is enabled for an instance, user authentication information must be added to both the producer and consumer configurations.

Adding User Authentication Information to the Producer

  • For normal, ordered, and scheduled messages, add the following code. Replace the information in bold with the actual values.
    import os
    from rocketmq.client import Producer, Message
    
    topic = 'TopicTest'
    gid = 'test'
    name_srv = '192.168.0.1:8100'
    
    
    def create_message():
        msg = Message(topic)
        msg.set_keys('XXX')
        msg.set_tags('XXX')
        msg.set_property('property', 'test')
        msg.set_body('message body')
        return msg
    
    
    def send_message_sync():
        producer = Producer(gid)
        producer.set_name_server_address(name_srv)
        producer.set_session_credentials(os.getenv("ROCKETMQ_AK"), os.getenv("ROCKETMQ_SK"), "") //Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable.
        producer.start()
        msg = create_message()
        ret = producer.send_sync(msg)
        print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
        producer.shutdown()
    
    
    if __name__ == '__main__':
        send_message_sync()

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • topic: topic name.
    • gid: producer group name. Enter a name as required.
    • name_srv: instance address and port.
    • ROCKETMQ_AK: username. For details about how to create a user, see Creating a User.
    • ROCKETMQ_SK: user's key.
  • For transactional messages, add the following code. Replace the information in bold with the actual values.
    import os
    import time
    
    from rocketmq.client import Message, TransactionMQProducer, TransactionStatus
    
    topic = 'TopicTest'
    gid = 'test'
    name_srv = '192.168.0.1:8100'
    
    
    def create_message():
        msg = Message(topic)
        msg.set_keys('XXX')
        msg.set_tags('XXX')
        msg.set_property('property', 'test')
        msg.set_body('message body')
        return msg
    
    
    def check_callback(msg):
        print('check: ' + msg.body.decode('utf-8'))
        return TransactionStatus.COMMIT
    
    
    def local_execute(msg, user_args):
        print('local:   ' + msg.body.decode('utf-8'))
        return TransactionStatus.UNKNOWN
    
    
    def send_transaction_message(count):
        producer = TransactionMQProducer(gid, check_callback)
        producer.set_name_server_address(name_srv)
        producer.set_session_credentials(os.getenv("ROCKETMQ_AK"), os.getenv("ROCKETMQ_SK"), "") //Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable.
        producer.start()
        for n in range(count):
            msg = create_message()
            ret = producer.send_message_in_transaction(msg, local_execute, None)
            print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
        print('send transaction message done')
    
        while True:
            time.sleep(3600)
    
    
    if __name__ == '__main__':
        send_transaction_message(10)

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • topic: topic name.
    • gid: producer group name. Enter a name as required.
    • name_srv: instance address and port.
    • ROCKETMQ_AK: username. For details about how to create a user, see Creating a User.
    • ROCKETMQ_SK: user's key.

Adding User Authentication Information to the Consumer

Add the following code for normal, ordered, scheduled, and transactional messages. Replace the information in bold with the actual values.

import os
import time

from rocketmq.client import PushConsumer, ConsumeStatus


def callback(msg):
    print(msg.id, msg.body, msg.get_property('property'))
    return ConsumeStatus.CONSUME_SUCCESS


def start_consume_message():
    consumer = PushConsumer('consumer_group')
    consumer.set_name_server_address('192.168.0.1:8100')
    consumer.set_session_credentials(os.getenv("ROCKETMQ_AK"), os.getenv("ROCKETMQ_SK"), "") //Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable.
    consumer.subscribe('TopicTest', callback)
    print('start consume message')
    consumer.start()

    while True:
        time.sleep(3600)


if __name__ == '__main__':
    start_consume_message()

The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

  • consumer_group: consumer group name.
  • 192.168.0.1:8100: instance address and port.
  • ROCKETMQ_AK: username. For details about how to create a user, see Creating a User.
  • ROCKETMQ_SK: user's key.
  • TopicTest: topic name.