• 分布式事务管理Atomikos


    目录

    XA协议

    Atomikos介绍

    atomikos_demo

    Spring对分布式事务的支持

    基于spring实现

    事务执行流程


    XA协议

    • 用于支持单个服务操作多个服务源,XA是由X/Open组织提出的分布式事务的规范。
    • XA规范主要定义了(全局)事务管理器(TM:Transaction Manager)和(局部)资源管理器(RM:Resource Manager)之间的接口。
    • 主流的关系型数据库产品都是实现了XA接口的。
    • XA接口是双向的系统接口,在TM:RM = 1:N之间形成通信桥梁
    • 在分布式系统中,从理论上来说,两台机器无法达成一致的状态,需要引入一个单点进行协调,即为XA引入TM的原因。TM一般使用XA两阶段提交协议与数据库进行交互

    两阶段提交

    • 阶段一为准备阶段prepare,即所有的参与者准备执行事务并锁住需要的资源。当参与者Ready时,向TM 汇报自己已经准备好。以mysql数据库为例,事务管理器向所有涉及到的数据库服务器发出prepare“准备提交”请求,数据库收到请求后执行数据修改和日志记录等处理,处理完成后只是把事务的状态改成“可以提交”,然后把结果返回给事务管理器
    • 阶段二为提交阶段commit,TM根据阶段1各个RM prepare的结果,决定是提交还是回滚事务。如果所有的RM都prepare成功,那么TM通知所有的RM进行提交;如果有RM prepare失败的话,则TM通知所有RM回滚自己的事务分支。以mysql数据库为例,如果第一阶段中所有数据库都prepare成功,那么事务管理器向数据库服务器发出commit请求,数据库服务器把事务的"可以提交"状态改为"提交完成"状态,然后返回应答。如果在第一阶段内有任何一个数据库的操作发生了错误,或者事务管理器收不到某个数据库的回应,则认为事务失败,回撤所有数据库的事务。数据库服务器收不到第二阶段的确认提交请求,也会把"可以提交"的事务回撤。

    图解

    Atomikos介绍

    Atomikos是一个非常流行的开源事务管理器,并且可以嵌入到你的Spring Boot应用中。Tomcat应用服务器没有实现JTA规范,当使用Tomcat作为应用服务器的时候,需要使用第三方的事务管理器类来作为全局的事务管理器,而Atomikos框架就是这个作用,将事务管理整合到应用中,而不依赖于application server。

    atomikos_demo

    Atomikos核心类

    1、数据源定义:com.atomikos.jdbc.AtomikosDataSourceBean

    2、atomikos提供的javax.transaction.UserTransaction接口实现类com.atomikos.icatch.jta.UserTransactionImp来开启、提交、回滚事务。

    实现代码

    1. public class AtomikosExample {
    2. private static AtomikosDataSourceBean createAtomikosDataSourceBean(String dbName) {
    3. // 连接池基本属性
    4. Properties p = new Properties();
    5. p.setProperty("url", "jdbc:mysql://localhost:3306/" + dbName);
    6. p.setProperty("user", "root");
    7. p.setProperty("password", "root");
    8. // 使用AtomikosDataSourceBean封装com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
    9. AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
    10. // atomikos要求为每个AtomikosDataSourceBean名称,为了方便记忆,这里设置为和dbName相同
    11. ds.setUniqueResourceName(dbName);
    12. ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
    13. ds.setXaProperties(p);
    14. return ds;
    15. }
    16. public static void main(String[] args) {
    17. AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("test2022");
    18. AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("test2021");
    19. Connection conn1 = null;
    20. Connection conn2 = null;
    21. PreparedStatement ps1 = null;
    22. PreparedStatement ps2 = null;
    23. UserTransaction userTransaction = new UserTransactionImp();
    24. try {
    25. // 开启事务
    26. userTransaction.begin();
    27. // 执行db1上的sql
    28. conn1 = ds1.getConnection();
    29. ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS);
    30. ps1.setString(1, "zhangsan");
    31. ps1.executeUpdate();
    32. ResultSet generatedKeys = ps1.getGeneratedKeys();
    33. int userId = -1;
    34. while (generatedKeys.next()) {
    35. // 获得自动生成的userId
    36. userId = generatedKeys.getInt(1);
    37. }
    38. // 模拟异常 ,直接进入catch代码块,2个都不会提交
    39. // int i=1/0;
    40. // 执行db2上的sql
    41. conn2 = ds2.getConnection();
    42. ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)");
    43. ps2.setInt(1, userId);
    44. ps2.setDouble(2, 10000000);
    45. ps2.executeUpdate();
    46. // 两阶段提交
    47. userTransaction.commit();
    48. } catch (Exception e) {
    49. try {
    50. System.out.println("有异常,所有数据源都回滚");
    51. userTransaction.rollback();
    52. } catch (SystemException e1) {
    53. e1.printStackTrace();
    54. }
    55. } finally {
    56. try {
    57. ps1.close();
    58. ps2.close();
    59. conn1.close();
    60. conn2.close();
    61. ds1.close();
    62. ds2.close();
    63. } catch (Exception ignore) {
    64. }
    65. }
    66. }
    67. }

    注:新建的maven项目,是依赖Java JDK

    需要手动导入javax

    Spring对分布式事务的支持

    使用其声明式事务管理功能来完成事务功能。一般使用的步骤如下:

    1、配置事务管理器。spring提供了PlatformTransactionManager接口,其有2个重要的实现类。

    • DataSourceTransactionManager:用于支持本地事务,事实上,其内部也是通过操作java.sql.Connection来开启、提交和回滚事务。
    • JtaTransactionManager:用于支持分布式事务,其实现了JTA规范,使用XA协议进行两阶段提交。需要注意的是,这只是一个代理,我们需要为其提供一个JTA provider,一般是Java EE容器提供的事务协调器(Java EE server's transaction coordinator),也可以不依赖容器,配置一个本地的JTA provider。JTA规范:Java Transaction API,Java提供的基于XA规范的事务管理规范

    2、在需要开启的事务的bean的方法上添加@Transitional注解

    基于spring实现

    配置文件,去配置分布式事务管理器JtaTransactionManager

    一个事务方法,涉及两个数据源

    1. public class JTAService {
    2. @Autowired
    3. private UserMapper userMapper;
    4. @Autowired
    5. private AccountMapper accountMapper;
    6. @Transactional(rollbackFor = Throwable.class)
    7. public void insert() {
    8. User user = new User();
    9. user.setName("张三");
    10. userMapper.insert(user);
    11. // 模拟异常
    12. // int i = 1 / 0;
    13. Account account = new Account();
    14. account.setUserId(user.getId());
    15. account.setMoney(123456789);
    16. accountMapper.insert(account);
    17. }
    18. }

    执行启动代码

    1. ApplicationContext context = new ClassPathXmlApplicationContext("spring-atomikos.xml");
    2. JTAService jtaService = context.getBean("jtaService", JTAService.class);
    3. jtaService.insert();

    事务执行流程

    注:依赖spring版本4.3.7.RELEASE

    事务注解被事务拦截器拦截后,调用代理对象的invoke方法org.springframework.transaction.interceptor.TransactionInterceptor#invoke

    核心框架执行org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction

    开启一个JTA事务:com.atomikos.icatch.jta.TransactionManagerImp#begin(int)

     事务初始状态

     底层管理

     

  • 相关阅读:
    【分布式】入门级NCCL多机并行实践 - 02
    前端项目使用钉钉字体
    springboot+mybatis拦截器实现水平分表操作
    将时间序列转成图像——递归图方法 Matlab实现
    案例研究丨神策数据在多项目、多网络场景下使用JumpServer堡垒机
    这 10 种架构师,不合格!
    Linux——进程概念(上)
    搜索多种格式文件工具
    clickhouse在执行alter table update delete等命令后数据没有更新
    FastGPT 手动部署错误:MongooseServerSelectionError: getaddrinfo EAI_AGAIN mongo
  • 原文地址:https://blog.csdn.net/tec_1535/article/details/126867302