应用管理与运维平台 ServiceStage应用管理与运维平台 ServiceStage

更新时间:2021/09/30 GMT+08:00
分享

ServiceComb接入DTM样例指南

本章节介绍ServiceComb 框架下MQ消息接入DTM事务的demo,使得Spring Cloud框架下的MQ消息项目可以快速接入DTM。

其中,ServiceComb的样例代码在导入样例工程过程中准备的dtm-demo的dtm-servicecomb项目中。

前提条件

  1. 创建微服务引擎,请参考创建微服务引擎专享版
  2. (可选)获取AK/SK与项目名称,请参考AK/SK获取方法
    • 使用微服务引擎专业版,需要配置AK/SK。
    • 使用未开启安全认证的微服务引擎专享版,不需要配置AK/SK。

      DTM不支持开启了安全认证的微服务引擎专享版。

  3. 请参考RocketMQ安装和启动,在启动样例之前,先安装、启动RocketMQ,以便样例中的服务可以注册到RocketMQ上。

样例设计

ServiceComb样例中使用ServiceComb框架,样例流程参考图1

  • 组成:bankA服务、bankB服务、mqService服务、mqConsumer服务。
  • 功能:mqService服务和mqConsumer服务会对接消息RocketMQ,保证事务的一致性。

mqConsumer服务消息消费方,需要保证消费的幂等性。RocketMQ本身的消息实现机制,会有重复消费消息的问题。

业务流程分析

  • 正常场景:BankA转入200,BankB第一次转出100,第二次通过mqConsumer服务转出100。
  • 异常场景:mqService服务主动cancel投递的半消息,转账消息丢弃,BankA转入的200回滚。

DTM全局事务发起者

定义非侵入样例微服务场景事务发起端,通过微服务接口调用bankA发起银行转账,并且投递半消息。

// com.huawei.dtm.mq.server.controller.MqController.java
@GetMapping(value = "transfer")
@DTMTxBegin(appName = "noninvasive-transfer-mq-ServiceComb")
public String transfer(@RequestParam(value = "id") int id, @RequestParam(value = "money") int money,
    @RequestParam(value = "errRate") int errRate) throws Exception {
    LOGGER.info("mq service start invoke bankA and bankB");
    bankOperator.transfer(id, money, errRate);
    return "ok";
}
// bankOperator 有两个实现类,下面以 restTemplate 的实现类为例
// com.huawei.dtm.mq.server.impl.RestOpImpl.java
@Override
public String transfer(int userId, int money, int errRate) throws Exception {
    LOGGER.info("Start transfer---rest");
    restTemplate.getForObject(String.format(BANKA_TRANSFER, userId, money * 2, errRate), String.class);
    mqTemplate.sendMsg(userId, money);
    // BankB在转账的时候会概率性抛出异常
    restTemplate.getForObject(String.format(BANKB_TRANSFER, userId, money, errRate), String.class);
    return "ok";
}

mqService服务

  • 使用DTM封装好的事务生产者DtmRocketMqProducer。
    // com.huawei.dtm.mq.server.config.WebConfig.java;
    @Bean
    public DtmRocketMqProducer dtmRocketMqProducer() throws Exception {
        DtmRocketMqProducer producer = new DtmRocketMqProducer("dtm-rocket-mq");
        // 设置自己的MQ地址
        producer.setNamesrvAddr("http://127.0.0.1:9876");
        return producer;
    }
    
  • 发送接入到全局事务的半消息。

    当全局事务提交时,这个半消息会自动commit,完成投递。

    当全局事务回滚时,这个半消息会自动cancel,完成丢弃。

    // com.huawei.dtm.mq.server.model.RocketMqTemplate.java
    public void sendMsg(int userId, int money) throws Exception {
        // 为了保证消费幂等性,每一个消息带一个 唯一的UUID。
        String uuid = UUID.randomUUID().toString();
        String msgBody = uuid + "__" + money + "__" + userId;
        Message msg =
            new Message("dtm-topic-mq", "tag", msgBody.getBytes(StandardCharsets.UTF_8));
        producer.sendMessageInTransaction(msg, null);
    }

