RocketMQ有特有的事物消息机制,事务消息是其它所有消息中间件所不具备的。
RocketMQ提供事务消息,通过事务消息就能达到分布式事务的最终一致性。
使用的rocketmq-spring-boot-starter版本
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-spring-boot-starterartifactId>
<version>2.2.2version>
dependency>
springboot版本2.3.2.RELEASE
springcloud版本Hoxton.SR8
springcloud-alibaba版本2.2.5.RELEASE
本章代码已分享至Gitee: https://gitee.com/lengcz/springcloudalibaba01.git
事务消息可以分为两个流程: 正常事务消息的发送及提交流程、事务消息的补偿流程,分别解析如下:
(1)正常的事务消息的发送及提交流程
(2)事务消息的补偿流程
package com.lcz.pojo;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.Id;
import java.util.Date;
//消息事务状态记录
@Entity(name="tx_log")
@Data
public class TxLog {
@Id
private String txId;
private Date date;
}
import com.lcz.dao.OrderDao;
import com.lcz.dao.TxLogDao;
import com.lcz.pojo.Order;
import com.lcz.pojo.TxLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.UUID;
@Service
@Slf4j
public class OrderServiceImpl4 {
@Autowired
private OrderDao orderDao;
@Autowired
private TxLogDao txLogDao;
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void createOrderBefore(Order order) {
String uuid = UUID.randomUUID().toString();//设置唯一的事务id
log.info("创建订单:"+uuid+",发送半事务消息");
//发送半消息
rocketMQTemplate.sendMessageInTransaction(
"tx_topic",
MessageBuilder.withPayload(order).setHeader("txId",uuid).build(),
order);
}
@Transactional //本地事务,要么同时成功,要么同时失败
public void createOrder(String txId,Order order) {
log.info("创建订单:"+txId+",创建成功");
//保存订单
orderDao.save(order);
TxLog txLog = new TxLog();
txLog.setTxId(txId);
txLog.setDate(new Date());
//记录事务日志
this.txLogDao.save(txLog);
}
}
import com.lcz.dao.TxLogDao;
import com.lcz.pojo.Order;
import com.lcz.pojo.TxLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@Service
@RocketMQTransactionListener
@Slf4j
public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {
@Autowired
private OrderServiceImpl4 orderServiceImpl4;
@Autowired
private TxLogDao txLogDao;
//执行本地事务
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
//第一个参数message对应的消息,第二个参数对应的arg
String txId = (String)message.getHeaders().get("txId");
try{
log.info("执行事务,txId:"+txId);
Order order=(Order) o;
orderServiceImpl4.createOrder(txId,order);
log.info("执行事务成功,txId:"+txId);
return RocketMQLocalTransactionState.COMMIT; //本地事务成功
}catch (Exception e){
log.info("执行事务失败,txId:"+txId);
return RocketMQLocalTransactionState.ROLLBACK; //本地事务失败
}
}
//消息回查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String txId = (String)message.getHeaders().get("txId");
log.info("事务回查,txId:"+txId);
TxLog txLog= txLogDao.findById(txId).get();
if(null!=txLog){
//本地事务成功了
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Autowired
private OrderServiceImpl4 orderService;//用的这个service
@Autowired
private ProductService productService;
@GetMapping("/order/prod/{pid}")
public Order order(@PathVariable Integer pid) {
log.info("调用商品服务,调用商品微服务查询此商品");
/**
* Ribbon负载均衡
*/
Product product = productService.findById(pid);
if(product.getPid()<1){
Order order=new Order();
order.setOid(-1L);
order.setPname("下单失败");
return order;
}
log.info("查询到的商品内容:" + JSON.toJSONString(product));
Order order = new Order();
order.setUid(1);
order.setUsername("测试");
order.setPid(pid);
order.setPrice(product.getPrice());
order.setPname(product.getPname());
order.setNumber(1);
orderService.createOrderBefore(order);
log.info("用户下单成功,订单信息为:" + JSON.toJSON(order));
return order;
}
上面我们的事务是成功的,我们通过debug模式,在执行事务和消息回查添加断点,发送下单请求,当请求进入执行事务时,直接杀死线程(模拟宕机),然后重启应用,将会进行消息回查。
解释:
txId 事务id,因为我们在创建订单时,使用@Transactional 本地事务,所以当创建订单成功时,事务表里也会有这条对应的txId,如果查不到这个txId,则表示事务是失败的,需要回滚,半事务消息就不会被投递。