• 浅谈RocketMQ的事务消息


    . 业务场景

    壹哥今天和大家分享一个关于RocketMQ的面试题——“RocketMQ事务消息”。

    在给面试官讲解这个问题之前,你可以先设计一个业务场景,越真实越好,越贴近生产越好,如果没有生产案例,可以直接列举电商中大家都容易懂的业务场景。比如,在分布式场景中用户取消订单,增加用户账户余额。这个业务简单易懂,业务大致流程是两个服务协同完成业务,订单服务取消订单,账户服务新增用户账户余额。场景有了,那接下来咱们就可以跟面试官沟通怎么解决,把思路分析清楚。

    . 解决思路

    1. 思路一

    在分布式场景中,我们可以通过远程调用协议(RPC/HTTP)来完成服务间的远程调用,如果我们使用的是Dubbo、HSF、GRPC 等技术方案,那么使用RPC协议就可以完成服务间远程调用;如果使用的是SpringCloud技术方案,那么可以通过Openfeign(底层http协议)来完成服务间远程调用。具体实现流程如下图:

     在分布式场景中,数据一致性是我们必须要面对的问题。在分布式项目中,保证数据的一致性,我们不能用本地事务的思维去解决,必须采用分布式事务的解决方案。

    在这里壹哥给大家推荐阿里的生产实践方案seata,至于seata具体是怎么实现的,解决原理是什么,我这里就不给大家详细说明了,我们的线下课程中有详细的讲解。当然如果有这方面需求的童鞋,也可以参考壹哥在B站上的免费视频:千锋教育SpringCloud框架入门到精通全套教程 java微服务架构基础 史上最简单全面的微服务开发教程_哔哩哔哩_bilibili

    2. 思路二

    其实思路一并不是在正面回答面试官的问题,只是说了业务的实现方案,我们讲解思路一的目的是在告诉面试官,我知道什么是分布式事务,以及分布式事务应该怎么去解决。而思路二则是正面去回答面试官的问题,比如上面说到的业务场景,我们还可以利用MQ的事务消息来解决。

    那为什么要使用MQ呢? 这是因为MQ有解耦以及异步处理的特性。具体流程如下:

     基于MQ实现分布式事务的大概思路是,当order服务取消订单时,我们可以不用直接去调用账户服务。而是可以通过发送消息,然后账户服务监听消息队列,账户服务收到消息时处理账户业务。这个实现思路其实很简单,但要想落地实现却没那么简单,我们必须要解决取消订单与发送消息的一致性问题。试想一下,如果取消订单失败,结果消息发送成功;或者取消订单成功,但发送消息失败,那么就会导致数据的不一致问题。接下来壹哥就给大家简单复现一下。

    . 问题复现

    我们先来看看如下代码:

    1. @Transactional
    2. public ResultVo back(String ordersn) {
    3. //取消订单
    4. int update = orderMapper.update(ordersn);
    5. //发送消息
    6. SendResult sendResult = rocketMQTemplate.syncSend("order-cancel:cancel", ordersn);
    7. //异常代码
    8. int i = 1/0;
    9. return new ResultVo(true,"success");
    10. }

    如上代码,当运行异常时,本地事务一定会回滚,但消息却发送出去了,数据的一致性就得不到保证了。那该怎么解决呢?这时我们就可以采用事务消息了,事务消息就能解决这个问题!

    接下来我们就可以跟面试官讲解事务消息的具体实现过程,注意壹哥这里用的MQ是RocketMQ,大家也可以采用其他MQ产品,只要支持事务消息就可以。

    . 实现原理

    1. 原理分析

    在展示具体实现代码之前,壹哥先把MQ实现分布式事务的原理讲解一下。

     根据上图,壹哥给大家梳理了RocketMQ实现分布式事务的流程原理,如下:

    1. 将发送消息的功能从业务中剥离出来。业务的第一步不是取消订单操作,而是发送消息,当然这里发送消息只能是”半消息“,”半消息“的特点是消费者不能消费;

    2. 当broker收到”半消息“时会应答生产者,生产者收到成功应答后再执行本地事务;

    3. 执行本地事务也就是取消订单;

    4. 本地事务如果处理成功则提交消息。提交消息后,消息就可以入队,消费者就可以消费消息。当然如果本地事务失败,那么就回滚消息,消息就会被删除,消费者也收不到消息;

    5. 如果不确定本地事务是否成功,我们可以进行本地事务回查;

    6. 回查本地事务;

    7. 回查可以直接查询数据库,检查本地事务是否成功,如果成功提交消息,否则回滚消息。

    以上的实现原理很巧妙,思路也很简单,具体实现代码大家可以参考如下。如果你不理解代码的意思,可以观看壹哥的免费视频:

    RocketMQ_哔哩哔哩_bilibili

    2. 实现代码

    我们把上图中展现的功能,在如下代码中进行实现:

    1. @Transactional(rollbackFor = Exception.class)
    2. public void sendTransactionMQ() throws Exception {
    3. //消息
    4. Message message = new Message();
    5. message.setTopic("tuihuo");
    6. message.setKeys(orderId);
    7. message.setBody(orderId.getBytes());
    8. //生产者
    9. TransactionMQProducer producer = new TransactionMQProducer("TransactionMQProducer");
    10. producer.setNamesrvAddr("192.168.73.130:9876");
    11. //注册事务消息监听器
    12. producer.setTransactionListener(new TransactionListenerImpl);
    13. //发送消息
    14. TransactionSendResult sendResult = producer.sendMessageInTransaction(message,orderId);
    15. }

    3. 事务消息监听器

    另外RocketMQ的事务消息是基于两阶段提交方案来实现的,也就是说消息会有两个状态,prepared和commited。当消息执行完send方法后,进入到prepared状态,然后就会执行executeLocalTransaction方法,该方法的返回值有3个,这些返回值决定着该消息的命运。

    • COMMIT_MESSAGE:提交消息。表示该消息由prepared状态进入到commited状态,消费者可以消费这个消息;

    • ROLLBACK_MESSAGE:回滚。表示该消息将被删除,消费者不能消费这个消息;

    • UNKNOW:未知。这个状态有点意思,如果返回这个状态,则表示该消息既不提交,也不回滚,还是保持prepared状态。而最终决定这个消息命运的,是checkLocalTransaction这个方法。

    事务监听器的实现代码如下:

    1. public class TransactionListenerImpl implements TransactionListener {
    2. @Autowired
    3. private TermMapper termMapper;
    4. //When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
    5. @Override
    6. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    7. //本地事务
    8. Result res = orderService.cancelOrder(msg.getBody())
    9. try {
    10. if (res.isSuccess()) {
    11. // 本地事务执行成功,提交半消息
    12. System.out.println("本地事务执行成功,提交事务事务消息");
    13. return LocalTransactionState.COMMIT_MESSAGE;
    14. } else {
    15. // 本地事务执行成功,回滚半消息
    16. System.out.println("本地事务执行失败,回滚事务消息");
    17. return LocalTransactionState.ROLLBACK_MESSAGE;
    18. }
    19. } catch (Exception e) {
    20. // 异常情况返回未知状态
    21. return LocalTransactionState.UNKNOW;
    22. }
    23. }
    24. //When no response to prepare(half) message. broker will send check message to check the transaction status, and this method will be invoked to get local transaction status
    25. @Override
    26. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    27. String orderId = msg.getKeys();
    28. //检查订单删了没有,如果已经删除,那么消息可以被消费者消费
    29. Order order =orderMapper.selectById(orderId);
    30. if (order == null) {
    31. return LocalTransactionState.COMMIT_MESSAGE;
    32. }
    33. //如果还没删除,数据库操作一定回滚了,那么消息也要回滚,不让消费
    34. return LocalTransactionState.ROLLBACK_MESSAGE;
    35. }
    36. }

    . 注意事项

    我们在使用RocketMQ的时候,要注意以下两个配置参数。

    • 事务消息最大反查次数:transactionCheckMax=N

    • 事务消息检查间隔时间:默认为 60s transactionCheckInterval=5000

    好啦,关于RocketMQ的事务消息,今天就给大家分享到这里啦。

  • 相关阅读:
    一些常见的测度
    Java实现微信支付功能
    EF Core: 使用AsNoTracking减少内存调用 / 实体跟踪的技巧
    彻底解决 IDC Incast
    力扣每日一题47:全排列 ||
    systemui状态栏添加新图标
    无痛卸载流氓杀毒软件Avast
    Powershell命令行设置代理
    for...of与for...in
    技术分享 | 接口自动化测试如何搞定 json 响应断言?
  • 原文地址:https://blog.csdn.net/syc000666/article/details/125618724