mqConsumer服务

  • 订阅转账消息,收到消息后发生转账。

    消息的消费方要保证消费幂等性RocketMQ本身的消息实现机制,会有重复消费消息的问题。

    // com.huawei.dtm.mq.consumer.config.WebConfig.java
    @Bean
    public DefaultMQPushConsumer dtmConsumer(BankBService bankBService) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dtm-rocket-mq");
        consumer.setNamesrvAddr("http://127.0.0.1:9876");
        consumer.subscribe("dtm-topic-mq", "*");
        /**
         * 消费的时候要保证幂等性, RocketMQ 为确保能正常消费, 在少数极端场景有重复消费问题。
         */
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            try {
                String msg = new String(msgs.get(0).getBody());
                System.out.println("Receive msg : " + msg);
                String[] arr = msg.split("__");
                if (ConsumerStore.INST.alreadyConsume(arr[0])) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                /**
                 * 确定唯一标识, 已经消费不在重复消费, ROCKETMQ 会保证消息至少会被消费一次. 防止重复消费
                 */
                bankBService.transferOut(Integer.parseInt(arr[2]), Integer.parseInt(arr[1]));
                ConsumerStore.INST.add(arr[0]);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        return consumer;
    }
    //com.huawei.common.impl.BankBService.java
    public void transferOut(int id, int money) {
        LOGGER.info("BankB transfer out");
        jdbcTemplate.update(DtmConst.TransferSql.TRANSFER_OUT_SQL, money, id);
    }
    

