原来大脑真不是自己的,你想让它安静一会,它偏偏胡思乱想,你想让它只注意自己的呼吸,它偏偏给你来段高山流水......
--- 《冥想-专注呼吸》
单体应用,在同一个数据源上更新数据来完成一项业务,整个业务过程的数据一致性由本地事务来保证。
微服务:分别使用独立的数据源,业务过程将由 3 个服务的调用来完成,此时,每一个服务内部的数据一致性仍由本地事务来保证。
那整个业务层面的全局数据一致性要如何保障呢?
分布式事务的解决方案,保障业务全局的数据一致性。如下图,解决本地事务无法交互的问题
分布式事务
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上,保证了不同数据库的数据一致性。
分布式系统
部署在不同节点上的系统通过网络交互来完成协同工作的系统
分布式事务要考虑的问题,比如对多资源的协调、事务的跨服务传播等
分布式事务解决方案
XA协议:XA是一个分布式事务协议(保证通信)。XA中大致分为两部分:事务管理器(TM)和本地资源管理器(RM)。其中本地资源管理器往往由数据库实现,比如Oracle、DB2这些商业数据库(关系型数据库)都实现了XA接口,而事务管理器(TM)作为全局的调度者,负责各个本地资源的提交和回滚。
2PC顾名思义分为两个阶段,其实施思路可概括为:
(1)投票阶段(voting phase):参与者将操作结果通知协调者(TC);
(2)提交阶段(commit phase):收到参与者的通知后,协调者再向参与者发出通知,根据反馈情况决定各参与者是否要提交还是回滚;
----------------------
2PC的缺陷
算法执行过程中,所有节点都处于阻塞状态,所有节点所持有的资源(例如数据库数据,本地文件等)都处于封锁状态。
1.某一个参与者发出通知之前,所有参与者以及协调者都处于阻塞状态;
2.在协调者发出通知之前,所有参与者都处于阻塞状态;
另外,如有协调者或者某个参与者出现了崩溃,为了避免整个算法处于一个完全阻塞状态,往往需要借助超时机制来将算法继续向前推进,故此时算法的效率比较低。
2PC效率很低,分布式事务很难做(最好能做成IO不阻塞)
----------
2PC俩阶段提交有哪些不足?
1.性能问题
2PC遵循强一致性。在事务执行过程中,各个节点占用着数据库资源,只有当所有节点准备完毕,事务协调者才会通知提交,参与者提交后释放资源。这样的过程有着非常明显的性能问题。
2.协调者单点故障问题
2PC模型的核心,一旦事务协调者节点挂掉,参与者收不到提交或是回滚通知,参与者会一直处于中间状态无法完成事务。
3.丢失消息导致的不一致问题
第二个阶段,如果发生局部网络问题,一部分事务参与者收到了提交消息,另一部分事务参与者没收到提交消息,那么就导致了节点之间数据的不一致。
TCC是Try ( 尝试 ) — Confirm(确认) — Cancel ( 取消 ) 的简称。
TCC的作用主要是解决跨服务调用场景下的分布式事务问题
操作方法 | 含义 |
Try(预留) | 完成所有业务检查(一致性),预留业务资源(准隔离性) 回顾上面航班预定案例的阶段1,机票就是业务资源,所有的资源提供者(航空公司)预留都成功,try阶段才算成功 |
Confirm | 确认执行业务操作,不做任何业务检查, 只使用Try阶段预留的业务资源。 回顾上面航班预定案例的阶段2,美团APP确认两个航空公司机票都预留成功,因此向两个航空公司分别发送确认购买的请求。 |
Cancel | 取消Try阶段预留的业务资源。 回顾上面航班预定案例的阶段2,如果某个业务方的业务资源没有预留成功,则取消所有业务资源预留请求。 |
------------------------------------
TCC两阶段提交与XA两阶段提交的区别
XA是资源层面的分布式事务,强一致性,在两阶段提交的整个过程中,一直会持有资源的锁。
TCC是业务层面(代码层面)的分布式事务,最终一致性,不会一直持有资源的锁。其核心在于将业务分为两个操作步骤完成。不依赖 RM 对分布式事务的支持,而是通过对业务逻辑的分解来实现分布式事务。
订单系统新增一条消息表,将新增订单和新增消息放到一个事务里完成,然后通过轮询的方式去查询消息表,将消息推送到 MQ,库存系统去消费 MQ。
本地消息表这种方案实现了最终一致性,需要在业务系统里增加消息表,业务逻辑中多一次插入的 DB 操作,所以性能会有损耗,而且最终一致性的间隔主要由定时任务的间隔时间决定。
RocketMQ支持事务消息,RabbitMQ 和 Kafka不支持事务消息。
以阿里的 RocketMQ 中间件为例,其思路大致为:
在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
总体而言RocketMQ事务消息分为两条主线
具体流程如下
1、Producer 向 MQ 服务器 发送消息 , MQ Server 将消息状态标记为 Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。
2、MQ 服务器收到消息并持久化成功之后,会向Producer 确认首次消息发送成功,此时消息处于 half message(半消息) 状态,并未发送给对应的 Consumer 。
3、Producer 开始执行本地事务逻辑 , 通过本地数据库事务控制。
4、根据事务执行结果,Producer 向 MQ 服务器提交二次确认 ( commit 或者 rollback) 。MQ Server 收到 Commit 状态则将半消息标记为可投递,Consumer 最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,Consumer 将不会接受该消息。
5、在断网或者应用重启的情况下,二次确认未成功的发给 MQ Server,MQ Server 会主动向 Producer 启动消息回查
6、Producer 根据事务执行结果,对消息回查返回对应的结果。
7、Mq Server根据返回结果,决定继续投递消息或者丢弃消息(重复第4步操作)。
注意 1-4 为事务消息的发送过程, 5-6 为事务消息的回查过程。
优点: 实现了最终一致性,不需要依赖本地数据库事务。
缺点: 目前主流MQ中只有RocketMQ支持事务消息。
Seata全称:Simple Extensible Autonomous Transaction Architecture,简单可扩展自治事务框架。
Seata是阿里开源的一个分布式事务框架,能够让大家在操作分布式事务时,像操作本地事务一样简单。一个注解搞定分布式事务。
Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。
Seata将为用户提供了AT、TCC、SAGA、XA事务模式,为用户打造一站式的分布式解决方案。
Seata中有两种分布式事务实现方案,AT及TCC
Seata 的设计思路是将一个分布式事务可以理解成一个全局事务,下面挂了若干个分支事务,而一个分支事务是一个满足 ACID 的本地事务,因此我们可以操作分布式事务像操作本地事务一样
AT模式(Automatic (Branch) Transaction Mode)
其中,TM是一个分布式事务的发起者和终结者,TC负责维护分布式事务的运行状态,而RM则负责本地事务的运行。
版本声明:
nacos-server-1.4.2 + seata-server-1.4.2 + MySQL5.7 + Hoxton.SR9 + Alibaba2.2.6.RELEASE + SpringBoot2.3.2.RELEASE
1、搭建注册中心和配置中心Nacos
22-07-24 西安 SpringCloud(03) SpringCloud alibaba、Nacos_£小羽毛的博客-CSDN博客
注意,我这里使用的是windows的nacos,本机127.0.0.1上的mysql,不是虚拟机上的mysql/2。
2、在配置中心新增如下配置
2.1新建dataId为seataServer.properties的配置,内容如下:
复制以下内容到该配置文件中,有俩个重点关注的点
- 虚拟组配置,所有的微服务需要加入名称为:my_test_tx_group的事务组中
- 设置TC进行全局事务控制的数据存储方式:store.mode有file,db,redis三种类型。这里选择db,设置mysql连接信息(TC自己也有自己的库)
- transport.type=TCP
- transport.server=NIO
- transport.heartbeat=true
- transport.enableClientBatchSendRequest=true
- transport.threadFactory.bossThreadPrefix=NettyBoss
- transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
- transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
- transport.threadFactory.shareBossWorker=false
- transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
- transport.threadFactory.clientSelectorThreadSize=1
- transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
- transport.threadFactory.bossThreadSize=1
- transport.threadFactory.workerThreadSize=default
- transport.shutdown.wait=3
- service.vgroupMapping.my_test_tx_group=default
- service.default.grouplist=127.0.0.1:8091
- service.enableDegrade=false
- service.disableGlobalTransaction=false
- client.rm.asyncCommitBufferLimit=10000
- client.rm.lock.retryInterval=10
- client.rm.lock.retryTimes=30
- client.rm.lock.retryPolicyBranchRollbackOnConflict=true
- client.rm.reportRetryCount=5
- client.rm.tableMetaCheckEnable=false
- client.rm.tableMetaCheckerInterval=60000
- client.rm.sqlParserType=druid
- client.rm.reportSuccessEnable=false
- client.rm.sagaBranchRegisterEnable=false
- client.rm.tccActionInterceptorOrder=-2147482648
- client.tm.commitRetryCount=5
- client.tm.rollbackRetryCount=5
- client.tm.defaultGlobalTransactionTimeout=60000
- client.tm.degradeCheck=false
- client.tm.degradeCheckAllowTimes=10
- client.tm.degradeCheckPeriod=2000
- client.tm.interceptorOrder=-2147482648
-
- store.mode=db
- store.db.datasource=druid
- store.db.dbType=mysql
- store.db.driverClassName=com.mysql.cj.jdbc.Driver
- store.db.url=jdbc:mysql://127.0.0.1:3306/seata?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
- store.db.user=root
- store.db.password=123456
- store.db.minConn=5
- store.db.maxConn=30
- store.db.globalTable=global_table
- store.db.branchTable=branch_table
- store.db.queryLimit=100
- store.db.lockTable=lock_table
-
- store.lock.mode=file
- store.session.mode=file
- store.publicKey=123
- server.recovery.committingRetryPeriod=1000
- server.recovery.asynCommittingRetryPeriod=1000
- server.recovery.rollbackingRetryPeriod=1000
- server.recovery.timeoutRetryPeriod=1000
- server.maxCommitRetryTimeout=-1
- server.maxRollbackRetryTimeout=-1
- server.rollbackRetryTimeoutUnlockEnable=false
- server.distributedLockExpireTime=10000
- client.undo.dataValidation=true
- client.undo.logSerialization=jackson
- client.undo.onlyCareUpdateColumns=true
- server.undo.logSaveDays=7
- server.undo.logDeletePeriod=86400000
- client.undo.logTable=undo_log
- client.undo.compress.enable=true
- client.undo.compress.type=zip
- client.undo.compress.threshold=64k
- log.exceptionRate=100
- transport.serialization=seata
- transport.compressor=none
- metrics.enabled=false
- metrics.registryType=compact
- metrics.exporterList=prometheus
- metrics.exporterPrometheusPort=9898
2.2 顺便在mysql中创建数据库seata,和以下三张表(建表的sql。。。我是直接导入的)
3.新建dataId为common.yml配置,所有微服务共享这个配置(seata不会用)。
内容如下:
大功告成,如下
4.搭建TC服务器[Seata]
4.1.下载地址:https://seata.io/zh-cn/index.html
4.2 解压seata-server-1.4.2.zip安装
4.3 修改\conf\registry.conf设置TC 服务对应的注册中心和配置中心。
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = "nacos"
password = "nacos"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
dataId = "seataServer.properties"
}
}
4.4 启动Seata
\bin目录运行seata-server.bat
项目结构
需求分析
用户下单(TM)的时候记录下单日志,完成订单添加,完成商品库存削减功能,完成用户账户扣款,可以在扣款时制造异常,测试全局分布式事务一致性
1、在fescar-api项目bootstrap.yml文件中引入common.yml配置,共享给其他服务使用。
- spring:
- cloud:
- nacos:
- discovery:
- server-addr: localhost:8848
- group: SEATA_GROUP
- config:
- server-addr: localhost:8848
- file-extension: yml
- group: SEATA_GROUP
- shared-configs[0]:
- data-id: common.yml
- refresh: true
- group: SEATA_GROUP
2、在fescar-api项目中引入依赖,排除低版本依赖,重新引入1.4.2;传递给其他微服务项目使用。
- <dependency>
- <groupId>com.alibaba.cloudgroupId>
- <artifactId>spring-cloud-starter-alibaba-seataartifactId>
- <exclusions>
- <exclusion>
- <groupId>io.seatagroupId>
- <artifactId>seata-spring-boot-starterartifactId>
- exclusion>
- exclusions>
- dependency>
- <dependency>
- <groupId>io.seatagroupId>
- <artifactId>seata-spring-boot-starterartifactId>
- <version>1.4.2version>
- dependency>
3、在 fescar-api 工程下面新建配置类。
在这里配置类中配置的数据源,是为seata库服务的,为了操作undo_log表。undo_log表,每个业务数据库都需要有这张表,用于数据的rollback。
- @Configuration
- public class DataSourceProxyConfig {
- /**
- * 普通数据源
- * @return
- */
- @Bean
- @ConfigurationProperties(prefix = "spring.datasource")
- public DataSource dataSource() {
- return new DruidDataSource();
- }
- /**
- * 代理数据源绑定DataSourceProxy ---> undo_log的操作
- * @param dataSource
- * @return
- */
- @Bean
- public DataSourceProxy dataSourceProxy(DataSource dataSource) {
- return new DataSourceProxy(dataSource);
- }
- /**
- * mybatis--->手动指定sqlSessionFactory所使用的代理数据源
- * @param dataSourceProxy
- * @return
- * @throws Exception
- */
- @Bean
- public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
- SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
- // 换成代理数据源
- sqlSessionFactoryBean.setDataSource(dataSourceProxy);
- return sqlSessionFactoryBean.getObject();
- }
- }
4、在入口方法上添加@GlobalTransactional
TM:使用tc(seata)控制xid
全局事务:TM控制事务的commit和rollback
- /***
- * ①
- * 下单
- * @GlobalTransactional:全局事务入口
- * @param username
- * @param id
- * @param count
- */
- @GlobalTransactional
- @Override
- public void add(String username, int id, int count) {
- //添加订单日志
- LogInfo logInfo = new LogInfo();
- logInfo.setContent("添加订单数据---"+new Date());
- logInfo.setCreatetime(new Date());
- int logcount = logInfoMapper.insertSelective(logInfo);
- System.out.println("添加日志受影响行数:"+logcount);
-
- //添加订单
- orderInfoFeign.add(username,id,count);
-
- int price = 10;
-
- //用户账户余额递减
- userInfoFeign.decrMoney(username,price * count);
- }
其他微服务的业务方法增加@Transactionial注解【本地事务】;在fescar-user的UserInfoServiceImpl中模拟异常,验证全局事务回滚。
经过测试,当然是好使的呢,从表面来看一个注解就搞定了,太牛了
1、分布式事务的执行流程
2、AT模式下,如何做到对业务的无入侵
---------------------------
一阶段加载
在一阶段,Seata会拦截"业务SQL"
1.解析SQL语义,找到“业务SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”(快照)
2.执行"业务SQL"更新业务数据,在业务数据更新之后,保存成“after image”(快照),最后生成行锁。
以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。
-------------------------
二阶段提交
二阶段如果顺利提交的话,因为“业务SQL”在一阶段已经提交至数据库,所以Seata框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。
------------------------------
二阶段回滚
二阶段如果是回滚的话,Seata就需要回滚一阶段执行的“业务SQL”,还原业务数据。
回滚方式便是用“before image”还原业务数据。
但在还原前要首先校验脏写,对比“数据库当前业务数据”和“after image”,如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。