• .NetCore中使用分布式事务DTM的二阶段消息


    一、概述

    二阶段消息是DTM新提出的,可以完美代替现有的事务消息和本地消息表架构。无论从复杂度、性能、便利性还是代码量都是完胜现有的方案。

    相比现有的消息架构借助于各种消息中间件比如RocketMQ等,DTM自己实现了无需额外的学习成本。它能够保证本地事务的提交和全局事务提交是“原子的”,适合解决不需要回滚的分布式事务场景

    二阶段消息保证提交的原子性和如何保证业务成功执行如下时序图:

     

     二阶段消息主要是指PrepareSubmit两个阶段,主程序向DTM服务发送Prepare消息,成功后执行本地事务,完成本地事务后发送Submit消息至DTM服务,之后DTM会调用分支事件执行其他服务,最后完成全局事务。

     当发送了Prepare但是Submit没有提交的话,会进行回调请求来确认消息的情况,具体工作过程如下:

     1、在处理本地事务时,会将gid插入到barrier表中,同时带上插入原因为committed。该表有一个唯一索引,主要字段为gid

     2、当进行回查时,二阶段消息的操作不是直接查gid是否存在,而是再insert ignore一条带有相同gid的数据,同时带上插入原因为rollbacked。此时如果表中如果已有gid的记录,那么新的插入操作就会被ignore,否则数据会被插入。

     3、然后再用gid查询表中的记录,如果查到记录的reasoncommitted,那么说明本地事务已提交;如果查到记录的reasonrollbacked,那么说明本地事务已回滚。

    二、安装DTM

     我使用二进制包下载安装地址,我是Window环境所以下载后解压,点击dtm.exe进行运行即可,如下启动成功

    启动成功后可以访问http://localhost:36789,进入管理后台

    三、创建DTM所需的表

    我们需要创建一个表处理消息的回查,表里保存全局事务ID,具体作用在后续说明,我这里用的SqlServer数据库,所以执行如下:

    复制代码
    CREATE TABLE [dbo].[barrier]
    (
        [id] bigint NOT NULL IDENTITY(1,1) PRIMARY KEY,
        [trans_type] varchar(45) NOT NULL DEFAULT(''),
        [gid] varchar(128) NOT NULL DEFAULT(''),
        [branch_id] varchar(128) NOT NULL DEFAULT(''),
        [op] varchar(45) NOT NULL DEFAULT(''),
        [barrier_id] varchar(45) NOT NULL DEFAULT(''),
        [reason] varchar(45) NOT NULL DEFAULT(''),
        [create_time] datetime NOT NULL DEFAULT(getdate()) ,
        [update_time] datetime NOT NULL DEFAULT(getdate())
    )
    
    GO
    
    CREATE UNIQUE INDEX[ix_uniq_barrier] ON[dbo].[barrier]
            ([gid] ASC, [branch_id] ASC, [op] ASC, [barrier_id] ASC)
    WITH(IGNORE_DUP_KEY = ON)
    
    GO
    复制代码

    这里比较关键的是那个唯一索引,有一个IGNORE_DUP_KEY = ON,这个其实就是为了等价mysqlinsert ignore表示存在相关字段的信息则不插入,否则就插入数据

    当然还支持很多其他的数据库,建表语句可以从这里查看地址

     四、创建项目

     我们简单的创建两个.net core webapi项目进行测试,两个项目都进行相同的如下操作:

     1、安装Dtmcli和Microsoft.EntityFrameworkCore.SqlServer

     安装Dtmcli是因为其中已经帮我们集成了DTM客户端SDK HTTP版本,想要GRPC版本可以安装Dtmgrpc

     安装Microsoft.EntityFrameworkCore.SqlServer很显然是为了处理数据库。

    Install-Package Dtmcli
    Install-Package Microsoft.EntityFrameworkCore.SqlServer

    2、配置

    接下来我们配置服务,先在配置文件appsetting.json中添加如下

    复制代码
      "AppSettings": {
        "DtmUrl": "http://localhost:36789",
        "BusiUrl": "http://localhost:5056",
        "QueryPreparedUrl": "http://localhost:5046",
        "BarrierConn": "Data Source=.;Initial Catalog=HTGL;TrustServerCertificate=True;;Integrated Security=True"
      }
    复制代码

     DtmUrlDTM的监听地址,http的是36789grpc的是36790

     BusiUrl:访问其他服务的地址

     QueryPreparedUrl:回查的地址

     BarrierConn:数据库连接语句

     添加一个配置类:

    复制代码
        public class AppSettings
        {
            public string DtmUrl { get; set; }
            public string BusiUrl { get; set; }
            public string BarrierConn { get; set; }
            public string QueryPreparedUrl { get; set; }
        }
    复制代码

     之后注入服务如下:

    复制代码
    builder.Services.AddDtmcli(dtm => { 
        dtm.DtmUrl = builder.Configuration.GetValue<string>("AppSettings:DtmUrl");
        dtm.SqlDbType = DtmCommon.Constant.Barrier.DBTYPE_SQLSERVER;
        dtm.BarrierSqlTableName = "[HTGL].[dbo].[barrier]";
    });
    builder.Services.Configure(builder.Configuration.GetSection("AppSettings"));
    复制代码

    SqlDbType:表示使用的数据库类型

    BarrierSqlTableNameBarrier表的名字

    3、添加代码

    我们在其中一个项目添加主程序代码如下:

    复制代码
        [ApiController]public class DtmController : ControllerBase
        {
    
            private readonly ILogger _logger;
            private readonly IDtmClient _dtmClient;
            private readonly IDtmTransFactory _transFactory;
            private readonly AppSettings _settings;
            private readonly IBranchBarrierFactory _factory;
            public DtmController(ILogger logger, IDtmClient dtmClient,IDtmTransFactory transFactory, IOptions settings, IBranchBarrierFactory factory)
            {
                _logger = logger;
                _dtmClient = dtmClient;
                _transFactory = transFactory;
                _settings = settings.Value;
                _factory = factory;
            }
            private DbConnection GetConn() => new Microsoft.Data.SqlClient.SqlConnection(_settings.BarrierConn);
            [HttpPost("post-dtm-msg")]
            public async Task Get(CancellationToken cancellationToken)
            {
                //1、创建gid
                var gid = await _dtmClient.GenGid(cancellationToken);
                //2、设置分支事务
                var msg = _transFactory.NewMsg(gid)
                    .Add(_settings.BusiUrl + "/TransOut", new { id = 123 })
                    .Add(_settings.BusiUrl + "/TransIn", new { id = 321 });//3、执行submit
                using (DbConnection conn = GetConn())
                {
                    await msg.DoAndSubmitDB(_settings.QueryPreparedUrl + "/msg-queryprepared", conn, async tx =>
                    {
                        //4、执行本地事务
                        await Task.CompletedTask;
                    });
                }
                _logger.LogInformation("result gid is {0}", gid);
                return Content("SUCCESS");
            }
            [HttpGet("msg-queryprepared")]
            public async Task QueryPrepared(CancellationToken cancellationToken)
            {
                var bb = _factory.CreateBranchBarrier(Request.Query);
                _logger.LogInformation("bb {0}", bb);
                using (DbConnection conn = GetConn())
                {
                    //回调查询消息状态
                    var res = await bb.QueryPrepared(conn);
                    return Ok(new { dtm_result = res });
                }
            }
        }
    复制代码

    然后我们向另一个服务项目添加如下代码,作为一个简单的服务方法,没有任何操作只是返回成功:

    复制代码
    [ApiController]
        public class TransController : ControllerBase
        {
            private readonly ILogger _logger;
            private readonly IBranchBarrierFactory _factory;
            private readonly AppSettings _settings;
            private DbConnection GetConn() => new Microsoft.Data.SqlClient.SqlConnection(_settings.BarrierConn);
            public TransController(ILogger logger, IBranchBarrierFactory factory, IOptions settings)
            {
                _logger = logger;
                _factory = factory;
                _settings = settings.Value;
            }
            [HttpPost("TransIn")]
            public async Task In()
            {
                return Results.Ok(new { dtm_result = "SUCCESS" });
                //return Results.Ok(new { dtm_result = "FAILURE" });
            }
            [HttpPost("TransOut")]
            public async Task Out()
            {
                return Results.Ok(new { dtm_result = "SUCCESS" });
            }
        }
    复制代码

    五、执行查看结果

    我们正常执行,可以看到下面的动图结果,在执行完本地事务后会访问分支事务,然后数据库表中添加了一条记录

    可以在管理后台看到我们请求成功的信息

     如果要演示失败,需要做以下修改直接报错,我们可以看到访问了回调方法,然后数据库中看到rollback标记的消息

    复制代码
    using (DbConnection conn = GetConn())
                {
                    await msg.DoAndSubmitDB(_settings.QueryPreparedUrl + "/msg-queryprepared", conn, async tx =>
                    {
                        throw new Exception("报错了");
                        //4、执行本地事务
                        await Task.CompletedTask;
                    });
                }
    复制代码

     提交后再宕机演示比较麻烦,我就不演示了,大家意会即可。

     如果分支事务返回的不是SUCCESS而是FAILURE会由DTM隔一段时间重新请求,dtm对每个事务的重试是指数退避策略,具体为间隔是每失败一次,间隔加倍,避免过多的重试,导致系统负载异常上升。

     如果您经过长时间的的宕机,因指数退避算法导致要很久才会重试。如果您想要手动触发立即重试,您可以手动把相应事务的next_cron_time(Redis存储引擎的该功能还在开发中)修改为当前时间,就会在数秒内被定时轮询,事务就会继续往前执行。

     

  • 相关阅读:
    HT for Web (Hightopo) 使用心得(1)- 基本概念
    2024采用JSP的酒店客房管理系统源代码+毕业设计论文+开题报告+答辩PPT
    ubuntu22.04设置中文
    API测试简介
    Nginx实战:LUA脚本_环境配置安装
    Java 中的数据类型有哪些?
    SVN安装教程
    uni-app开发常用操作速查记录
    Rollup:zkSync v2.0和ZK-Rollup的未来
    Redis中的渐进式遍历-Scan命令
  • 原文地址:https://www.cnblogs.com/xwc1996/p/17252311.html