修改配置文件

  1. 通过IDEA打开dtm-servicecomb项目,修改banka所需配置文件。

    1. 修改“dtm-servicecomb\bank-a\src\main\resources\microservice.yaml”文件中的数据库配置信息,参考表1,修改username、password和url为您创建好的数据库用户名、密码和地址,拷贝完成后保存退出。
      spring:
        datasource:
          banka:
            username: ${db_user_name}
            password: ${db_user_pwd}
            url: jdbc:mysql://${db_ip}:3306/banka?verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC
      表1 microservice.yaml配置文件参数详解

      参数

      说明

      db_user_name

      客户端MySQL数据库用户名。

      db_user_pwd

      客户端MySQL数据库的密码。

      db_ip

      客户端MySQL数据库业务数据库地址,分别为banka和bankb的地址,根据实际修改。

    2. 修改“dtm-servicecomb\bank-a\src\main\resources\microservice.yaml”文件中的handler配置,Consumer增加dtm-consumer,Provider增加dtm-provider。
      servicecomb:
        ...
        handler:
          chain:
            Consumer:
              default: loadbalance,bizkeeper-consumer,dtm-consumer
            Provider:
              default: dtm-provider
    3. 修改“dtm-servicecomb\bank-a\src\main\resources\microservice.yaml”文件中的address信息,参考表2
      servicecomb:
        service:
          registry:
            address: ${cse_address}
      表2 配置microservice.yaml servicecomb参数详解

      参数

      说明

      cse_address

      服务注册发现地址,该配置项可从“应用管理与运维平台”微服务 CSE > 引擎实例界面中得到。

    4. (可选)配置AK/SK。

      如果使用微服务引擎专业版,需要配置AK/SK;如果使用未开启安全认证的微服务引擎专享版,不需要配置AK/SK,可以跳过这个步骤。

      “microservice.yaml”文件中添加以下配置信息,并修改accessKey、secretKey和project。
        servicecomb:  
          credentials:
            accessKey: AK
            secretKey: SK
            project: 项目名称
            akskCustomCipher: default

  2. 修改bankb所需配置文件。

    1. 修改“dtm-servicecomb\bank-b\src\main\resources\microservice.yaml”文件中的数据库配置信息,参考表1,修改username、password和url为您创建好的数据库用户名、密码和地址,拷贝完成后保存退出。
      spring:
        datasource:
          bankb:
            username: ${db_user_name}
            password: ${db_user_pwd}
            url: jdbc:mysql://${db_ip}:3306/bankb?verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC
    2. 修改“dtm-servicecomb\bank-b\src\main\resources\microservice.yaml”文件中的handler配置,Consumer增加dtm-consumer,Provider增加dtm-provider。
      servicecomb:
        ...
        handler:
          chain:
            Consumer:
              default: loadbalance,bizkeeper-consumer,dtm-consumer
            Provider:
              default: dtm-provider
    3. 修改“dtm-servicecomb\bank-b\src\main\resources\microservice.yaml”文件中address信息,参考表2
      servicecomb:
        service:
          registry:
            address: ${cse_address}
    4. (可选)配置AK/SK。

      如果使用微服务引擎专业版,需要配置AK/SK;如果使用未开启安全认证的微服务引擎专享版,不需要配置AK/SK,可以跳过这个步骤。

      “microservice.yaml”文件中添加以下配置信息,并修改accessKey、secretKey和project。
        servicecomb:  
          credentials:
            accessKey: AK
            secretKey: SK
            project: 项目名称
            akskCustomCipher: default

  3. 修改bankcenter所需配置文件。

    1. 修改“dtm-servicecomb\bank-center\src\main\resources\microservice.yaml”文件中的handler配置,Consumer增加dtm-consumer,Provider增加dtm-provider。
      servicecomb:
        ...
        handler:
          chain:
            Consumer:
              default: loadbalance,bizkeeper-consumer,dtm-consumer
            Provider:
              default: dtm-provider
    2. 修改“dtm-servicecomb\bank-center\src\main\resources\microservice.yaml”文件中servicecomb信息,参考表2
      servicecomb:
        service:
          registry:
            address: ${cse_address}
    3. (可选)配置AK/SK。

      如果使用微服务引擎专业版,需要配置AK/SK;如果使用未开启安全认证的微服务引擎专享版,不需要配置AK/SK,可以跳过这个步骤。

      “microservice.yaml”文件中添加以下配置信息,并修改accessKey、secretKey和project。
        servicecomb:  
          credentials:
            accessKey: AK
            secretKey: SK
            project: 项目名称
            akskCustomCipher: default

  4. 修改invoke所需配置文件。

    1. 修改“dtm-servicecomb\invoke-service\src\main\resources\microservice.yaml”文件中的handler配置,Consumer增加dtm-consumer,Provider增加dtm-provider。
      servicecomb:
        ...
        handler:
          chain:
            Consumer:
              default: loadbalance,bizkeeper-consumer,dtm-consumer
            Provider:
              default: dtm-provider
    2. 修改“dtm-servicecomb\invoke-service\src\main\resources\microservice.yaml”文件中servicecomb信息,参考表2
      servicecomb:
        service:
          registry:
            address: ${cse_address}
    3. (可选)配置AK/SK。

      如果使用微服务引擎专业版,需要配置AK/SK;如果使用未开启安全认证的微服务引擎专享版,不需要配置AK/SK,可以跳过这个步骤。

      “microservice.yaml”文件中添加以下配置信息,并修改accessKey、secretKey和project。
        servicecomb:  
          credentials:
            accessKey: AK
            secretKey: SK
            project: 项目名称
            akskCustomCipher: default

  5. 修改mq-service所需配置文件。

    1. 修改“dtm-servicecomb\mq-service\src\main\resources\microservice.yaml”文件中的handler配置,Consumer增加dtm-consumer,Provider增加dtm-provider。
      servicecomb:
        ...
        handler:
          chain:
            Consumer:
              default: loadbalance,bizkeeper-consumer,dtm-consumer
            Provider:
              default: dtm-provider
    2. 修改“dtm-servicecomb\mq-service\src\main\resources\microservice.yaml”文件中servicecomb信息,参考表2
      servicecomb:
        service:
          registry:
            address: ${cse_address}
    3. (可选)配置AK/SK。

      如果使用微服务引擎专业版,需要配置AK/SK;如果使用未开启安全认证的微服务引擎专享版,不需要配置AK/SK,可以跳过这个步骤。

      “microservice.yaml”文件中添加以下配置信息,并修改accessKey、secretKey和project。
        servicecomb:  
          credentials:
            accessKey: AK
            secretKey: SK
            project: 项目名称
            akskCustomCipher: default

  6. 修改mq-consumer所需配置文件。

    1. 修改“dtm-servicecomb\mq-consumer\src\main\resources\microservice.yaml”文件中的handler配置,Consumer增加dtm-consumer,Provider增加dtm-provider。
      servicecomb:
        ...
        handler:
          chain:
            Consumer:
              default: loadbalance,bizkeeper-consumer,dtm-consumer
            Provider:
              default: dtm-provider
    2. 修改“dtm-servicecomb\mq-consumer\src\main\resources\microservice.yaml”文件中servicecomb信息,参考表2
      servicecomb:
        service:
          registry:
            address: ${cse_address}
    3. 参考表 microservice.yaml配置文件参数详解,修改“microservice.yaml”文件的username、password和url为您创建好的bankB数据库用户名、密码和地址。
      spring:
        main:
          allow-bean-definition-overriding: true
        datasource:
          bank:
            username: ${db_user_name}
            password: ${db_user_pwd}
            url: jdbc:mysql://${db_ip}/bankb?verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC
            driver-class-name: com.mysql.cj.jdbc.Driver
    4. (可选)配置AK/SK。

      如果使用微服务引擎专业版,需要配置AK/SK;如果使用未开启安全认证的微服务引擎专享版,不需要配置AK/SK,可以跳过这个步骤。

      “microservice.yaml”文件中添加以下配置信息,并修改accessKey、secretKey和project。
        servicecomb:  
          credentials:
            accessKey: AK
            secretKey: SK
            project: 项目名称
            akskCustomCipher: default

  7. 通过Maven执行clean、install,将工程进行打包,生成banka、bankb、bankcenter、invoke、mq-service以及mq-consumer服务的jar包。
  8. 登录创建好的弹性云服务器,将六个jar包放在同级目录下,同时在目录下新建“dtm-config”文件夹。在新建目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,将下列配置拷贝到配置文件“dtmClientConfig.properties”中,参考表3,按实际修改auto-create-table-dtm-tran-info、dtm-app-name、sc-server-address和rpc-ssl-switch。

    auto-create-table-dtm-tran-info=on
    dtm-app-name=xxxx
    sc-server-address=xxxx
    rpc-ssl-switch=off
    表3 dtmClientConfig.properties配置文件参数详解

    参数

    说明

    auto-create-table-dtm-tran-info

    是否自动创建DTM事务表dtm_tran_info,用来记录事务信息。

    • on:自动创建
    • off:手动创建

    dtm-app-name

    应用名称,该配置项可从“应用管理与运维平台”分布式事务管理 DTM > 引擎实例界面中得到。

    sc-server-address

    服务中心地址,该配置项可从“应用管理与运维平台”分布式事务管理 DTM > 引擎实例界面中得到。

    rpc-ssl-switch

    SSL开关,该配置项可从“应用管理与运维平台”分布式事务管理 DTM > 引擎实例界面中得到。

    • on:开启SSL
    • off:关闭SSL
    图1 DTM引擎实例

