• RabbitMQ之幂等性问题处理


    目录

    基本介绍

    RabbitMQ幂等性问题

    如何避免消息的重复消费问题?


    基本介绍

    消息消费时的幂等性(消息不被重复消费),同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了;

    幂等性:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响;

    接口幂等性是指:一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具有幂等性的,比如:注册接口;发送短信验证码接口;还有比如同一个订单我支付两次,但是只会扣款一次,第二次支付不会扣款,这说明这个支付接口是具有幂等性的。

    幂等性是分布式系统设计中的一个重要概念,是在做系统或者接口设计时要着重考虑的问题,尤其像支付宝、银行、互联网金融等涉及钱的系统,既要高效,数据也要准确,绝对不能出现多扣款,多打款等问题,幂等性的设计就显得更为重要了。

    RabbitMQ幂等性问题

    在RabbitMQ也回出现这个问题,RabbitMQ把消息发送给消费者消费,消费者消费成功并返回ack消息,但这时候网络中断了,RabbitMQ没有收到ack消息,这就让RabbitMQ误以为消息消费失败了,然后RabbitMQ会重新把该条消息发送给其他的消费者,或者等网络重连后再次发送给该候消费者,这时候就会造成重复消费的问题。

    • 生产者重复向RabbitMQ代理的消息队列发送消息

    消息发送到代理并持久化后,由于网络断开或者客户端崩溃,代理未能回复客户端,导致生产者认为代理没有收到消息而重新发送,结果消费者收到两条具有相同内容和消息ID的消息

    • RabbitMQ代理向消费者重复传递消息

    消息发送给消费者后,由于网络断开等原因,消费者客户端没有向broker返回ACK响应,代理不知道消息是否被消费,为了确保消息至少被消费一次,代理在网络恢复后再次传递消息,结果消费者就收到了两条具有相同内容和消息ID的消息。

    如何避免消息的重复消费问题?

    全局唯一ID + Redis

    生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId, 1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃;

    1. @RabbitListener(queues = "durable-queue")
    2. @RabbitHandler
    3. public void processIde(Message message, Channel channel) throws IOException {
    4. if (stringRedisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(),"1")){
    5. // 业务操作...
    6. System.out.println("消费消息:"+ new String(message.getBody(), "UTF-8"));
    7. // 手动确认
    8. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    9. }
    10. }

     Message

    在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:

    1. MessageProperties // 消息属性

    2. byte[] body // 消息内容

    @RabbitListener

    使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理

    • 消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

    • 消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

      • application/octet-stream:二进制字节数组存储,使用 byte[]
      • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
      • text/plain:文本数据类型存储,使用 String
      • application/json:JSON 格式,使用 Object、相应类型

  • 相关阅读:
    Kubernetes实战(二)-使用Kor过滤Kubernetes未使用资源
    昨天阅读量646
    JS-项目实战-新增水果库存功能实现
    YOLOv8改进Swin Transformer:在基础SwinTransformer结构的基础上进行多种改进结构,集成Transformer和CNN的优势
    mac 下载使用sshpass+命令起别名
    2019年1+X 证书 Web 前端开发中级理论考试——易错题、陌生但又会考到的题目原题+答案
    2342. 数位和相等数对的最大和 : 用「遍历过程」代替「次大维护」
    理解分布式Session处理来看看spring怎么做的
    谦卑篇(Be humble)//伟大是用卑微来换取的,任何时候都应该看清自己
    Apache Hudi Timeline:支持 ACID 事务的基础
  • 原文地址:https://blog.csdn.net/m0_62436868/article/details/133471255