更新时间:2024-05-15 GMT+08:00

收发事务消息

分布式消息服务RocketMQ版的事务消息支持在业务逻辑与发送消息之间提供事务保证,通过两阶段的方式提供对事务消息的支持,事务消息交互流程如图1所示。

图1 事务消息交互流程

事务消息生产者首先发送半消息,然后执行本地事务。如果执行成功,则发送事务提交,否则发送事务回滚。服务端在一段时间后如果一直收不到提交或回滚,则发起回查,生产者在收到回查后重新发送事务提交或回滚。消息只有在提交之后才投递给消费者,消费者对回滚的消息不可见。

收发事务消息前,请参考收集连接信息收集RocketMQ所需的连接信息。

客户端连接RocketMQ实例5.x版本收发事务消息前,需要确保Topic的消息类型为“事务”。

发送事务消息

参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

import time

from rocketmq.client import Message, TransactionMQProducer, TransactionStatus

topic = 'TopicTest'
gid = 'test'
name_srv = '192.168.0.1:8100'


def create_message():
    msg = Message(topic)
    msg.set_keys('XXX')
    msg.set_tags('XXX')
    msg.set_property('property', 'test')
    msg.set_body('message body')
    return msg


def check_callback(msg):
    print('check: ' + msg.body.decode('utf-8'))
    return TransactionStatus.COMMIT


def local_execute(msg, user_args):
    print('local:   ' + msg.body.decode('utf-8'))
    return TransactionStatus.UNKNOWN


def send_transaction_message(count):
    producer = TransactionMQProducer(gid, check_callback)
    producer.set_name_server_address(name_srv)
    producer.start()
    for n in range(count):
        msg = create_message()
        ret = producer.send_message_in_transaction(msg, local_execute, None)
        print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
    print('send transaction message done')

    while True:
        time.sleep(3600)


if __name__ == '__main__':
    send_transaction_message(10)

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • topic:表示Topic名称。
  • gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。
  • name_srv:表示实例连接地址和端口。

事务消息生产者需要实现两个回调函数,其中local_execute回调函数在发送完半事务消息后被调用,即上图中的第3阶段,check_callback回调函数在收到回查时调用,即上图中的第6阶段。两个回调函数均可返回3种事务状态:

  • TransactionStatus.COMMIT:提交事务,允许消费者消费该消息。
  • TransactionStatus.ROLLBACK:回滚事务,消息将被丢弃不允许消费。
  • TransactionStatus.UNKNOWN:无法判断状态,期待服务端向生产者再次回查该消息的状态。

订阅事务消息

订阅事务消息的代码与订阅普通消息的代码相同。