Controlling Access with ACL
After ACL is enabled for an instance, user authentication information must be added to both the producer and consumer configurations.
Adding User Authentication Information to the Producer
- For normal, ordered, and scheduled messages, add the following code. Replace the information in bold with the actual values.
from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_message_sync(): producer = Producer(gid) producer.set_name_server_address(name_srv) # Set the permission (role name and key). producer.set_session_credentials( "ROCKETMQ_AK", # Role name "ROCKETMQ_SK", # Role key '' )#Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable. producer.start() msg = create_message() ret = producer.send_sync(msg) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_message_sync()
The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.
- topic: topic name.
- gid: producer group name. Enter a name as required.
- name_srv: instance address and port.
- ROCKETMQ_AK: username. For details about how to create a user, see Creating a User.
- ROCKETMQ_SK: user's key.
- For transactional messages, add the following code. Replace the information in bold with the actual values.
import time from rocketmq.client import Message, TransactionMQProducer, TransactionStatus topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def check_callback(msg): print('check: ' + msg.body.decode('utf-8')) return TransactionStatus.COMMIT def local_execute(msg, user_args): print('local: ' + msg.body.decode('utf-8')) return TransactionStatus.UNKNOWN def send_transaction_message(count): producer = TransactionMQProducer(gid, check_callback) producer.set_name_server_address(name_srv) # Set the permission (role name and key). producer.set_session_credentials( "ROCKETMQ_AK", # Role name "ROCKETMQ_SK", # Role key '' )#Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable. producer.start() for n in range(count): msg = create_message() ret = producer.send_message_in_transaction(msg, local_execute, None) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) print('send transaction message done') while True: time.sleep(3600) if __name__ == '__main__': send_transaction_message(10)
The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.
- topic: topic name.
- gid: producer group name. Enter a name as required.
- name_srv: instance address and port.
- ROCKETMQ_AK: username. For details about how to create a user, see Creating a User.
- ROCKETMQ_SK: user's key.
Adding User Authentication Information to the Consumer
Add the following code for normal, ordered, scheduled, and transactional messages. Replace the information in bold with the actual values.
import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body, msg.get_property('property')) return ConsumeStatus.CONSUME_SUCCESS def start_consume_message(): consumer = PushConsumer('consumer_group') consumer.set_name_server_address('192.168.0.1:8100') # Set the permission (role name and key). consumer.set_session_credentials( "ROCKETMQ_AK", # Role name "ROCKETMQ_SK", # Role key '' )#Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable. consumer.subscribe('TopicTest', callback) print('start consume message') consumer.start() while True: time.sleep(3600) if __name__ == '__main__': start_consume_message()
The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.
- consumer_group: consumer group name.
- 192.168.0.1:8100: instance address and port.
- ROCKETMQ_AK: username. For details about how to create a user, see Creating a User.
- ROCKETMQ_SK: user's key.
- TopicTest: topic name.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.