该系列参考:
https://www.jianshu.com/p/962271bbf4ea
https://blog.csdn.net/a745233700/article/details/122402303
https://cloud.tencent.com/developer/article/2048776
https://icyfenix.cn/
本地消息事务、RocketMq、最大努力通知都是基于消息队列来实现的解决方案,所以我就放一起来介绍了
将分布式事务拆分成本地事务进行处理,通过在分布式事务主动发起方额外新建事务消息表,事务发起方把业务处理和记录事务消息在本地事务中完成,轮询事务消息表发送事务消息,事务被动方基于消息中间件消费事务消息表中的事务消息

过程分析:
注意事项:
优点: 简单实现,没有过度依赖中间件
缺点:
特点: 柔性事务,数据最终一致性,最大努力交付思想
基于MQ的分布式事务方案本质上是对本地消息表的封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存在了MQ内部,而不是业务数据库中

过程分析:
优点: 业务系统和消息系统解耦,无需在新建消息表以及额外的定时处理,性能比消息表高
缺点: 额外的网络请求开销,要有额外的事务检查接口
注意事项: 要保证幂等性
使用TransactionMQProducer类创建生产者客户端,并指定唯一的producerGroup,可以设置自定义线程池来处理检查请求。本地事务执行后,需要根据执行结果回复MQ
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
“executeLocalTransaction”方法用于在发送半消息成功时执行本地事务。它返回上一节中提到的三个事务状态之一。
“checkLocalTransaction”方法用于检查本地事务状态并响应MQ检查请求。它还返回上一节中提到的三个事务状态之一
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
//这里做事务处理,根据结果返回消息状态 见RokcetMq事务消息状态
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//这里是broker回调查询事务状态,根据事务状态返回对应的消息状态 见RokcetMq事务消息状态
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
最大努力通知也称为定期校对。它在事务主动方增加了消息校对的接口,如果事务被动方没有接收到主动方发送的消息,此时可以调用事务主动方提供的消息校对的接口主动获取

在可靠消息事务中,事务主动方需要将消息发送出去,并且让接收方成功接收消息,这种可靠性发送是由事务主动方保证的;但是最大努力通知,事务主动方仅仅是尽最大努力(重试,轮询…)将事务发送给事务接收方,所以存在事务被动方接收不到消息的情况,此时需要事务被动方主动调用事务主动方的消息校对接口查询业务消息并消费,这种通知的可靠性是由事务被动方保证的。
特点: 由被动方主动调用接口来校对业务情况是最核心的思想