• 【RocketMQ 二十七】RocketMQ 消费幂等


    为了防⽌消息重复消费导致业务处理异常,消息队列 RocketMQ 版的消费者在接收到消息后,有必要根据业务上的唯⼀ Key 对消息做幂等处理。

    什么是消息幂等

    如果有⼀个操作,多次执⾏与⼀次执⾏所产⽣的影响是相同的,我们就称这个操作是幂等的。
    当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费⼀次的结果是相同的,并且多次消费并未对业务系统产⽣任何负⾯影响,那么这整个过程就可实现消息幂等。

    适⽤场景

    在互联⽹应⽤中,尤其在⽹络不稳定的情况下,消息队列 RocketMQ 版的消息有可能会出现重复。如果消息重复会影响您的业务处理,请对消息做幂等处理。

    消息重复的场景如下:

    • 发送时消息重复
      在这里插入图片描述

    当⼀条消息已被成功发送到服务端并完成持久化,此时出现了⽹络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时⽣产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

    • 投递时消息重复
      在这里插入图片描述

    消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候⽹络闪断。为了保证消息⾄少被消费⼀次,消息队列 RocketMQ 版的服务端将在⽹络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID也相同的消息。

    • 负载均衡时消息重复(包括但不限于⽹络抖动、Broker 重启以及消费者应⽤重启)

    当消息队列 RocketMQ 版的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

    实现消息幂等

    定义消息幂等的两要素:

    幂等令牌

    幂等令牌是⽣产者和消费者两者中的既定协议,在业务中通常是具备唯⼀业务标识的字符串,如:
    订单号、流⽔号等。且⼀般由⽣产者端⽣成并传递给消费者端。

    处理唯⼀性的确保

    缓存 唯⼀索引
    可以使用Redis缓存

    RocketMQ如何处理消息幂等

    RocketMQ能够保证消息不丢失但不保证消息不重复。

    如果在RocketMQ中实现消息去重实际也是可以的,但是考虑到⾼可⽤以及⾼性能的需求,如果做了服务端的消息去重,RocketMQ就需要对消息做额外的rehash、排序等操作,这会花费较⼤的时间和空间等资源代价,收益并不明显。
    RocketMQ考虑到正常情况下出现重复消息的概率其实是很⼩的,因此RocketMQ将消息幂等操作交给了业务⽅处理。

    因为 Message ID 有可能出现冲突(重复)的情况,因此不建议通过MessageID作为处理依据,⽽最好的⽅式是以业务唯⼀标识作为幂等处理的关键依据如:订单号、流⽔号等作为幂等处理的关键依据。⽽业务的唯⼀标识可以通过消息 Key 设置。

    以⽀付场景为例,可以将消息的 Key 设置为订单号,作为幂等处理的依据。具体代码示例如下

    Message message = new Message();
    message.setKeys("ORDERID_100");
    SendResult sendResult = producer.send(message);
    
    • 1
    • 2
    • 3

    消费者收到消息时可以根据消息的 Key,即订单号来实现消息幂等:

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    ConsumeConcurrentlyContext
    context) {
    for(MessageExt msg:msgs){
    String key = msg.getKeys();
    // 根据业务唯⼀标识的 Key 做幂等处理
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    消费端常⻅的幂等操作

    1、业务操作之前进⾏状态查询

    消费端开始执⾏业务操作时,通过幂等id⾸先进⾏业务状态的查询,如:修改订单状态环节,当订单状态为成功/失败则不需要再进⾏处理。那么我们只需要在消费逻辑执⾏之前通过订单号进⾏订单状态查询,⼀旦获取到确定的订单状态则对消息进⾏提交,通知broker消息状态为:ConsumeConcurrentlyStatus.CONSUME_SUCCESS 。

    2、唯⼀性约束保证最后⼀道防线

    上述第⼆点操作并不能保证⼀定不出现重复的数据,如:并发插⼊的场景下,如果没有乐观锁、分布式锁作为保证的前提下,很有可能出现数据的重复插⼊操作,因此我们务必要对幂等id添加唯⼀性索引,这样就能够保证在并发场景下也能保证数据的唯⼀性。

    3、引⼊锁机制

    上述的第⼀点中,如果是并发更新的情况,没有使⽤悲观锁、乐观锁、分布式锁等机制的前提下,进⾏更新,很可能会出现多次更新导致状态的不准确。如:对订单状态的更新,业务要求订单只能从初始化->处理中,处理中->成功,处理中->失败,不允许跨状态更新。如果没有锁机制,很可能会将初始化的订单更新为成功,成功订单更新为失败等异常的情况。
    ⾼并发下,建议通过状态机的⽅式定义好业务状态的变迁,通过乐观锁、分布式锁机制保证多次更新的结果是确定的,悲观锁在并发环境不利于业务吞吐量的提⾼因此不建议使⽤。

  • 相关阅读:
    朝气蓬勃 后生可畏
    人工智能与深度神经网络,人工智能人工神经网络
    手机网页,输入时 软键盘盖住输入框完整解决方案,兼容安卓、鸿蒙、苹果IOS
    9. 原型模式
    linux安装opencv
    【222】MyQSL技术内幕 InnoDB 存储引擎第二版 笔记
    linux创建ftp账号
    浅谈HTTP缓存与CDN缓存的那点事
    slam 14讲笔记
    基于.Net Core实现的飞书所有文档一键导出服务(支持多系统)
  • 原文地址:https://blog.csdn.net/qq_33333654/article/details/127423245