更新时间:2024-05-15 GMT+08:00

使用ACL权限访问

实例开启ACL访问控制后,消息生产者和消费者都需要增加用户认证信息。

生产者增加用户认证信息

  • 普通消息、顺序消息和定时消息,参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
    from rocketmq.client import Producer, Message
    
    topic = 'TopicTest'
    gid = 'test'
    name_srv = '192.168.0.1:8100'
    
    
    def create_message():
        msg = Message(topic)
        msg.set_keys('XXX')
        msg.set_tags('XXX')
        msg.set_property('property', 'test')
        msg.set_body('message body')
        return msg
    
    
    def send_message_sync():
        producer = Producer(gid)
        producer.set_name_server_address(name_srv)
        # 设置权限(角色名和密钥)
        producer.set_session_credentials(
            "ROCKETMQ_AK",  # 角色名称
            "ROCKETMQ_SK",  # 角色密钥
            ''
        )#用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
    
        producer.start()
        msg = create_message()
        ret = producer.send_sync(msg)
        print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
        producer.shutdown()
    
    
    if __name__ == '__main__':
        send_message_sync()

    示例代码中的参数说明如下,请参考收集连接信息获取参数值。

    • topic:表示Topic名称。
    • gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。
    • name_srv:表示实例连接地址和端口。
    • ROCKETMQ_AK:表示用户名。创建用户的步骤,请参见创建用户
    • ROCKETMQ_SK:表示用户的密钥。
  • 事务消息,参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
    import time
    
    from rocketmq.client import Message, TransactionMQProducer, TransactionStatus
    
    topic = 'TopicTest'
    gid = 'test'
    name_srv = '192.168.0.1:8100'
    
    
    def create_message():
        msg = Message(topic)
        msg.set_keys('XXX')
        msg.set_tags('XXX')
        msg.set_property('property', 'test')
        msg.set_body('message body')
        return msg
    
    
    def check_callback(msg):
        print('check: ' + msg.body.decode('utf-8'))
        return TransactionStatus.COMMIT
    
    
    def local_execute(msg, user_args):
        print('local:   ' + msg.body.decode('utf-8'))
        return TransactionStatus.UNKNOWN
    
    
    def send_transaction_message(count):
        producer = TransactionMQProducer(gid, check_callback)
        producer.set_name_server_address(name_srv)
        # 设置权限(角色名和密钥)
        producer.set_session_credentials(
            "ROCKETMQ_AK",  # 角色名称
            "ROCKETMQ_SK",  # 角色密钥
            ''
        )#用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
    
        producer.start()
        for n in range(count):
            msg = create_message()
            ret = producer.send_message_in_transaction(msg, local_execute, None)
            print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
        print('send transaction message done')
    
        while True:
            time.sleep(3600)
    
    
    if __name__ == '__main__':
        send_transaction_message(10)

    示例代码中的参数说明如下,请参考收集连接信息获取参数值。

    • topic:表示Topic名称。
    • gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。
    • name_srv:表示实例连接地址和端口。
    • ROCKETMQ_AK:表示用户名。创建用户的步骤,请参见创建用户
    • ROCKETMQ_SK:表示用户的密钥。

消费者增加用户认证信息

无论是普通消息、顺序消息、定时消息,还是事务消息,都参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。

import time

from rocketmq.client import PushConsumer, ConsumeStatus


def callback(msg):
    print(msg.id, msg.body, msg.get_property('property'))
    return ConsumeStatus.CONSUME_SUCCESS


def start_consume_message():
    consumer = PushConsumer('consumer_group')
    consumer.set_name_server_address('192.168.0.1:8100')
    # 设置权限(角色名和密钥)
    consumer.set_session_credentials(
        "ROCKETMQ_AK",  # 角色名称
        "ROCKETMQ_SK",  # 角色密钥
        ''
    )#用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。

    consumer.subscribe('TopicTest', callback)
    print('start consume message')
    consumer.start()

    while True:
        time.sleep(3600)


if __name__ == '__main__':
    start_consume_message()

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • consumer_group:表示消费组名称。
  • 192.168.0.1:8100:表示实例连接地址和端口。
  • ROCKETMQ_AK:表示用户名。创建用户的步骤,请参见创建用户
  • ROCKETMQ_SK:表示用户的密钥。
  • TopicTest:表示Topic名称。