一、概述
二阶段消息是DTM
新提出的,可以完美代替现有的事务消息和本地消息表架构。无论从复杂度、性能、便利性还是代码量都是完胜现有的方案。
相比现有的消息架构借助于各种消息中间件比如RocketMQ
等,DTM
自己实现了无需额外的学习成本。它能够保证本地事务的提交和全局事务提交是“原子的”,适合解决不需要回滚的分布式事务场景。
二阶段消息保证提交的原子性和如何保证业务成功执行如下时序图:
二阶段消息主要是指Prepare
和Submit
两个阶段,主程序向DTM
服务发送Prepare
消息,成功后执行本地事务,完成本地事务后发送Submit
消息至DTM
服务,之后DTM
会调用分支事件执行其他服务,最后完成全局事务。
当发送了Prepare
但是Submit
没有提交的话,会进行回调请求来确认消息的情况,具体工作过程如下:
1、在处理本地事务时,会将gid
插入到barrier
表中,同时带上插入原因为committed
。该表有一个唯一索引,主要字段为gid
。
2、当进行回查时,二阶段消息的操作不是直接查gid
是否存在,而是再insert ignore
一条带有相同gid
的数据,同时带上插入原因为rollbacked
。此时如果表中如果已有gid
的记录,那么新的插入操作就会被ignore
,否则数据会被插入。
3、然后再用gid
查询表中的记录,如果查到记录的reason
为committed
,那么说明本地事务已提交;如果查到记录的reason
为rollbacked
,那么说明本地事务已回滚。
二、安装DTM
我使用二进制包下载安装地址,我是Window
环境所以下载后解压,点击dtm.exe
进行运行即可,如下启动成功
启动成功后可以访问http://localhost:36789
,进入管理后台
三、创建DTM所需的表
我们需要创建一个表处理消息的回查,表里保存全局事务ID,具体作用在后续说明,我这里用的SqlServer数据库,所以执行如下:
CREATE TABLE [dbo].[barrier] ( [id] bigint NOT NULL IDENTITY(1,1) PRIMARY KEY, [trans_type] varchar(45) NOT NULL DEFAULT(''), [gid] varchar(128) NOT NULL DEFAULT(''), [branch_id] varchar(128) NOT NULL DEFAULT(''), [op] varchar(45) NOT NULL DEFAULT(''), [barrier_id] varchar(45) NOT NULL DEFAULT(''), [reason] varchar(45) NOT NULL DEFAULT(''), [create_time] datetime NOT NULL DEFAULT(getdate()) , [update_time] datetime NOT NULL DEFAULT(getdate()) ) GO CREATE UNIQUE INDEX[ix_uniq_barrier] ON[dbo].[barrier] ([gid] ASC, [branch_id] ASC, [op] ASC, [barrier_id] ASC) WITH(IGNORE_DUP_KEY = ON) GO
这里比较关键的是那个唯一索引,有一个IGNORE_DUP_KEY = ON
,这个其实就是为了等价mysql
的insert ignore
表示存在相关字段的信息则不插入,否则就插入数据
当然还支持很多其他的数据库,建表语句可以从这里查看地址
四、创建项目
我们简单的创建两个.net core webapi
项目进行测试,两个项目都进行相同的如下操作:
1、安装Dtmcli和Microsoft.EntityFrameworkCore.SqlServer
安装Dtmcli
是因为其中已经帮我们集成了DTM
客户端SDK HTTP
版本,想要GRPC
版本可以安装Dtmgrpc
。
安装Microsoft.EntityFrameworkCore.SqlServer
很显然是为了处理数据库。
Install-Package Dtmcli Install-Package Microsoft.EntityFrameworkCore.SqlServer
2、配置
接下来我们配置服务,先在配置文件appsetting.json
中添加如下
"AppSettings": { "DtmUrl": "http://localhost:36789", "BusiUrl": "http://localhost:5056", "QueryPreparedUrl": "http://localhost:5046", "BarrierConn": "Data Source=.;Initial Catalog=HTGL;TrustServerCertificate=True;;Integrated Security=True" }
DtmUrl
:DTM
的监听地址,http
的是36789
,grpc
的是36790
BusiUrl
:访问其他服务的地址
QueryPreparedUrl
:回查的地址
BarrierConn
:数据库连接语句
添加一个配置类:
public class AppSettings { public string DtmUrl { get; set; } public string BusiUrl { get; set; } public string BarrierConn { get; set; } public string QueryPreparedUrl { get; set; } }
之后注入服务如下:
builder.Services.AddDtmcli(dtm => { dtm.DtmUrl = builder.Configuration.GetValue<string>("AppSettings:DtmUrl"); dtm.SqlDbType = DtmCommon.Constant.Barrier.DBTYPE_SQLSERVER; dtm.BarrierSqlTableName = "[HTGL].[dbo].[barrier]"; }); builder.Services.Configure(builder.Configuration.GetSection("AppSettings"));
SqlDbType
:表示使用的数据库类型
BarrierSqlTableName
:Barrier
表的名字
3、添加代码
我们在其中一个项目添加主程序代码如下:
[ApiController]public class DtmController : ControllerBase { private readonly ILogger_logger; private readonly IDtmClient _dtmClient; private readonly IDtmTransFactory _transFactory; private readonly AppSettings _settings; private readonly IBranchBarrierFactory _factory; public DtmController(ILogger logger, IDtmClient dtmClient,IDtmTransFactory transFactory, IOptions settings, IBranchBarrierFactory factory) { _logger = logger; _dtmClient = dtmClient; _transFactory = transFactory; _settings = settings.Value; _factory = factory; } private DbConnection GetConn() => new Microsoft.Data.SqlClient.SqlConnection(_settings.BarrierConn); [HttpPost("post-dtm-msg")] public async Task Get(CancellationToken cancellationToken) { //1、创建gid var gid = await _dtmClient.GenGid(cancellationToken); //2、设置分支事务 var msg = _transFactory.NewMsg(gid) .Add(_settings.BusiUrl + "/TransOut", new { id = 123 }) .Add(_settings.BusiUrl + "/TransIn", new { id = 321 });//3、执行submit using (DbConnection conn = GetConn()) { await msg.DoAndSubmitDB(_settings.QueryPreparedUrl + "/msg-queryprepared", conn, async tx => { //4、执行本地事务 await Task.CompletedTask; }); } _logger.LogInformation("result gid is {0}", gid); return Content("SUCCESS"); } [HttpGet("msg-queryprepared")] public async Task QueryPrepared(CancellationToken cancellationToken) { var bb = _factory.CreateBranchBarrier(Request.Query); _logger.LogInformation("bb {0}", bb); using (DbConnection conn = GetConn()) { //回调查询消息状态 var res = await bb.QueryPrepared(conn); return Ok(new { dtm_result = res }); } } }
然后我们向另一个服务项目添加如下代码,作为一个简单的服务方法,没有任何操作只是返回成功:
[ApiController] public class TransController : ControllerBase { private readonly ILogger_logger; private readonly IBranchBarrierFactory _factory; private readonly AppSettings _settings; private DbConnection GetConn() => new Microsoft.Data.SqlClient.SqlConnection(_settings.BarrierConn); public TransController(ILogger logger, IBranchBarrierFactory factory, IOptions settings) { _logger = logger; _factory = factory; _settings = settings.Value; } [HttpPost("TransIn")] public async Task In() { return Results.Ok(new { dtm_result = "SUCCESS" }); //return Results.Ok(new { dtm_result = "FAILURE" }); } [HttpPost("TransOut")] public async Task Out() { return Results.Ok(new { dtm_result = "SUCCESS" }); } }
五、执行查看结果
我们正常执行,可以看到下面的动图结果,在执行完本地事务后会访问分支事务,然后数据库表中添加了一条记录
可以在管理后台看到我们请求成功的信息
如果要演示失败,需要做以下修改直接报错,我们可以看到访问了回调方法,然后数据库中看到rollback
标记的消息
using (DbConnection conn = GetConn()) { await msg.DoAndSubmitDB(_settings.QueryPreparedUrl + "/msg-queryprepared", conn, async tx => { throw new Exception("报错了"); //4、执行本地事务 await Task.CompletedTask; }); }
提交后再宕机演示比较麻烦,我就不演示了,大家意会即可。
如果分支事务返回的不是SUCCESS而是FAILURE会由DTM隔一段时间重新请求,dtm对每个事务的重试是指数退避策略,具体为间隔是每失败一次,间隔加倍,避免过多的重试,导致系统负载异常上升。
如果您经过长时间的的宕机,因指数退避算法导致要很久才会重试。如果您想要手动触发立即重试,您可以手动把相应事务的next_cron_time(Redis存储引擎的该功能还在开发中)修改为当前时间,就会在数秒内被定时轮询,事务就会继续往前执行。