启动测试样例

  1. 登录ECS,在bankA-service-servicecomb.jar包同级目录下,执行java -Dfile.encoding=utf-8 -jar bankA-service-servicecomb.jar启动bankA服务。
  2. 打开第二个窗口登录ECS,在bankB-service-servicecomb.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar bankB-service-servicecomb.jar启动bankB服务。
  3. 打开第三个窗口登录ECS,在bankCenter-service-servicecomb.jar包同级目录下,执行java -Dfile.encoding=utf-8 -jar bankCenter-service-servicecomb.jar启动bankCenter服务。
  4. 打开第四个窗口登录ECS,在mq-service-servicecomb.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-service-servicecomb.jar启动mq-service服务
  5. 打开第五个窗口登录ECS,在invoke-service-servicecomb.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar invoke-service-servicecomb.jar启动invoke服务。

    [0] 初始化数据库, 重置账号资金;
    [1] 查询 Bank A 和 Bank B 余额;
    [2] 非侵入用例 -> DTM 事务 微服务场景调用;
    [3] TCC用例 -> DTM 事务 微服务场景调用;
    [4] DTM对接消息用例 -> DTM 事务 微服务场景调用;
    [5] EXIT;
    请选择命令执行操作

  6. 在invoke服务中,输入命令0初始化数据库。

    2021-03-23 11:51:59.417 [main] INFO  c.h.d.c.service.TransferService - Init bankA bankB success

  7. 输入命令1查询帐号余额,结果如下图所示。

    |--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
    |             0|            1000000|            1000000|      2000000|
    |             1|            1000000|            1000000|      2000000|
    |             2|            1000000|            1000000|      2000000|
    |             3|            1000000|            1000000|      2000000|
    |             4|            1000000|            1000000|      2000000|
    ......
    |           496|            1000000|            1000000|      2000000|
    |           497|            1000000|            1000000|      2000000|
    |           498|            1000000|            1000000|      2000000|
    |           499|            1000000|            1000000|      2000000|
    2021-06-15 21:01:41.243  INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500000000,total b 500000000,sum 1000000000

  8. 输入命令4,执行非侵入样例DTM对接消息事务的微服务场景验证。
  9. 输入运行的线程数量、每个线程的事务数量以及发生异常的概率值,输入格式为“线程数量:单线程事务数量:异常概率”,运行结束后输入命令1查询帐号余额,结果如下图所示,bankA转入了200,bankB转出了100,由于mq-consumer服务没有启动,通过MQ消息传递的bankB转出100的消息没有被消费,导致最终数据不一致。

    请输入命令执行操作:(当前远程调用/feign)
    4
    请输入线程数量:单线程事务数量:异常概率
    10:10:50
    ......
    |--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
    |             0|            1000000|            1000000|      2000000|
    |             1|            1000000|            1000000|      2000000|
    [ERROR] user id: 2, bankA: 1000200, bankB: 999900, total: 2000100
    [ERROR] user id: 3, bankA: 1000200, bankB: 999900, total: 2000100
    |             4|            1000000|            1000000|      2000000|
    ......
    |           496|            1000000|            1000000|      2000000|
    |           497|            1000000|            1000000|      2000000|
    [ERROR] user id: 498, bankA: 1000200, bankB: 999900, total: 2000100
    |           499|            1000000|            1000000|      2000000|
    2021-06-15 21:01:41.243  INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499995000,sum 1000005000

  10. 打开第六个窗口登录ECS,在mq-consumer-servicecomb.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-consumer-servicecomb.jar启动mq-consumer服务。
  11. 再输入命令1查询帐号余额,此时通过MQ消息传递的bankB转出100的消息被消费,最终数据保持一致,结果如下图所示。

    |--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
    |             0|            1000000|            1000000|      2000000|
    |             1|            1000000|            1000000|      2000000|
    |             2|            1000000|            1000000|      2000000|
    |             3|            1000000|            1000000|      2000000|
    |             4|            1000000|            1000000|      2000000|
    ......
    |           496|            1000000|            1000000|      2000000|
    |           497|            1000000|            1000000|      2000000|
    |           498|            1000000|            1000000|      2000000|
    |           499|            1000200|             999800|      2000000|
    2021-06-15 21:01:41.243  INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 49990000,sum 1000000000

  12. 输入命令5退出执行程序。
分享:

    相关文档

    相关产品