更新时间:2024-03-05 GMT+08:00
使用ACL权限访问
实例开启ACL访问控制后,消息生产者和消费者都需要增加用户认证信息。
生产者增加用户认证信息
- 普通消息、顺序消息和定时消息,参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
import os from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_message_sync(): producer = Producer(gid) producer.set_name_server_address(name_srv) producer.set_session_credentials(os.getenv("ROCKETMQ_AK"), os.getenv("ROCKETMQ_SK"), "") //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 producer.start() msg = create_message() ret = producer.send_sync(msg) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_message_sync()
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- topic:表示Topic名称。
- gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。
- name_srv:表示实例连接地址和端口。
- ROCKETMQ_AK:表示用户名。创建用户的步骤,请参见创建用户。
- ROCKETMQ_SK:表示用户的密钥。
- 事务消息,参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
import os import time from rocketmq.client import Message, TransactionMQProducer, TransactionStatus topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def check_callback(msg): print('check: ' + msg.body.decode('utf-8')) return TransactionStatus.COMMIT def local_execute(msg, user_args): print('local: ' + msg.body.decode('utf-8')) return TransactionStatus.UNKNOWN def send_transaction_message(count): producer = TransactionMQProducer(gid, check_callback) producer.set_name_server_address(name_srv) producer.set_session_credentials(os.getenv("ROCKETMQ_AK"), os.getenv("ROCKETMQ_SK"), "") //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 producer.start() for n in range(count): msg = create_message() ret = producer.send_message_in_transaction(msg, local_execute, None) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) print('send transaction message done') while True: time.sleep(3600) if __name__ == '__main__': send_transaction_message(10)
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- topic:表示Topic名称。
- gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。
- name_srv:表示实例连接地址和端口。
- ROCKETMQ_AK:表示用户名。创建用户的步骤,请参见创建用户。
- ROCKETMQ_SK:表示用户的密钥。
消费者增加用户认证信息
无论是普通消息、顺序消息、定时消息,还是事务消息,都参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
import os import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body, msg.get_property('property')) return ConsumeStatus.CONSUME_SUCCESS def start_consume_message(): consumer = PushConsumer('consumer_group') consumer.set_name_server_address('192.168.0.1:8100') consumer.set_session_credentials(os.getenv("ROCKETMQ_AK"), os.getenv("ROCKETMQ_SK"), "") //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 consumer.subscribe('TopicTest', callback) print('start consume message') consumer.start() while True: time.sleep(3600) if __name__ == '__main__': start_consume_message()
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- consumer_group:表示消费组名称。
- 192.168.0.1:8100:表示实例连接地址和端口。
- ROCKETMQ_AK:表示用户名。创建用户的步骤,请参见创建用户。
- ROCKETMQ_SK:表示用户的密钥。
- TopicTest:表示Topic名称。
父主题: Python