• 22-07-29 西安 分布式事务、Seata


    原来大脑真不是自己的,你想让它安静一会,它偏偏胡思乱想,你想让它只注意自己的呼吸,它偏偏给你来段高山流水......                                                                                 

    --- 《冥想-专注呼吸》


    分布式事务

    单体应用,在同一个数据源上更新数据来完成一项业务,整个业务过程的数据一致性本地事务来保证。

    微服务:分别使用独立的数据源,业务过程将由 3 个服务的调用来完成,此时,每一个服务内部的数据一致性仍由本地事务来保证。

    那整个业务层面的全局数据一致性要如何保障呢

    分布式事务的解决方案,保障业务全局的数据一致性。如下图,解决本地事务无法交互的问题


    分布式事务

    分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上,保证了不同数据库的数据一致性。

    分布式系统

    部署在不同节点上的系统通过网络交互来完成协同工作的系统

    分布式事务要考虑的问题,比如对多资源的协调、事务的跨服务传播等


    分布式事务解决方案

    1、基于XA协议的两阶段提交(2PC)

    XA协议:XA是一个分布式事务协议(保证通信)。XA中大致分为两部分:事务管理器TM本地资源管理器RM。其中本地资源管理器往往由数据库实现,比如Oracle、DB2这些商业数据库(关系型数据库)都实现了XA接口,而事务管理器(TM)作为全局的调度者,负责各个本地资源的提交和回滚。

    2PC顾名思义分为两个阶段,其实施思路可概括为:

    (1)投票阶段(voting phase):参与者将操作结果通知协调者(TC);

    (2)提交阶段(commit phase):收到参与者的通知后,协调者再向参与者发出通知,根据反馈情况决定各参与者是否要提交还是回滚;

    ----------------------

    2PC的缺陷

    算法执行过程中,所有节点都处于阻塞状态,所有节点所持有的资源(例如数据库数据,本地文件等)都处于封锁状态。

    1.某一个参与者发出通知之前,所有参与者以及协调者都处于阻塞状态;

    2.在协调者发出通知之前,所有参与者都处于阻塞状态

    另外,如有协调者或者某个参与者出现了崩溃,为了避免整个算法处于一个完全阻塞状态,往往需要借助超时机制来将算法继续向前推进,故此时算法的效率比较低

    2PC效率很低,分布式事务很难做(最好能做成IO不阻塞)

    ----------

    2PC俩阶段提交有哪些不足?

    1.性能问题
    2PC遵循强一致性。在事务执行过程中,各个节点占用着数据库资源,只有当所有节点准备完毕,事务协调者才会通知提交,参与者提交后释放资源。这样的过程有着非常明显的性能问题。
    2.协调者单点故障问题
    2PC模型的核心,一旦事务协调者节点挂掉,参与者收不到提交或是回滚通知,参与者会一直处于中间状态无法完成事务。
    3.丢失消息导致的不一致问题
    第二个阶段,如果发生局部网络问题,一部分事务参与者收到了提交消息,另一部分事务参与者没收到提交消息,那么就导致了节点之间数据的不一致。


    2、代码补偿事务(TCC)

    TCC是Try ( 尝试 ) — Confirm(确认) — Cancel ( 取消 ) 的简称。

    TCC的作用主要是解决跨服务调用场景下的分布式事务问题

    操作方法

    含义

    Try预留

    完成所有业务检查(一致性),预留业务资源(准隔离性)

    回顾上面航班预定案例的阶段1,机票就是业务资源,所有的资源提供者(航空公司)预留都成功,try阶段才算成功

    Confirm

    确认执行业务操作,不做任何业务检查, 只使用Try阶段预留的业务资源。

    回顾上面航班预定案例的阶段2,美团APP确认两个航空公司机票都预留成功,因此向两个航空公司分别发送确认购买的请求。

    Cancel

    取消Try阶段预留的业务资源。

    回顾上面航班预定案例的阶段2,如果某个业务方的业务资源没有预留成功,则取消所有业务资源预留请求。

    ------------------------------------

    TCC两阶段提交与XA两阶段提交的区别

    XA是资源层面的分布式事务,强一致性,在两阶段提交的整个过程中,一直会持有资源的锁

    TCC是业务层面(代码层面)的分布式事务,最终一致性,不会一直持有资源的锁。其核心在于将业务分为两个操作步骤完成。不依赖 RM 对分布式事务的支持,而是通过对业务逻辑的分解来实现分布式事务


    3、 本地消息表

    订单系统新增一条消息表,将新增订单和新增消息放到一个事务里完成,然后通过轮询的方式去查询消息表,将消息推送到 MQ,库存系统去消费 MQ。

    • 执行流程:
      • 订单系统,添加一条订单和一条消息,在一个事务里提交。
      • 订单系统,使用定时任务轮询查询状态为未同步的消息表,发送到 MQ,如果发送失败,就重试发送
      • 库存系统,接收 MQ 消息,修改库存表,需要保证幂等操作。
      • 如果修改成功,调用 RPC 接口修改订单系统消息表的状态为已完成或者直接删除这条消息。
      • 如果修改失败,可以不做处理,等待重试。
    • 订单系统中的消息有可能由于业务问题会一直重复发送,所以为了避免这种情况可以记录一下发送次数,当达到次数限制之后报警,人工接入处理;库存系统需要保证幂等,避免同一条消息被多次消费造成数据一致。

    本地消息表这种方案实现了最终一致性,需要在业务系统里增加消息表,业务逻辑中多一次插入的 DB 操作,所以性能会有损耗,而且最终一致性的间隔主要由定时任务的间隔时间决定。


    4、MQ事务消息

    RocketMQ支持事务消息,RabbitMQ 和 Kafka不支持事务消息。

    以阿里的 RocketMQ 中间件为例,其思路大致为:

    1. 发送方在业务执行开始会先向消息服务器中投递 “ 半消息 ” ,半消息即暂时不会真正投递的消息(不能消费),当发送方(即生产者)将消息成功发送给了MQ服务端且并未将该消息的二次确认结果返回,此时消息状态是“ 暂时不可投递 ” 状态(可以认为是状态未知)。该状态下的消息即半消息。
    2. 如果出现网络闪断、生产者应用重启等原因导致事务消息二次确认丢失,MQ服务端会通过扫描发现某条消息长期处于 “ 半消息 ” 状态,MQ服务端会主动向生产者查询该消息的最终状态是处于Commit(消息提交)还是Rollback(消息回滚)。这个过程称为消息回查

    在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

    总体而言RocketMQ事务消息分为两条主线

    • 定时任务发送流程:发送half message(半消息),执行本地事务,发送事务执行结果
    • 定时任务回查流程:MQ服务器回查本地事务,发送事务执行结果

    具体流程如下

    1、Producer 向 MQ 服务器 发送消息 , MQ Server 将消息状态标记为 Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。

    2、MQ 服务器收到消息并持久化成功之后,会向Producer 确认首次消息发送成功,此时消息处于 half message(半消息) 状态,并未发送给对应的 Consumer 。

    3、Producer 开始执行本地事务逻辑 , 通过本地数据库事务控制。

    4、根据事务执行结果,Producer 向 MQ 服务器提交二次确认 ( commit 或者 rollback) 。MQ Server 收到 Commit 状态则将半消息标记为可投递,Consumer 最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,Consumer 将不会接受该消息。

    5、在断网或者应用重启的情况下,二次确认未成功的发给 MQ Server,MQ Server 会主动向 Producer 启动消息回查

    6、Producer 根据事务执行结果,对消息回查返回对应的结果。

    7、Mq Server根据返回结果,决定继续投递消息或者丢弃消息(重复第4步操作)。

    注意 1-4 为事务消息的发送过程, 5-6 为事务消息的回查过程。

    优点: 实现了最终一致性,不需要依赖本地数据库事务。

    缺点: 目前主流MQ中只有RocketMQ支持事务消息。


    Seata

    一、Seata介绍

    Seata全称:Simple Extensible Autonomous Transaction Architecture,简单可扩展自治事务框架。

    Seata是阿里开源的一个分布式事务框架,能够让大家在操作分布式事务时,像操作本地事务一样简单。一个注解搞定分布式事务

    Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。

    Seata将为用户提供了AT、TCC、SAGA、XA事务模式,为用户打造一站式的分布式解决方案。

    Seata中有两种分布式事务实现方案,AT及TCC

    1. AT模式(自动版)主要关注多 DB 访问的数据一致性,当然也包括多服务下的多 DB 数据访问一致性问题 2PC-改进
    2. TCC 模式(手动版)主要关注业务拆分,在按照业务横向扩展资源时,解决微服务间调用的一致性问题

    Seata 的设计思路是将一个分布式事务可以理解成一个全局事务,下面挂了若干个分支事务,而一个分支事务是一个满足 ACID 的本地事务,因此我们可以操作分布式事务像操作本地事务一样

    AT模式(Automatic (Branch) Transaction Mode)

    1. Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,负责协调并决定全局事务的提交或回滚。
    2. Transaction Manager(TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交全局回滚的决议。
    3. Resource Manager (RM):资源管理器,负责本地事务的注册,本地事务状态的汇报(投票),并且负责本地事务的提交和回滚
    4. XID:一个全局事务的唯一标识

    其中,TM是一个分布式事务的发起者和终结者,TC负责维护分布式事务的运行状态,而RM则负责本地事务的运行。


    二、搭建、配置、启动 Nacos和Seata

    版本声明:

    nacos-server-1.4.2 + seata-server-1.4.2 + MySQL5.7 + Hoxton.SR9 + Alibaba2.2.6.RELEASE + SpringBoot2.3.2.RELEASE

    1、搭建注册中心和配置中心Nacos

    22-07-24 西安 SpringCloud(03) SpringCloud alibaba、Nacos_£小羽毛的博客-CSDN博客

    注意,我这里使用的是windows的nacos,本机127.0.0.1上的mysql,不是虚拟机上的mysql/2。

    2、在配置中心新增如下配置

    2.1新建dataId为seataServer.properties的配置,内容如下:

     复制以下内容到该配置文件中,有俩个重点关注的点

    1. 虚拟组配置,所有的微服务需要加入名称为:my_test_tx_group的事务组中
    2. 设置TC进行全局事务控制的数据存储方式:store.mode有file,db,redis三种类型。这里选择db,设置mysql连接信息TC自己也有自己的库
    1. transport.type=TCP
    2. transport.server=NIO
    3. transport.heartbeat=true
    4. transport.enableClientBatchSendRequest=true
    5. transport.threadFactory.bossThreadPrefix=NettyBoss
    6. transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
    7. transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
    8. transport.threadFactory.shareBossWorker=false
    9. transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
    10. transport.threadFactory.clientSelectorThreadSize=1
    11. transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
    12. transport.threadFactory.bossThreadSize=1
    13. transport.threadFactory.workerThreadSize=default
    14. transport.shutdown.wait=3
    15. service.vgroupMapping.my_test_tx_group=default
    16. service.default.grouplist=127.0.0.1:8091
    17. service.enableDegrade=false
    18. service.disableGlobalTransaction=false
    19. client.rm.asyncCommitBufferLimit=10000
    20. client.rm.lock.retryInterval=10
    21. client.rm.lock.retryTimes=30
    22. client.rm.lock.retryPolicyBranchRollbackOnConflict=true
    23. client.rm.reportRetryCount=5
    24. client.rm.tableMetaCheckEnable=false
    25. client.rm.tableMetaCheckerInterval=60000
    26. client.rm.sqlParserType=druid
    27. client.rm.reportSuccessEnable=false
    28. client.rm.sagaBranchRegisterEnable=false
    29. client.rm.tccActionInterceptorOrder=-2147482648
    30. client.tm.commitRetryCount=5
    31. client.tm.rollbackRetryCount=5
    32. client.tm.defaultGlobalTransactionTimeout=60000
    33. client.tm.degradeCheck=false
    34. client.tm.degradeCheckAllowTimes=10
    35. client.tm.degradeCheckPeriod=2000
    36. client.tm.interceptorOrder=-2147482648
    37. store.mode=db
    38. store.db.datasource=druid
    39. store.db.dbType=mysql
    40. store.db.driverClassName=com.mysql.cj.jdbc.Driver
    41. store.db.url=jdbc:mysql://127.0.0.1:3306/seata?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
    42. store.db.user=root
    43. store.db.password=123456
    44. store.db.minConn=5
    45. store.db.maxConn=30
    46. store.db.globalTable=global_table
    47. store.db.branchTable=branch_table
    48. store.db.queryLimit=100
    49. store.db.lockTable=lock_table
    50. store.lock.mode=file
    51. store.session.mode=file
    52. store.publicKey=123
    53. server.recovery.committingRetryPeriod=1000
    54. server.recovery.asynCommittingRetryPeriod=1000
    55. server.recovery.rollbackingRetryPeriod=1000
    56. server.recovery.timeoutRetryPeriod=1000
    57. server.maxCommitRetryTimeout=-1
    58. server.maxRollbackRetryTimeout=-1
    59. server.rollbackRetryTimeoutUnlockEnable=false
    60. server.distributedLockExpireTime=10000
    61. client.undo.dataValidation=true
    62. client.undo.logSerialization=jackson
    63. client.undo.onlyCareUpdateColumns=true
    64. server.undo.logSaveDays=7
    65. server.undo.logDeletePeriod=86400000
    66. client.undo.logTable=undo_log
    67. client.undo.compress.enable=true
    68. client.undo.compress.type=zip
    69. client.undo.compress.threshold=64k
    70. log.exceptionRate=100
    71. transport.serialization=seata
    72. transport.compressor=none
    73. metrics.enabled=false
    74. metrics.registryType=compact
    75. metrics.exporterList=prometheus
    76. metrics.exporterPrometheusPort=9898

    2.2 顺便在mysql中创建数据库seata,和以下三张表(建表的sql。。。我是直接导入的)

    3.新建dataId为common.yml配置所有微服务共享这个配置(seata不会用)。

    内容如下: 

     大功告成,如下

     4.搭建TC服务器[Seata]

    4.1.下载地址:https://seata.io/zh-cn/index.html 

    4.2 解压seata-server-1.4.2.zip安装

    4.3 修改\conf\registry.conf设置TC 服务对应的注册中心和配置中心。 

    registry {

      # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa

      type = "nacos"

      nacos {

        application = "seata-server"

        serverAddr = "127.0.0.1:8848"

        group = "SEATA_GROUP"

        namespace = ""

        cluster = "default"

        username = "nacos"

        password = "nacos"

      }

    }

    config {

      # file、nacos 、apollo、zk、consul、etcd3

      type = "nacos"

      nacos {

        serverAddr = "127.0.0.1:8848"

        namespace = ""

        group = "SEATA_GROUP"

        username = "nacos"

        password = "nacos"

        dataId = "seataServer.properties"

      }

    }

    4.4 启动Seata 

    \bin目录运行seata-server.bat


    三、Seata分布式事务验证

    项目结构

    需求分析

    用户下单(TM)的时候记录下单日志,完成订单添加,完成商品库存削减功能,完成用户账户扣款,可以在扣款时制造异常,测试全局分布式事务一致性


    1、在fescar-api项目bootstrap.yml文件中引入common.yml配置,共享给其他服务使用。

    1. spring:
    2. cloud:
    3. nacos:
    4. discovery:
    5. server-addr: localhost:8848
    6. group: SEATA_GROUP
    7. config:
    8. server-addr: localhost:8848
    9. file-extension: yml
    10. group: SEATA_GROUP
    11. shared-configs[0]:
    12. data-id: common.yml
    13. refresh: true
    14. group: SEATA_GROUP

    2、在fescar-api项目中引入依赖,排除低版本依赖,重新引入1.4.2;传递给其他微服务项目使用。

    1. <dependency>
    2. <groupId>com.alibaba.cloudgroupId>
    3. <artifactId>spring-cloud-starter-alibaba-seataartifactId>
    4. <exclusions>
    5. <exclusion>
    6. <groupId>io.seatagroupId>
    7. <artifactId>seata-spring-boot-starterartifactId>
    8. exclusion>
    9. exclusions>
    10. dependency>
    11. <dependency>
    12. <groupId>io.seatagroupId>
    13. <artifactId>seata-spring-boot-starterartifactId>
    14. <version>1.4.2version>
    15. dependency>

    3、在 fescar-api 工程下面新建配置类。

    在这里配置类中配置的数据源,是为seata库服务的,为了操作undo_log表。undo_log表,每个业务数据库都需要有这张表,用于数据的rollback。

    1. @Configuration
    2. public class DataSourceProxyConfig {
    3. /**
    4. * 普通数据源
    5. * @return
    6. */
    7. @Bean
    8. @ConfigurationProperties(prefix = "spring.datasource")
    9. public DataSource dataSource() {
    10. return new DruidDataSource();
    11. }
    12. /**
    13. * 代理数据源绑定DataSourceProxy ---> undo_log的操作
    14. * @param dataSource
    15. * @return
    16. */
    17. @Bean
    18. public DataSourceProxy dataSourceProxy(DataSource dataSource) {
    19. return new DataSourceProxy(dataSource);
    20. }
    21. /**
    22. * mybatis--->手动指定sqlSessionFactory所使用的代理数据源
    23. * @param dataSourceProxy
    24. * @return
    25. * @throws Exception
    26. */
    27. @Bean
    28. public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
    29. SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    30. // 换成代理数据源
    31. sqlSessionFactoryBean.setDataSource(dataSourceProxy);
    32. return sqlSessionFactoryBean.getObject();
    33. }
    34. }

    4、在入口方法上添加@GlobalTransactional

    TM:使用tc(seata)控制xid

    全局事务:TM控制事务的commit和rollback

    1. /***
    2. * ①
    3. * 下单
    4. * @GlobalTransactional:全局事务入口
    5. * @param username
    6. * @param id
    7. * @param count
    8. */
    9. @GlobalTransactional
    10. @Override
    11. public void add(String username, int id, int count) {
    12. //添加订单日志
    13. LogInfo logInfo = new LogInfo();
    14. logInfo.setContent("添加订单数据---"+new Date());
    15. logInfo.setCreatetime(new Date());
    16. int logcount = logInfoMapper.insertSelective(logInfo);
    17. System.out.println("添加日志受影响行数:"+logcount);
    18. //添加订单
    19. orderInfoFeign.add(username,id,count);
    20. int price = 10;
    21. //用户账户余额递减
    22. userInfoFeign.decrMoney(username,price * count);
    23. }

    其他微服务的业务方法增加@Transactionial注解【本地事务】;在fescar-user的UserInfoServiceImpl中模拟异常,验证全局事务回滚。

    经过测试,当然是好使的呢,从表面来看一个注解就搞定了,太牛了


    四、Seata 原理

    1、分布式事务的执行流程

    • TM开启分布式事务(TM向TC注册全局事务记录)
    • 换业务场景,编排数据库,服务等事务内资源(RM向TC汇报资源准备状态)
    • TM结束分布式事务,事务一阶段结束(TM通知TC提交/回滚分布式事务)
    • TC汇总事务信息,决定分布式事务是提交还是回滚
    • TC通知所有RM提交/回滚资源,事务二阶段结束。

    2、AT模式下,如何做到对业务的无入侵

    ---------------------------

    一阶段加载

    在一阶段,Seata会拦截"业务SQL"
    1.解析SQL语义,找到“业务SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”(快照)
    2.执行"业务SQL"更新业务数据,在业务数据更新之后,保存成“after image”(快照),最后生成行锁。
    以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

    -------------------------

    二阶段提交

    二阶段如果顺利提交的话,因为“业务SQL”在一阶段已经提交至数据库,所以Seata框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。

    ------------------------------

    二阶段回滚

    二阶段如果是回滚的话,Seata就需要回滚一阶段执行的“业务SQL”,还原业务数据。

    回滚方式便是用“before image”还原业务数据。

    但在还原前要首先校验脏写,对比“数据库当前业务数据”和“after image”,如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。

  • 相关阅读:
    Maven入门
    【计算机毕业设计】会议发布与预约系统小程序
    谷粒学苑_第十天
    webpack5 之 优化效率工具 ( 进度条、速度分析 、打包后提交分析)
    nodejs+vue+elementui汽车出入库零配件4S服务管理系统python-java
    【手写Mybatis】step01:创建简单的代理工厂
    数据结构与算法(十三)——红黑树2
    单片机KEIL C51堆栈地址分配原理,变量放RAM,small编译按data
    (数据科学学习手札160)使用miniforge代替miniconda
    win11恢复win10版鼠标右键菜单
  • 原文地址:https://blog.csdn.net/m0_56799642/article/details/126049354