文章涵盖2PC,3PC刚性事务;TCC,本地消息表,可靠性消息,最大努力通知。本文将对其一一讲解,尽最大努力写的通俗易懂。
一、什么是分布式事务
在之前的单体应用环境下,所有的业务操作都是跟一个数据库打交道,直接采用数据库事务即可,而在如今分布式场景下,一个业务功能可能需要横跨多个服务,操作多个数据库,这个时候,我们本地事务就无法保证每个服务对数据库的操作是同时成功或者同时失败,这个时候就需要我们了解分布式事务。
二、分布式事务理论解决方案
1、2PC
2PC又叫两阶段提交,它其实是一种协议,是基于XA规范搞的一套分布式事务的理论。
X/Open组织定义了一套DTP分布式模型,主要含有:
AP(应用程序)
TM(事务管理器)
RM(资源管理器) ---- 通常指数据库
CRM(通讯资源管理器)四部分 ---- 消息中间件
XA则定义TM和RM之前通讯的接口规范,XA接口函数由各个数据库厂商提供。一般常见的事务管理器( TM )是交易中间件,例如JDBC或者hibernate提供的transactionmanager,常见的资源管理器( RM )是数据库,通常就是数据源,例如JDBC或第三方提供的datasource,常见的通信资源管理器( CRM )是消息中间件,如JMS。
举一个生活中的例子:
阶段 1 :
( 1 ) A 发邮件给 B 、 C 和 D ,提出下周三去爬山, 问是否同意 。那么此时 A 需要 等待 B 、 C 和 D 的邮件。
( 2 ) B 、 C 和 D 分别查看自己的日程安排表。 B 、 C 发现自己在当日没有活动安排,则发邮件告诉 A 它们同意下周三去爬长城。由于某种原因, D 白天没有查看邮 件。那么此时 A 、 B 和 C 均需要等待。到晚上的时候, D 发现了 A 的邮件,然后查看日程安排,发现周三当天已经有别的安排,那么 D 回复 A 说活动取消吧。
阶段 2 :
( 1 ) A 收到了所有活动参与者 的邮件,并且 A 发现 D 下周三不能去爬山。那么 A 将发邮件通知 B 、 C 和 D ,下周三爬长城活动取消。
( 2 ) B 、 C 回复 A “太可惜了”, D 回复 A “不好意思” , 至此该事务终止 。
我们可以发现A即为我们的TM(事务管理器)决策者,B,C,D分别是我们的AP(应用程序),AP他们自己连着RM(数据库),而TM用于通知RM准备,提交,或者回滚。
事务管理器先问问各个数据库你准备好了吗?如果每个数据库都回复ok,那么就正式提交事务,在各个数据库上执行操作;如果任何一个数据库回答不ok,那么就回滚事务。
代码实现一个插入数据的过程:
这里的指令都是数据库对于XA协议的实现,只需要遵守就好了。
xa.start 告诉数据库我要开始了
xa.end 告诉数据库我的sql已经执行完了,但是没提交、
xa.prepare 询问数据库你们只能把好了吗
xa.commit 提交
xa.rollback 回滚
xa.recover(查询事务状态的指令是否在prepare状态)
- public static void main(String[] args) throws SQLException {
- //true表示打印XA语句,,用于调试
- boolean logXaCommands = true;
- // 获得资源管理器操作接口实例 RM1
- Connection conn1 = DriverManager.getConnection
- ("jdbc:mysql://localhost:3306/db_user", "root", "root");
- XAConnection xaConn1 = new MysqlXAConnection(
- (com.mysql.jdbc.Connection) conn1, logXaCommands);
- XAResource rm1 = xaConn1.getXAResource();
- // 获得资源管理器操作接口实例 RM2
- Connection conn2 = DriverManager.getConnection
- ("jdbc:mysql://localhost:3306/db_account", "root", "root");
- XAConnection xaConn2 = new MysqlXAConnection(
- (com.mysql.jdbc.Connection) conn2, logXaCommands);
- XAResource rm2 = xaConn2.getXAResource();
- // AP请求TM执行一个分布式事务,TM生成全局事务id
- byte[] gtrid = "g12345".getBytes();
- int formatId = 1;
- try {
- // ==============分别执行RM1和RM2上的事务分支====================
- // TM生成rm1上的事务分支id
- byte[] bqual1 = "b00001".getBytes();
- Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
- // 执行rm1上的事务分支
- rm1.start(xid1, XAResource.TMNOFLAGS);//One of TMNOFLAGS, TMJOIN, or TMRESUME.
- PreparedStatement ps1 = conn1.prepareStatement(
- "INSERT into user(name) VALUES ('test')");
- ps1.execute();
- rm1.end(xid1, XAResource.TMSUCCESS);
- // TM生成rm2上的事务分支id
- byte[] bqual2 = "b00002".getBytes();
- Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
- // 执行rm2上的事务分支
- rm2.start(xid2, XAResource.TMNOFLAGS);
- PreparedStatement ps2 = conn2.prepareStatement(
- "INSERT into account(user_id,money) VALUES (1,10000000)");
- ps2.execute();
- rm2.end(xid2, XAResource.TMSUCCESS);
-
- // ===================两阶段提交================================
- // phase1:询问所有的RM 准备提交事务分支
- int rm1_prepare = rm1.prepare(xid1);
- int rm2_prepare = rm2.prepare(xid2);
-
- // phase2:提交所有事务分支
- boolean onePhase = false;
- //TM判断有2个事务分支,所以不能优化为一阶段提交
- if (rm1_prepare == XAResource.XA_OK
- && rm2_prepare == XAResource.XA_OK) {
- //所有事务分支都prepare成功,提交所有事务分支
- rm1.commit(xid1, onePhase);
- rm2.commit(xid2, onePhase);
- } else {
- //如果有事务分支没有成功,则回滚
- rm1.rollback(xid1);
- rm2.rollback(xid2);
- }
- } catch (XAException e) {
- // 如果出现异常,也要进行回滚
- e.printStackTrace();
- }
- }
这里的指令都是数据库对于XA协议的实现,只需要遵守就好了。
1.1 两阶段提交原理(Seata的AT模式也是这一种模型)
(1)准备阶段(Prepare phase):事务管理器给每个参与者发送prepare消息,每个数据库参与者在本地执行事务,并写本地的Undo/Redo,此时事务没有提交。
(Undo日志是记录修改前的数据,用户数据库回滚,Redo日志是记录修改后的数据,用于提交事务后写入数据。)
(2)提交阶段(Commit phase):如果事务管理器接收了参与者执行失败或者超时消息时,直接给每个参与者发送回滚消息,否则发送提交消息;参与者根据事务管理器的指令执行提交或者回滚操作,并释放事务处理过程中使用的锁资源。
1.2 两阶段提交的缺陷
(1)锁资源(效率低):二阶段提交协议的第一阶段准备阶段不仅仅是回答YES or NO,还是要执行事务操作的,只是执行完事务操作,并没有进行commit或者rollback。也就是说,一旦事务执行之后,在没有执行commit或者rollback之前,资源是被锁定的。这会造成阻塞,如果sql是行锁则锁行,表锁则锁表。
(2)局限性:如果数据库没有自己对XA的实现你是无法使用的,什么redis那些都不能使用。
(3)单点故障:由于协调者的重要性,一旦协调者TM发生故障。参与者RM会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。
(4)数据不一致:在阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这会导致只有一部分参与者接受到了commit请求,而在这部分参与者接到commit请求之后就会执行commit操作,但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。
1.3 两阶段提交协议事务悬挂与解决方案
如果我们执行过程中的时候,宕机了怎么办,或者commit,rollback的时候网络故障了,整个事务流程没有走完,相信很多同学都会有这个想法,这个时候就会锁资源,事务还悬挂在哪,咋办呢?
解决方案:全局事务id和分支事务id这个时候的作用就来了,我们一定要记录下每一个分支事务走到哪一个流程的日志,事务状态是怎么样的,是否已经完成,时间是什么时候。然后定时任务补偿,调用上面提到的XA.RECOVER,他可以查询到所有正在Prepare(悬挂)事务的全局id和分支事务id,根据这些id信息,查询日志悬挂了几分钟的事务,然后根据所有分支事务的状态,我们就能决定执行提交还是回滚。例如一些开源分布式事务框架Atomikos,hmily,已经帮我们实现。市面上还有一些2PC的实现框架,例如Atomikos,其实底层都是通过动态代理数据源,拦截sql执行,然后执行xa.start等方法。并且他们很好的帮我们去解决两阶段提交事务悬挂等问题,不需要自己实现。
当然,如果最终还是不能提交事务,得人工去处理。
1.4 2pc两阶段提交总结
两阶段,我们很少用,一般来说某个系统内部如果出现跨多个库的这么一个操作,是不合规的,而且性能不太行。现在微服务,一个大的系统分成几十个服务。一般来说,我们的规定和规范,是要求说每个服务只能操作自己对应的一个数据库。 如果你要操作别的服务对应的库,不允许直连别的服务的库,违反微服务架构的规范,你随便交叉胡乱访问,这样的一套服务是没法管理的,没法治理的,经常数据被别人改错,自己的库被别人写挂。 如果你要操作别人的服务的库,你必须是通过调用别的服务的接口来实现,绝对不允许你交叉访问别人的数据库。
2、3pc
2.1 3pc过程
它分为三步:
在DoCommit阶段,如果参与者无法及时接收到来自协调者的DoCommit或者abort请求时,会在等待超时之后,会继续进行事务的提交,因为当进入第三阶段时,说明所有参与者在第二阶段已经收到了PreCommit请求,并且反馈都为Yes,所以当进入第三阶段,而参与者又没收到反馈,一般都是网络问题导致的,虽然参与者没有收到commit或者abort响应,但是他有理由相信:成功提交的几率很大
2.2 3pc优缺点
相对于2PC,3PC主要解决的单点故障问题,并减少阻塞,因为一旦参与者无法及时收到来自协调者的信息之后,他会默认执行commit,而不会一直持有事务资源并处于阻塞状态。
但同时也会造成数据一致性问题,协调者发送的abort响应没有及时被参与者接收到,那么参与者在等待超时之后执行了commit操作。这样就和其他接到abort命令并执行回滚的参与者之间存在数据不一致的情况。
2.3 3pc相比2pc多了什么
2.4 3pc总结
2pc和3pc都是属于刚性事务,性能都会有影响,因为prapare阶段会锁资源,并且我们发现,这种模式大多数使用于单个项目多数据源,并不适合我们分布式的环境远程RPC调用。2PC只有TM的超时机制,3PC新增了参与者(RM)的超时机制,3PC多了CanCommit阶段,这就是最大的区别。
3、Tcc分布式事务
TCC是Try,Confirm,Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作:预处理Try,确认阶段Confirm,撤销Cancel。
Try操作做业务检查和资源预留,confirm做业务确认操作,Cancel实现一个与Try相反的操作即回滚操作,其实就是一个补偿。
TCC的实现方式其实是在业务层,也就是代码层,对每一个业务操作,都注册一个补偿操作,例如下单操作,那么下单失败过后该干嘛?就该调用下单失败的补偿接口。这对代码的侵入是很大的,并且如果下游业务没有按照你的要求实现相应的补偿接口,Tcc也就不存在的。
例子:现在我们来模拟一个下单的业务,来看看Tcc如何实现的。
目前有三个服务,订单服务,库存服务,余额服务
(1)新增订单A,状态为带支付
(2)库存服务修改对应商品(原库存100),生成一条冻结记录,冻结库存为1
(3)用户服务修改用户余额(原余额100),生成一条冻结记录,冻结余额为1
Confirm阶段
Try成功之后
(1)库存服务扣减库存(现库存99),解冻库存,冻结库存为0
(2)用户服务扣减金额(现金额99),解冻余额,冻结余额为0
(3)将订单A,状态改为待支付
因为Tyr阶段预留了资源是成功的,Confirm阶段默认都会成功的
Cancel阶段
Tyr阶段任意一个操作失败,执行对应补偿接口
(1)库存服务补回库存(现库存100),冻结库存为0
(2)用户服务补回余额(现余额100),冻结余额为0
(3)修改订单A,状态改为取消
3.1 Tcc事务所产生的问题
(1)问题一:幂等性
由于网络问题,会有一个retry的重试操作,上面任意一步都可能调用多次,假如你进行重试,如果没保证幂等性就会产生数据的错误,所以我们必须要保证幂等性
解决方案:其实tcc事务执行,会有一个贯穿整个全局事务的全局事务id并且每一个分支事务会有一个分支事务id,我们每个微服务本地需要有一张分支本地事务日志表,里面有的字段,全局事务id,分支事务id,分支事务执行状态(1.try执行成功,2.confirm执行成功,3cancel执行成功),这样我们在重试的时候,首先用分支事务id来作为锁的key,然后去查询本地事务表,我是否执行过这一步操作,如果执行过则不执行,这样就可以保证幂等性。所以我们业务操作每一步操作的时候都需要在本地事务表记录当前分支事务的状态,和业务代码一起提交事务,这样可以回溯分支事务是否完成。
(2)问题二:空回滚
以上面为例:假设库存服务执行try成功了,网络问题,Confirm失败了,这个时候我们会去调用订3个服务的Cancel补偿接口,然而2,3,4的try方法都还没执行,证明都还没开始预留资源,你就把我回滚了,我try还没扣余额,你cancel反倒还给我加余额,这就是空回滚问题。
解决方案:全局事务id,分支事务id,分支本地事务日志表,在我们执行cancel操作之前,加锁,然后去本地事务表找一下当前有没有执行过try操作,有就执行,没有就不执行,并且记录本地事务日志表状态。
(3)问题三:事务悬挂
还是网络原因产生的问题和上面类似,假设我们设置了请求的超时时间为3秒,当我们对服务B执行try操作的时候,产生了超时,这个时候我们会调用B对应的cancel接口,但是我们的try还没执行,就执行了cancel,这个时候就有可能产生脏数据。然后这个时候服务B又执行了未执行完的try操作,那就完蛋了,我这个try操作永远都悬挂在这里了,芭比Q,因为我已经执行了cancel。
解决方案:全局事务id,分支事务id,分支本地事务日志表,在我们执行try操作之前,加锁,然后去本地事务表找一下当前有没有执行过cancel操作,有就不执行。
(4)问题四:cancel或者confirm失败
假设我所有try都成功了,或者有一些失败了,需要cancel和confirm的时候因为网络问题失败了,怎么办。
解决方案:定时任务+本地事务消息表。本地会有个定时任务,定时去本地事务表日志扫描还未完成的事务,假设这个事务所有try都成功,有一部分confirm失败了,定时任务会不断去帮你执行confirm操作,反之cancel。
3.2 Tcc事务总结
优点:并发高,不锁资源(2PC会锁数据库资源),本地事务提交即可,一般适用于资金等不能出错的场景。
缺点:复杂,需要一个业务接口需要拆成3个,业务侵入性非常之大,老系统改得你怀疑人生,如果自己实现还得记录本地事务日志,重试,一般团队没实力不会使用这个,因为太骚了。
4、本地消息表
(1)A系统在自己本地一个事务里操作同时,插入一条数据到消息表(同一个数据库原子性)
(2)接着A系统将这个消息发送到MQ中去
(3)B系统接收到消息之后,在一个事务里,往自己本地消息表里(幂等性)插入一条数据,同时执行其他的业务操作,如果这个消息已经被处理过了,那么此时这个事务会回滚,这样保证不会重复处理消息
(4)B系统执行成功之后,就会更新自己本地消息表的状态以及A系统消息表的状态,如果B系统处理失败了,那么就不会更新消息表状态,那么此时A系统会定时扫描自己的消息表,如果有没处理的消息,会再次发送到MQ中去,让B再次处理
(5)这个方案保证了最终一致性,哪怕B事务失败了,但是A会不断重发消息,直到B那边成功为止,达到最终一致
(6)我们发送出去的消息,必须得带上业务相关联的数据,例如新增用户,发送增加积分mq消息,我们的mq消息的消息体得带上新增这个用户的id,以便以后回溯查找问题。
(6)简单理解:消息本地化,定时任务定时发送消息中间件。
4.1 本地消息表举例:
用户注册赠送积分为例
- //用户服务
- @Component
- public class UserClient {
- @Autowired
- private UserMapper userMapper;
- @Autowired
- private PointLogMapper pointLogMapper;
- //本地事务,可以优化本地事务提交之后立刻发送mq,实时性会好一点
- @Transactional
- public ApiResult save(User user){
- //保存用户
- userMapper.save(user);
- PointLog pointLog=new PointLog();
- pointLog.setUserId(user.getid);
- pointLog.setPoint(1);
- pointLog.setcreateTimeUtc(TimeUtils.getUTCTime());
- pointLog.setstate(0);
- //保存积分日志记录
- pointLogMapper.savePointLog(pointLog);
- }
-
- //本地消息表找未发送mq的消息,发送mq
- @Scheduled(cron = "0 1 0 * * ?")
- public pushMessage(){
- List
pointLogList=pointLogMapper.findAllLogUnPush(); - for (PointLog pointLog:
- pointLogList) {
- boolean success=sendMq(JSON.toJsonString(pointLog));
- if(success){
- pointLog.setstate(1);
- pointLogMapper.update(pointLog);
- }
- }
- }
- }
-
- //积分服务
- @Component
- public class PointClient {
- @Autowired
- private PonitMessageMapper pointMessageMapper;
-
- @Override
- @Transactional
- public void onMessage(String message){
- PointMessage pointMessage=JSONObject.parseObject(jsonObject.toJSONString(), new TypeReference
(){}); - //插入消息表,唯一约束
- pointMessageMapper.insert(pointMessage);
- //处理逻辑
- pointServeice.addPoint(pointMessage);
-
- message.ack();
- }
- }
4.2 本地消息表总结
优点:简单,开发成本低,简单
缺点:与业务场景绑定,高耦合,不可公用,本地消息表与业务数据表在同一个库,占用业务系统资源,量大可能会影响数据库性能
5、可靠性消息
这个方案其实就是我们本地消息表的一个变种,实现原理基本也一样。就是需要一个可靠消息服务,来帮我们把本地消息表还有定时任务的功能给做了,例如RocketMq的Half Message(半消息)
事务开始我们只需要投递一个半消息(消费端不可消费的消息),可靠消息服务把消息持久化(可以看做本地消息表),然后告诉可靠消息服务,我这次业务本地事务是提交还是回滚,如果是提交,我就让这个半消息的状态改变,变为可消费,并且消费端如果消费失败我们可靠消息服务的定时任务会不断投递可消费的消息给消费端,消费端ack。假如可靠消息服务迟迟收不到我是应该把这条半消息提交或者回滚,他会自动去调用发送端的查询接口,让他告诉我应该把这个半消息状态改为可消费,还是应该删除。
消费者消费失败的时候,其实分为两种失败情况:
1、非业务异常导致的失败,比如说网络超时等,这种会触发mq的重试机制,保证消费成功
2、还有一种是业务异常,比如下单后给账户服务发送扣款消息,余额不够导致扣款失败,这种情况其实也应该回复消费成功的ACK,但是消费方发生一条扣款失败的消息给业务方,通知回滚
5、最大努力通知
我们平常在接入微信支付的时候,只要用户支付成功了,微信会已一定的频率回调我们,假如我们回复微信失败了,他会过一段时间再通知,最多通知15次。15次之后,我们就只能主动去微信那边查询这笔订单是否成功。微信的这个服务就是最大努力通知
5.1 最大努力通知和可靠性消息的异同
(1)解决方案思想不同
(2)两者的业务应用场景不同
(3)技术解决方向不同
最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)
参考文章:
分布式事务解决方案和代码落地_Dr.劳的博客-CSDN博客_分布式事务代码