更新时间:2024-03-05 GMT+08:00
收发事务消息
分布式消息服务RocketMQ版的事务消息支持在业务逻辑与发送消息之间提供事务保证,通过两阶段的方式提供对事务消息的支持,事务消息交互流程如图1所示。
事务消息生产者首先发送半消息,然后执行本地事务。如果执行成功,则发送事务提交,否则发送事务回滚。服务端在一段时间后如果一直收不到提交或回滚,则发起回查,生产者在收到回查后重新发送事务提交或回滚。消息只有在提交之后才投递给消费者,消费者对回滚的消息不可见。
收发事务消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
发送事务消息
参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
import time from rocketmq.client import Message, TransactionMQProducer, TransactionStatus topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def check_callback(msg): print('check: ' + msg.body.decode('utf-8')) return TransactionStatus.COMMIT def local_execute(msg, user_args): print('local: ' + msg.body.decode('utf-8')) return TransactionStatus.UNKNOWN def send_transaction_message(count): producer = TransactionMQProducer(gid, check_callback) producer.set_name_server_address(name_srv) producer.start() for n in range(count): msg = create_message() ret = producer.send_message_in_transaction(msg, local_execute, None) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) print('send transaction message done') while True: time.sleep(3600) if __name__ == '__main__': send_transaction_message(10)
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- topic:表示Topic名称。
- gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。
- name_srv:表示实例连接地址和端口。
事务消息生产者需要实现两个回调函数,其中local_execute回调函数在发送完半事务消息后被调用,即上图中的第3阶段,check_callback回调函数在收到回查时调用,即上图中的第6阶段。两个回调函数均可返回3种事务状态:
- TransactionStatus.COMMIT:提交事务,允许消费者消费该消息。
- TransactionStatus.ROLLBACK:回滚事务,消息将被丢弃不允许消费。
- TransactionStatus.UNKNOWN:无法判断状态,期待服务端向生产者再次回查该消息的状态。
订阅事务消息
订阅事务消息的代码与订阅普通消息的代码相同。
父主题: Python