Updated on 2023-03-22 GMT+08:00

Controlling Access with ACL

After ACL is enabled for an instance, user authentication information must be added to both the producer and consumer configurations.

Adding User Authentication Information to the Producer

  • For normal, ordered, and scheduled messages, add the following code. Replace the example values of the parameters described after the code with the actual values.
    from rocketmq.client import Producer, Message
    
    topic = 'TopicTest'
    gid = 'test'
    name_srv = '192.168.0.1:8100'
    
    
    def create_message():
        msg = Message(topic)
        msg.set_keys('XXX')
        msg.set_tags('XXX')
        msg.set_property('property', 'test')
        msg.set_body('message body')
        return msg
    
    
    def send_message_sync():
        producer = Producer(gid)
        producer.set_name_server_address(name_srv)
        # ACL credentials: username, secret key, and channel (left empty here).
        producer.set_session_credentials("access_key", "secret_key", "")
        producer.start()
        try:
            msg = create_message()
            ret = producer.send_sync(msg)
            print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
        finally:
            # Release the producer even if sending fails.
            producer.shutdown()
    
    
    if __name__ == '__main__':
        send_message_sync()

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • topic: topic name.
    • gid: producer group name.
    • name_srv: instance metadata address and port.
    • access_key: username. For details about how to create a user, see Creating a User.
    • secret_key: secret key of the user.
  • For transactional messages, add the following code. Replace the example values of the parameters described after the code with the actual values.
    import time
    
    from rocketmq.client import Message, TransactionMQProducer, TransactionStatus
    
    topic = 'TopicTest'
    gid = 'test'
    name_srv = '192.168.0.1:8100'
    
    
    def create_message():
        msg = Message(topic)
        msg.set_keys('XXX')
        msg.set_tags('XXX')
        msg.set_property('property', 'test')
        msg.set_body('message body')
        return msg
    
    
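    def check_callback(msg):
        # Invoked when the broker checks back on a transaction whose status is still unknown.
        print('check: ' + msg.body.decode('utf-8'))
        return TransactionStatus.COMMIT
    
    
    def local_execute(msg, user_args):
        # Runs the local transaction after the half message is sent.
        # Returning UNKNOWN defers the decision to check_callback.
        print('local:   ' + msg.body.decode('utf-8'))
        return TransactionStatus.UNKNOWN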
    
    
    def send_transaction_message(count):
        producer = TransactionMQProducer(gid, check_callback)
        producer.set_name_server_address(name_srv)
        # ACL credentials: username, secret key, and channel (left empty here).
        producer.set_session_credentials("access_key", "secret_key", "")
        producer.start()
        for n in range(count):
            msg = create_message()
            ret = producer.send_message_in_transaction(msg, local_execute, None)
            print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
        print('send transaction message done')
    
        # Keep the process alive so that check_callback can still be invoked.
        while True:
            time.sleep(3600)
    
    
    if __name__ == '__main__':
        send_transaction_message(10)

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • topic: topic name.
    • gid: producer group name.
    • name_srv: instance metadata address and port.
    • access_key: username. For details about how to create a user, see Creating a User.
    • secret_key: secret key of the user.
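
The producer examples above pass the username and secret key as string literals. As a minimal sketch of one way to keep them out of the source file, the helper below reads both values from environment variables. The helper name and the variable names ROCKETMQ_ACCESS_KEY and ROCKETMQ_SECRET_KEY are assumptions made for this illustration and are not defined by the SDK or the service.

```python
import os


def load_acl_credentials(env=None):
    """Return (access_key, secret_key) read from environment variables.

    ROCKETMQ_ACCESS_KEY and ROCKETMQ_SECRET_KEY are hypothetical names
    chosen for this sketch; use whatever names suit your deployment.
    """
    # Allow a plain dict to be passed in, which keeps the helper easy to test.
    env = os.environ if env is None else env
    access_key = env.get('ROCKETMQ_ACCESS_KEY', '')
    secret_key = env.get('ROCKETMQ_SECRET_KEY', '')
    if not access_key or not secret_key:
        # Fail early instead of starting a client that the ACL would reject.
        raise ValueError('ACL username or secret key is not set')
    return access_key, secret_key
```

The returned values can then replace the literals in the examples above, for instance producer.set_session_credentials(access_key, secret_key, "").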

Adding User Authentication Information to the Consumer

Add the following code for normal, ordered, scheduled, and transactional messages. Replace the example values of the parameters described after the code with the actual values.

import time

from rocketmq.client import PushConsumer, ConsumeStatus


def callback(msg):
    print(msg.id, msg.body, msg.get_property('property'))
    return ConsumeStatus.CONSUME_SUCCESS


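def start_consume_message():
    consumer = PushConsumer('consumer_group')
    consumer.set_name_server_address('192.168.0.1:8100')
    # ACL credentials: username, secret key, and channel (left empty here).
    consumer.set_session_credentials("access_key", "secret_key", "")
    consumer.subscribe('TopicTest', callback)
    print('start consume message')
    consumer.start()

    # Keep the main thread alive; messages are delivered to callback in the background.
    while True:
        time.sleep(3600)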


if __name__ == '__main__':
    start_consume_message()

The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

  • consumer_group: consumer group name.
  • 192.168.0.1:8100: instance metadata address and port.
  • access_key: username. For details about how to create a user, see Creating a User.
  • secret_key: secret key of the user.
  • TopicTest: topic name.
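
The producer and consumer examples above configure the address and credentials with the same two calls. A minimal sketch of factoring those calls into one place is shown below. The function apply_acl_settings is hypothetical and not part of the SDK; it only assumes that the client object exposes set_name_server_address() and set_session_credentials(), as the clients in the examples above do.

```python
def apply_acl_settings(client, name_srv, access_key, secret_key):
    """Point a client at the instance and attach ACL credentials.

    `client` is any object exposing set_name_server_address() and
    set_session_credentials(), the two methods used in the examples above.
    This helper is hypothetical and not part of the SDK.
    """
    client.set_name_server_address(name_srv)
    # The third argument is passed as an empty string, as in the examples above.
    client.set_session_credentials(access_key, secret_key, "")
    return client
```

For example, a consumer could be prepared with apply_acl_settings(PushConsumer('consumer_group'), '192.168.0.1:8100', access_key, secret_key) before subscribe() and start() are called, and a producer could be prepared the same way before start().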