• 可靠消息最终一致性分布式事务


    一、前言

    可靠消息最终一致性方案主要适用于消息数据能够独立存储:

    1. 能够降低系统之间耦合度

    2. 业务对数据一致性的时间敏感度高

    此方案需要实现的服务模式:

    1. 可查询操作:提供查询自身事务状态的接口。

    2. 幂等操作:只要参数相同,无论调用多少次接口,都应该和第一次调用产生的结果相同。

    那么什么时候回查?

    事务发送端执行本地事务时(已经发送了 Half 消息了),这时候发送端宕机了或者超时了,就需要回查了。

    (1)实现方案

    实现方案有两种:

    1.基于本地消息

    • 优点:在业务应用中实现了消息的可靠性,减少了对消息中间件的依赖。

    • 缺点:

    1. 绑定了具体的业务场景,耦合性太高,不可公用和扩展。

    2. 消息数据与业务数据在同一数据库,占用了业务系统的扩展。

    3. 消息数据可能会受到数据库并发性的影响。

    2.基于消息队列中间件

    • 优点:

    1. 消息数据能够独立存储,与具体的业务数据库解耦。

    2. 消息的并发性和吞吐量优于本地消息表方案。

    • 缺点:

    1. 发送一次消息需要完成两次网络交互:1.消息的发送 ; 2. 消息的提交或回滚。

    2. 需要实现消息的回查接口,增加了开发成本。

    (2)注意的问题

    1、事务发送方本地事务与消息发送的原子性问题:

    • 原因:执行本地事务和发送消息,要么都成功,要么都失败。

    • 解决方案:通过消息确认服务本地事务执行成功。

    1. // 原子性:事务 + 消息确认(回滚)
    2. @Override
    3. @Transactional(rollbackFor = Exception.class)
    4. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) {
    5. try{
    6. TxMessage txMessage = this.getTxMessage(msg);
    7. // 1. 执行本地事务
    8. orderService.submitOrderAndSaveTxNo(txMessage);
    9. // 2. 提交事务
    10. return RocketMQLocalTransactionState.COMMIT;
    11. } catch (Exception e){
    12. // 异常回滚事务
    13. return RocketMQLocalTransactionState.ROLLBACK;
    14. }
    15. }

    2、事务参与方接收消息的可靠性问题:

    • 原因:由于服务器宕机、服务崩溃或网络异常等原因,导致事务参与方不能正常接收消息; 或者接收消息后处理事务的过程中发生异常,无法将结果正确回传到消息库中。

    • 解决方案:通过消息恢复服务保证事务参与方的可靠性。

    3、事务参与方接收消息的幂等性问题:

    • 原因:可靠消息服务可能会多次向事务参与方发送消息

    • 解决方案:需要具有幂等性,只要参数相同,无论调用多少次接口或方法,结果都相同。

    C/C++Linux服务器开发高级架构师/C++后台开发架构师​免费学习地址

    另外还整理一些C++后台开发架构师 相关学习资料,面试题,教学视频,以及学习路线图,免费分享有需要的可以自行添加:Q群:720209036 点击加入~ 群文件共享

    (3)实战

    通过 RocketMQ 消息中间件实现可靠消息最终一致性分布式事务,模拟电商业务中的下单扣减库存场景。 涉及服务有:

    • 订单服务

    • 库存服务

    ​整体流程如下:

    • 第一步:订单服务向 RocketMQ 发送 Half 消息。

    • 第二步:RocketMQ 向订单服务响应 Half 消息发送成功。

    • 第三步:订单服务执行本地事务,向本地数据库中插入、更新、删除数据。

    • 第四步:订单服务向 RocketMQ 发送提交事务或者回滚事务的消息。

    • 第五步:如果库存服务未收到消息,或者执行事务失败,且 RocketMQ 未删除保存的消息数据,RocketMQ 会回查订单服务的接口,查询事务状态,以此确认是再次提交事务还是回滚事务。

    • 第六步:订单服务查询本地数据库,确认事务是否执行成功。

    • 第七步:订单服务根据查询出的事务状态,向 RocketMQ 发送提交事务或者回滚事务的消息。

    • 第八步:如果第七步中订单服务向 RocketMQ 发送的是提交事务的消息,则 RocketMQ 会向库存服务投递消息。

    • 第九步:如果第七步中订单服务向 RocketMQ 发送的是回滚事务的消息,则 RocketMQ 不会向库存微服务投递消息,并且会删除内部存储的消息数据。

    • 第十步:如果 RocketMQ 向库存服务投递的是执行本地事务的消息,则库存服务会执行本地事务,向本地数据库中插入、更新、删除数据。

    • 第十一步:如果 RocketMQ 向库存服务投递的是查询本地事务状态的消息,则库存服务会查询本地数据库中事务的执行状态。

    二、实战实验

    涉及服务有:

    • 订单服务:项目地址

    • 库存服务:项目地址

    实验准备:

    • MySQL:8.0.20

    • RocketMQ 消息中间件:rocketmq-all-4.5.0-bin-release

    • RocketMQ 客户端:rocketmq-spring-boot-starter 2.0.2

    • Spring Boot 版本:2.2.6.RELEASE

    订单服务重点相关代码:

    1.发送 Half 消息:

    1. @Slf4j
    2. @Service
    3. public class OrderServiceImpl implements OrderService {
    4. @Override
    5. public void submitOrder(Long productId, Integer payCount) {
    6. // 1. 生成全局分布式序列号
    7. String txNo = UUID.randomUUID().toString();
    8. 。。。 。。。
    9. // 2. 封装消息
    10. Message<String> message =
    11. MessageBuilder.withPayload(jsonObject.toJSONString()).build();
    12. // 3. 发送一条事务消息
    13. rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg",
    14. message, null);
    15. }
    16. }

    2.处理本地事务

    1. @Slf4j
    2. @Component
    3. @RocketMQTransactionListener(txProducerGroup = "tx_order_group")
    4. public class OrderTxMessageListener implements RocketMQLocalTransactionListener {
    5. @Autowired
    6. private OrderService orderService;
    7. @Autowired
    8. private OrderMapper orderMapper;
    9. @Override
    10. @Transactional(rollbackFor = Exception.class)
    11. public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
    12. Object obj) {
    13. try{
    14. // 1. 获取消息并解析消息
    15. TxMessage txMessage = this.getTxMessage(msg);
    16. // 2. 提交订单 并且 保存事务日志
    17. orderService.submitOrderAndSaveTxNo(txMessage);
    18. // 3. 事务状态为提交
    19. return RocketMQLocalTransactionState.COMMIT;
    20. }catch (Exception e){
    21. // 发生异常
    22. // 事务状态为回滚
    23. return RocketMQLocalTransactionState.ROLLBACK;
    24. }
    25. }
    26. @Override
    27. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
    28. // 1. 获取消息并解析消息
    29. TxMessage txMessage = this.getTxMessage(msg);
    30. // 2. 查询订单是否存在
    31. Integer exists = orderMapper.isExistsTx(txMessage.getTxNo());
    32. if(exists != null){
    33. // 订单存在:事务状态为提交
    34. return RocketMQLocalTransactionState.COMMIT;
    35. }
    36. // 订单不存在:事务状态为未知
    37. // 这里需要再次调用:处理本地事务嘛?
    38. return RocketMQLocalTransactionState.UNKNOWN;
    39. }
    40. }

    库存服务重点相关代码:

    1. @Slf4j
    2. @Component
    3. @RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg")
    4. public class StockTxMessageConsumer implements RocketMQListener<String> {
    5. @Autowired
    6. private StockService stockService;
    7. @Override
    8. public void onMessage(String message) {
    9. // 监听到对应消息
    10. // 获取消息并解析
    11. TxMessage txMessage = this.getTxMessage(message);
    12. stockService.decreaseStock(txMessage);
    13. }
    14. }

    服务:

    • 订单服务端口:8080

    • 库存服务端口:8081

    • RocketMQ: 9876

    数据准备:

    1. USE tx_msg_stock;
    2. INSERT INTO stock (id, product_id, total_count) VALUES (1, 1001, 10000);
    3. SELECT * FROM stock;
    4. +----|------------|-------------+
    5. | id | product_id | total_count |
    6. +----|------------|-------------+
    7. | 1 | 1001 | 10000 |
    8. +----|------------|-------------+

    (1)正常流程

    1.请求下单接口:调用 订单服务

    1. $ curl "http://localhost:8080/order/submit_order?productId=1&payCount=1"
    2. 下单成功

    订单服务日志:

    1. 2022-05-09 14:19:05.197 c.d.t.message.OrderTxMessageListener : 订单微服务执行本地事务
    2. 2022-05-09 14:19:05.233 c.d.t.message.OrderTxMessageListener : 订单微服务提交事务
    3. 2022-05-09 14:21:05.090 INFO 19423 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[172.17.0.3:10909] result: true

    2.库存服务日志:

    1. 2022-05-09 14:22:59.956 [MessageThread_5] c.d.t.message.StockTxMessageConsumer : 库存微服务开始消费事务消息:{"txMessage":{"payCount":1,"productId":1,"txNo":"3fcd4e8d-1f5b-448d-ad3d-693c335f994e"}}
    2. 2022-05-09 14:22:59.956 [MessageThread_5] c.d.t.service.impl.StockServiceImpl : 库存微服务执行本地事务,商品id:1, 购买数量:1

    3.查看对应数据库:

    1. -- order 库下
    2. SELECT * FROM `order`;
    3. +---------------|---------------------|---------------|------------|-----------+
    4. | id | create_time | order_no | product_id | pay_count |
    5. +---------------|---------------------|---------------|------------|-----------+
    6. | 1652077145222 | 2022-05-09 14:19:05 | 1652077145224 | 1 | 1 |
    7. +---------------|---------------------|---------------|------------|-----------+
    8. SELECT * FROM tx_log;
    9. +--------------------------------------|---------------------+
    10. | tx_no | create_time |
    11. +--------------------------------------|---------------------+
    12. | 3fcd4e8d-1f5b-448d-ad3d-693c335f994e | 2022-05-09 06:19:05 |
    13. +--------------------------------------|---------------------+
    14. -- stock 库下
    15. SELECT * FROM tx_log;
    16. +--------------------------------------|---------------------+
    17. | tx_no | create_time |
    18. +--------------------------------------|---------------------+
    19. | 3fcd4e8d-1f5b-448d-ad3d-693c335f994e | 2022-05-09 06:23:00 |
    20. +--------------------------------------|---------------------+
    21. SELECT * FROM stock;
    22. +----|------------|-------------+
    23. | id | product_id | total_count |
    24. +----|------------|-------------+
    25. | 1 | 1 | 9999 |
    26. +----|------------|-------------+

    (2)异常流程:消息中间件宕机

    1. 步骤一出现异常

    即:还没开始下单,消息中间件宕机了,那么立刻下单失败。 如图:

    ​2. 步骤四出现异常

    即:此步骤还在下单事务中,当提交中间请求失败,本地事务不会回滚。

    • 这时候,RocketMQ 客户端会不断去重试。

    • 当 RocketMQ 恢复后,RocketMQ 会去查询一次

    ​实验步骤:

    1. 在执行本地事务之后,睡眠 30s

    2. 此期间,消息中间件宕机:broker 关闭。

    日志如下:

    1. 2022-05-09 15:27:14.980 c.d.t.message.OrderTxMessageListener : 订单微服务执行本地事务
    2. 2022-05-09 15:27:15.015 c.d.t.message.OrderTxMessageListener : 订单微服务提交事务
    3. 2022-05-09 15:27:15.015 c.d.t.message.OrderTxMessageListener : 尝试睡 30s
    4. 2022-05-09 15:27:39.025 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
    5. 2022-05-09 15:27:39.025 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
    6. 2022-05-09 15:27:45.015 c.d.t.message.OrderTxMessageListener : 睡醒,起来干活了
    7. 2022-05-09 15:27:48.041 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
    8. 2022-05-09 15:28:09.017 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
    9. 2022-05-09 15:28:09.019 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
    10. 。。。。。。
    11. 2022-05-09 15:35:47.481 INFO 22815 --- [pool-1-thread-1] c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务

    (3)异常流程:事务发送方执行本地事务失败

    即:这时候捕捉到异常:

    1. 订单服务给 RocketMQ 发送回滚消息:RocketMQLocalTransactionState.ROLLBACK

    2. RocketMQ 接收到消息后,会回查

    3. 回查,发现不存在这个订单,订单服务向 RocketMQ 发送 未知消息:RocketMQLocalTransactionState.UNKNOWN UNKNOWN 未知状态:可能是事务正在执行中出异常等,这种情况下消息系统不知道该如何处理,当前的逻辑是会直接丢弃掉,等待后续检查逻辑来处理。

    ​日志如下:

    1. 2022-05-09 15:45:11.829 c.d.t.message.OrderTxMessageListener : 订单微服务执行本地事务
    2. 2022-05-09 15:45:11.861 c.d.t.message.OrderTxMessageListener : 订单微服务回滚事务
    3. 2022-05-09 15:45:47.490 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    4. 2022-05-09 15:46:47.484 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    5. 2022-05-09 15:47:47.489 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    6. 2022-05-09 15:48:47.491 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    7. 2022-05-09 15:49:47.492 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    8. 2022-05-09 15:50:47.494 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    9. 2022-05-09 15:51:47.496 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    10. 2022-05-09 15:52:47.497 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    11. 2022-05-09 15:53:47.498 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    12. 2022-05-09 15:54:47.501 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    13. 2022-05-09 15:55:47.501 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    14. 2022-05-09 15:56:47.503 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    15. 2022-05-09 15:57:47.504 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    16. 2022-05-09 15:58:47.506 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
    17. 2022-05-09 15:59:47.510 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务

    可看到 RocketMQ 回查了 15 次。

    (4)异常流程:事务接收方宕机

    即:RocketMQ 无法推送消息给消息接收方。

    • 此时,订单服务还是会下单成功

    • 库存服务无法处理

    当库存服务再次上线后:会接收到消息。

    订单服务日志:

    1. 2022-05-09 16:47:53.142 c.d.t.message.OrderTxMessageListener : 订单微服务执行本地事务
    2. 2022-05-09 16:47:53.174 c.d.t.message.OrderTxMessageListener : 订单微服务提交事务

    库存服务日志:

    1. 2022-05-09 16:48:10.283 c.d.t.message.StockTxMessageConsumer : 库存微服务开始消费事务消息:{"txMessage":{"payCount":1,"productId":1,"txNo":"d5d86dae-76c2-4b56-9597-a12dae14325a"}}
    2. 2022-05-09 16:48:10.284 c.d.t.service.impl.StockServiceImpl : 库存微服务执行本地事务,商品id:1, 购买数量:1

    (5)异常流程:事务接收方执行本地事务失败

    即:当接收到消息后,执行本地事务失败,RocketMQ 会不断发送消费消息。

    事务接收方执行本地事务失败,措施有:

    1. 记录日志,人工介入处理。

    2. 重试,再出错,则人工介入。

    实验步骤:

    1. 下单错误商品,订单下单成功。

    2. 库存中没有此商品数据,向上抛错。

    日志:

    1. 2022-05-09 14:19:59.930 WARN 19542 --- [MessageThread_3] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt [queueId=0, storeSize=480, queueOffset=1, sysFlag=8, bornTimestamp=1652077145061, bornHost=/172.17.0.1:36848, storeTimestamp=1652077199916, storeHost=/172.17.0.3:10911, msgId=AC11000300002A9F0000000000000D1B, commitLogOffset=3355, bodyCR
    2. java.lang.NullPointerException: null
    3. at com.donald.txmsgstock.service.impl.StockServiceImpl.decreaseStock(StockServiceImpl.java:35) ~[classes/:na]
    4. at com.donald.txmsgstock.message.StockTxMessageConsumer.onMessage(StockTxMessageConsumer.java:27) ~[classes/:na]
    5. at com.donald.txmsgstock.message.StockTxMessageConsumer.onMessage(StockTxMessageConsumer.java:16) ~[classes/:na]
    6. at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:308) ~[rocketmq-spring-boot-2.0.2.jar:2.0.2]
    7. at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417) [rocketmq-client-4.4.0.jar:4.4.0]
    8. at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162]
    9. at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162]
    10. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_162]
    11. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_162]
    12. at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]

    参考资料

    ​推荐一个零声教育C/C++后台开发的免费公开课程,个人觉得老师讲得不错,分享给大家:C/C++后台开发高级架构师,内容包括Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,立即学习

    原文地址:可靠消息最终一致性分布式事务 - 掘金

  • 相关阅读:
    分布式系统的基本问题:可用性与一致性【转载】
    必知必会的22种设计模式(GO语言)
    微信个人号如何实现自动回复,秒回客户消息?
    浅谈防勒索病毒方案之主机加固
    华秋DFM从2.1.6升级到3.x版本出现的问题
    三层神经网络模型
    银河麒麟服务器系统使用的一些问题和解决方案
    One class learning(SVDD)
    蓝桥杯2023年-阶乘的和(数学推理,C++)
    JavaScript-1-菜鸟教程
  • 原文地址:https://blog.csdn.net/Linuxhus/article/details/127461895