

目录
可靠消息最终一致性分布式事务实战_Docker安装 RocketMQ

RocketMQ是阿里巴巴开源的一款支持事务消息的消息中间件,于 2012年正式开源,2017年成为Apache基金会的顶级项目。
实现原理
RocketMQ 4.3版之后引入了完整的事务消息机制,其内部实现了完 整的本地消息表逻辑,使用RocketMQ实现可靠消息分布式事务就 不用用户再实现本地消息表的逻辑了,极大地减轻了开发工作量。



业务介绍
通过RocketMQ中间件实现可靠消息最终一致性分布式事务,模拟 商城业务中的下单扣减库存场景。订单微服务和库存微服务分别独立开发和部署。


流程

架构选型

orders订单数据表
orders数据表存储于tx-msg-orders订单数据库。

- DROP TABLE IF EXISTS `orders`;
- CREATE TABLE `order` (
- `id` bigint(20) NOT NULL COMMENT '主键',
- `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
- `order_no` varchar(64) CHARACTER SET utf8
- COLLATE utf8_bin NULL DEFAULT NULL COMMENT '订单
- 编号',
- `product_id` bigint(20) NULL DEFAULT NULL COMMENT '商品id',
- `pay_count` int(11) NULL DEFAULT NULL COMMENT '购买数量',
- PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
- = utf8_bin ROW_FORMAT = Dynamic;
- SET FOREIGN_KEY_CHECKS = 1;
- CREATE TABLE `tx_log` (
- `tx_no` varchar(64) CHARACTER SET utf8
- COLLATE utf8_bin NOT NULL COMMENT '分布式事务全局序列号',
- `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
- PRIMARY KEY (`tx_no`) USING BTREE
- ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
stock库存数据表

- DROP TABLE IF EXISTS `stock`;
- CREATE TABLE `stock` (
- `id` bigint(20) NOT NULL COMMENT '主键id',
- `product_id` bigint(20) NULL DEFAULT NULL COMMENT '商品id',
- `total_count` int(11) NULL DEFAULT NULL COMMENT '商品总库存',
- PRIMARY KEY (`id`) USING BTREE
- ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
- -- ----------------------------
- -- Table structure for tx_log
- -- ----------------------------
- DROP TABLE IF EXISTS `tx_log`;
- CREATE TABLE `tx_log` (
- `tx_no` varchar(64) CHARACTER SET utf8
- COLLATE utf8_bin NOT NULL COMMENT '分布式事务全局序列号',
- `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
- PRIMARY KEY (`tx_no`) USING BTREE
- ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
- SET FOREIGN_KEY_CHECKS = 1;
tx_log事务记录表


在安装RocketMQ之前,我们先了解一下RocketMQ的部署架构,了 解一下RocketMQ的组件,然后基于当前主流的Docker安装 RocketMQ,我们这里安装单台RocketMQ,但为了防止单节点故 障、保障高可用,生产环境建议安装RocketMQ集群。


安装NameServer
拉取镜像
docker pull rocketmqinc/rocketmq
创建数据存储目录
- mkdir -p /docker/rocketmq/data/namesrv/logs
- /docker/rocketmq/data/namesrv/store
启动NameServer
- docker run -d \
- --restart=always \
- --name rmqnamesrv \
- -p 9876:9876 \
- -v /docker/rocketmq/data/namesrv/logs:/root/logs \
- -v /docker/rocketmq/data/namesrv/store:/root/store \
- -e "MAX_POSSIBLE_HEAP=100000000" \
- rocketmqinc/rocketmq \
- sh mqnamesrv

安装Broker
border配置:创建 broker.conf 配置文件
vim /docker/rocketmq/conf/broker.conf
- # 所属集群名称,如果节点较多可以配置多个
- brokerClusterName = DefaultCluster
- #broker名称,master和slave使用相同的名称,表明他们的
- 主从关系
- brokerName = broker-a
- #0表示Master,大于0表示不同的
- slave brokerId = 0
- #表示几点做消息删除动作,默认是凌晨4点
- deleteWhen = 04
- #在磁盘上保留消息的时长,单位是小时
- fileReservedTime = 48
- #有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和
- 异步表示Master和Slave之间同步数据的机 制;
- brokerRole = ASYNC_MASTER
- #刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷
- 盘和异步刷盘;SYNC_FLUSH消息写入磁盘后 才返回成功状
- 态,ASYNC_FLUSH不需要;
- flushDiskType = ASYNC_FLUSH
- # 设置broker节点所在服务器的ip地址
- brokerIP1 = 192.168.66.100
- #剩余磁盘比例
- diskMaxUsedSpaceRatio=99
启动broker
- docker run -d --restart=always --name rmqbroker
- --link rmqnamesrv:namesrv -p 10911:10911 -p
- 10909:10909 --privileged=true -v
- /docker/rocketmq/data/broker/logs:/root/logs -v
- /docker/rocketmq/data/broker/store:/root/store
- -v
- /docker/rocketmq/conf/broker.conf:/opt/rocketmq
- -4.4.0/conf/broker.conf -e
- "NAMESRV_ADDR=namesrv:9876" -e
- "MAX_POSSIBLE_HEAP=200000000"
- rocketmqinc/rocketmq sh mqbroker -c
- /opt/rocketmq-4.4.0/conf/broker.conf

报错:


RocketMQ提供了UI管理工具,名为rocketmq-console,我们选择 docker安装
- #创建并启动容器
- docker run -d --restart=always --name rmqadmin
- -e "JAVA_OPTS=-
- Drocketmq.namesrv.addr=192.168.66.100:9876 -
- Dcom.rocketmq.sendMessageWithVIPChannel=false"
- -p 8080:8080 pangliang/rocketmq-console-ng
关闭防火墙(或者开放端口)
- #关闭防火墙
- systemctl stop firewalld.service
- #禁止开机启动
- systemctl disable firewalld.service
测试
访问:http://192.168.66.101:8080/#/ (可以切换中文)

创建父工程rocketmq-msg

创建订单微服务子工程

引入依赖
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starterwebartifactId>
- dependency>
- <dependency>
- <groupId>mysqlgroupId>
- <artifactId>mysql-connectorjavaartifactId>
- dependency>
- <dependency>
- <groupId>org.apache.rocketmqgroupId>
- <artifactId>rocketmq-spring-bootstarterartifactId>
- <version>2.0.2version>
- dependency>
- <dependency>
- <groupId>com.baomidougroupId>
- <artifactId>mybatis-plus-bootstarterartifactId>
- dependency>
- <dependency>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- dependency>
- dependencies>
编写配置文件
- server:
- port: 9090
- spring:
- application:
- name: tx-msg-stock
- datasource:
- url: jdbc:mysql://192.168.66.100:3306/txmsg-order?
- useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
- t=true&failOverReadOnly=false&useSSL=false
- username: root
- password: 123456
- driver-class-name: com.mysql.cj.jdbc.Driver
- ################ RocketMQ 配置 ##########
- rocketmq:
- name-server: 192.168.66.100:9876
- producer:
- group: order-group
编写主启动类
- /**
- * 订单微服务启动成功
- */
- @Slf4j
- @MapperScan("com.tong.order.mapper")
- @SpringBootApplication
- public class OrderMain9090 {
- public static void main(String[] args) {
- SpringApplication.run(OrderMain9090.class,args);
- log.info("************* 订单微服务启动成功*******");
- }
- }
代码生成
- package com.tong.utils;
- import com.baomidou.mybatisplus.generator.FastAutoGenerator;
- import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;
- import java.util.Arrays;
- import java.util.List;
- public class CodeGenerator {
- public static void main(String[] args) {
- FastAutoGenerator.create("jdbc:mysql://192.168.66.102:3306/tx-msg-order", "root", "123456")
- .globalConfig(builder -> {
- builder.author("tong")// 设置作者
- .commentDate("MMdd") // 注释日期格式
- .outputDir(System.getProperty("user.dir")+"/rocketmq-msg/orders"+ "/src/main/java/")
- .fileOverride(); //覆盖文件
- })
- // 包配置
- .packageConfig(builder -> {
- builder.parent("com.tong.orders") // 包名前缀
- .entity("entity")//实体类包名
- .mapper("mapper")//mapper接口包名
- .service("service"); //service包名
- })
- .strategyConfig(builder -> {
- // 设置需要生成的表名
- builder.addInclude(Arrays.asList("orders","tx_log"))
- // 开始实体类配置
- .entityBuilder()
- // 开启lombok模型
- .enableLombok()
- //表名下划线转驼峰
- .naming(NamingStrategy.underline_to_camel)
- //列名下划线转驼峰
- .columnNaming(NamingStrategy.underline_to_camel);
- })
- .execute();
- }
- }
创建TxMessage类
在项目的com.itbaizhan.orders.tx包下创建TxMessage类,主要用 来封装实现分布式事务时,在订单微服务、RocketMQ消息中间件 和库存微服务之间传递的全局事务消息,项目中会通过事务消息实现幂等。
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- public class TxMessage implements Serializable
- {
- private static final long serialVersionUID = -4704980150056885074L;
- /**
- * 商品id
- */
- private Long productId;
- /**
- * 商品购买数量
- */
- private Integer payCount;
- /**
- * 全局事务编号
- */
- private String txNo;
- }

业务逻辑层主要实现了用户提交订单后的业务逻辑。
编写OrderService接口
- /**
- * 添加订单
- * @param productId 商品id
- * @param payCount 购买数量
- */
- void save(Long productId,Integer payCount);
- /**
- * 提交订单同时保存事务信息
- */
- void submitOrderAndSaveTxNo(TxMessage txMessage);
- /**
- * 提交订单
- * @param productId 商品id
- * @param payCount 购买数量
- */
- void submitOrder(Long productId, Integer payCount);
编写OrderService接口实现
- package com.itbaizhan.order.service.impl;
- import com.alibaba.fastjson.JSONObject;
- import com.tong.order.entity.Order;
- import com.tong.order.entity.TxLog;
- import com.tong.order.mapper.OrderMapper;
- import com.tong.order.mapper.TxLogMapper;
- import com.tong.order.service.IOrderService;
- import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
- import com.tong.order.tx.TxMessage;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.spring.core.RocketMQTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
- import javax.annotation.Resource;
- import java.time.LocalDateTime;
- import java.util.Date;
- import java.util.UUID;
- /**
- *
- * 服务实现类
- *
- *
- * @author tong
- * @since 05-20
- */
- @Slf4j
- @Service
- public class OrderServiceImpl extends
- ServiceImpl
implements IOrderService { - @Resource
- RocketMQTemplate rocketMQTemplate;
- @Resource
- private TxLogMapper txLogMapper;
- /**
- * 添加
- * @param productId 商品id
- * @param payCount 购买数量
- */
- @Override
- public void save(Long productId, Integer payCount) {
- Order order = new Order();
- // 订单创建时间
- order.setCreateTime(LocalDateTime.now());
- // 生产订单编号
- order.setOrderNo(UUID.randomUUID().toString().replace("-",""));
- // 商品id
- order.setProductId(productId);
- // 购买数量
- order.setPayCount(payCount);
- baseMapper.insert(order);
- }
- @Override
- @Transactional(rollbackFor = Exception.class)
- public void submitOrderAndSaveTxNo(TxMessage txMessage) {
- TxLog txLog = txLogMapper.selectById(txMessage.getTxNo());
- if(txLog != null){
- log.info("订单微服务已经执行过事务,商品id为:{},事务编号为:{}",txMessage.getProductId(),txMessage.getTxNo());
- return;
- }
- //生成订单
- this.save(txMessage.getProductId(),txMessage.getPayCount());
- //生成订单
- txLog = new TxLog();
- txLog.setTxNo(txMessage.getTxNo());
- txLog.setCreateTime(LocalDateTime.now());
- //添加事务日志
- txLogMapper.insert(txLog);
- }
- /**
- * 提交订单
- * @param productId 商品id
- * @param payCount 购买数量
- */
- @Override
- public void submitOrder(Long productId,Integer payCount) {
- //生成全局分布式序列号
- String txNo = UUID.randomUUID().toString();
- TxMessage txMessage = new TxMessage(productId, payCount, txNo);
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("txMessage", txMessage);
- Message
message = MessageBuilder.withPayload(jsonObject.toJSONString()).build(); - //发送事务消息 且该消息不允许消费
- tx_order_group: 指定版事务消息组
- rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg", message, null);
- }
- }
执行本地的业务代码
- package com.tong.order.message;
- import com.alibaba.fastjson.JSONObject;
- import com.tong.order.entity.TxLog;
- import com.tong.order.mapper.TxLogMapper;
- import com.tong.order.service.IOrderService;
- import com.tong.order.service.ITxLogService;
- import com.tong.order.tx.TxMessage;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
- import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
- import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.messaging.Message;
- import org.springframework.stereotype.Component;
- import org.springframework.transaction.annotation.Transactional;
- import javax.annotation.Resource;
- /**
- * @author tong
- * @version 1.0.0
- * @description 监听事务消息
- */
- @Slf4j
- @Component
- @RocketMQTransactionListener(txProducerGroup = "tx_order_group")
- public class OrderTxMessageListener implements
- RocketMQLocalTransactionListener {
- @Autowired
- private IOrderService orderService;
- @Resource
- private TxLogMapper txLogMapper;
- /**
- * RocketMQ的Producer本地事务:先执行本地的业务代码(使用Spring的事件管理),判断是否成功。
- * 成功返回: RocketMQLocalTransactionState.COMMIT,
- * 失败返回:RocketMQLocalTransactionState.ROLLBACK
- */
- @Override
- @Transactional(rollbackFor = Exception.class)
- public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj)
- {
- try {
- log.info("订单微服务执行本地事务");
- TxMessage txMessage = this.getTxMessage(msg);
- //执行本地事务
- orderService.submitOrderAndSaveTxNo(txMessage);
- //提交事务
- log.info("订单微服务提交事务");
- // COMMIT:即生产者通知Rocket该消息可以消费
- return RocketMQLocalTransactionState.COMMIT;
- } catch (Exception e) {
- e.printStackTrace();
- //异常回滚事务
- log.info("订单微服务回滚事务");
- // ROLLBACK:即生产者通知Rocket将该消息删除
- return RocketMQLocalTransactionState.ROLLBACK;
- }
- }
- private TxMessage getTxMessage(Message msg)
- {
- String messageString = new String((byte[]) msg.getPayload());
- JSONObject jsonObject = JSONObject.parseObject(messageString);
- String txStr = jsonObject.getString("txMessage");
- return JSONObject.parseObject(txStr,TxMessage.class);
- }
- }
网络异常消息处理
- /**
- * 因为网络异常或其他原因时,RocketMQ的消息状态处于UNKNOWN时,调用该方法检查Producer的本地
- * 事务是否已经执行成功,
- * 成功返回: RocketMQLocalTransactionState.COMMIT,
- * 失败返回:RocketMQLocalTransactionState.ROLLBACK
- */
- @Override
- public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
- log.info("订单微服务查询本地事务");
- TxMessage txMessage = this.getTxMessage(msg);
- // 获取订单的消息
- Integer exists = txLogService.isExistsTx(txMessage.getTxNo());
- if (exists != null) {
- // COMMIT:即生产者通知Rocket该消息可以消费
- return RocketMQLocalTransactionState.COMMIT;
- }
- // UNKNOWN:即生产者通知Rocket继续查询该消息的状态
- return RocketMQLocalTransactionState.UNKNOWN;
- }
- private TxMessage getTxMessage(Message msg)
- {
- String messageString = new String((byte[]) msg.getPayload());
- JSONObject jsonObject = JSONObject.parseObject(messageString);
- String txStr = jsonObject.getString("txMessage");
- return JSONObject.parseObject(txStr,TxMessage.class);
- }
创建库存微服务tx-msg-stock

引入依赖
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starterwebartifactId>
- dependency>
- <dependency>
- <groupId>mysqlgroupId>
- <artifactId>mysql-connectorjavaartifactId>
- dependency>
- <dependency>
- <groupId>org.apache.rocketmqgroupId>
- <artifactId>rocketmq-spring-bootstarterartifactId>
- <version>2.0.1version>
- dependency>
- <dependency>
- <groupId>com.baomidougroupId>
- <artifactId>mybatis-plus-bootstarterartifactId>
- dependency>
- <dependency>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- dependency>
- dependencies>
编写配置文件
- server:
- port: 6060
- spring:
- application:
- name: tx-msg-stock
- datasource:
- url: jdbc:mysql://192.168.66.100:3306/txmsg-stock?
- useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
- t=true&failOverReadOnly=false&useSSL=false
- username: root
- password01: 123456
- driver-class-name: com.mysql.cj.jdbc.Driver
- ################ RocketMQ 配置 ##########
- rocketmq:
- name-server: 192.168.66.100:9876
编写主启动类
- package com.tong.stock;
- import lombok.extern.slf4j.Slf4j;
- import org.mybatis.spring.annotation.MapperScan;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- /**
- * @author tong
- * @version 1.0.0
- * @description 库存微服务启动类
- */
- @MapperScan("com.tong.stock.mapper")
- @Slf4j
- @SpringBootApplication
- public class StockServerStarter {
- public static void main(String[] args) {
- SpringApplication.run(StockServerStarter.class, args);
- log.info("**************** 库存服务启动成功 ***********");
- }
- }
代码生成
- package com.tong.utils;
- import com.baomidou.mybatisplus.generator.FastAutoGenerator;
- import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;
- import java.util.Arrays;
- import java.util.List;
- public class CodeGenerator {
- public static void main(String[] args) {
- FastAutoGenerator.create("jdbc:mysql://192.168.66.102:3306/tx-msg-stock", "root", "123456")
- .globalConfig(builder -> { builder.author("tong")// 设置作者
- .commentDate("MMdd") // 注释日期格式
- .outputDir(System.getProperty("user.dir") +"/rocketmq-msg/stock"+ "/src/main/java/")
- .fileOverride(); //覆盖文件
- })
- // 包配置
- .packageConfig(builder -> {
- builder.parent("com.tong.stock") // 包名前缀
- .entity("entity")//实体类包名
- .mapper("mapper")//mapper接口包名
- .service("service"); //service包名
- })
- .strategyConfig(builder -> {
- // 设置需要生成的表名
- builder.addInclude(Arrays.asList("stock","tx_log"))
- // 开始实体类配置
- .entityBuilder()
- // 开启lombok模型
- .enableLombok() //表名下划线转驼峰
- .naming(NamingStrategy.underline_to_camel)
- //列名下划线转驼峰
- .columnNaming(NamingStrategy.underline_to_camel);
- })
- .execute();
- }
- }
编写库存接口
- public interface StockService {
- /**
- * 根据id查询库存
- * @param id
- * @return
- */
- Stock getStockById(Long id);
- /**
- * 扣减库存
- */
- void decreaseStock(TxMessage txMessage);
- }