• 【RabbitMQ】消息队列需要解决的几个问题


    目录

    一:如何保证消息可靠性

    1.1 生产者消息确认(解决发送时消息丢失)

    1.2 消息持久化(解决MQ宕机)

    1.3 消费者消息确认(解决消费者接收到消息后未消费就宕机)

    失败重试机制

    消费者失败消息处理策略

    二:如何完成消息的延迟接收

    2.1 什么是死信?

    2.2 死信交换机

    2.3 TTL(存活时间)

    2.4 延迟队列

    三:如何防止消息堆积

     3.1 消息堆积问题

    3.2 惰性队列

    四:如何防止消息重复发送

    4.1 幂等

    4.2 消息重复的原因

     4.3 解决方案


    一:如何保证消息可靠性

    几种消息丢失的情况

    1. 发送时丢失

    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue

    2. MQ宕机,MQ基于内存,宕机后queue消息丢失

    3. consumer接收到消息后未消费就宕机

    1.1 生产者消息确认(解决发送时消息丢失)

    生产者确认机制
    RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

    结果有两种请求:

    • publisher-confirm,

                    发送者确认消息成功投递到交换机,返回ack

                    消息未投递到交换机,返回nack

    • publisher-return,

                    发送者回执消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

                    调用 ReturnCallback ,做好后序方案,重发消息或者记录消息


     

     注意:确认机制发送消息时,需要给每个消息设置一个全局唯一的ID,避免ACK冲突


    【配置】在publisher这个微服务的application.yml中添加配置:

    1. spring:
    2.         rabbitmq:
    3.                 publisher-confirm-type: correlated
    4.                 correlatedpublisher-returns: true
    5.                 template:
    6.                         mandatory : true

    publish-confirm-type:开启publisher-confirm
        simple:同步等待confirm结果,直到超时
        correlated:异步回调,定义Confirmcallback,MQ返回结果时会回调这个Confirmcallback
    publish-returns:开启publish-return功能,

            同样是基于callback机制,不过是定义Returncallback
    template.mandatory:定义消息路由失败时的策略。

            true,则调用ReturnCallback; false:则直接丢弃消息


    1.2 消息持久化(解决MQ宕机)

    MQ默认是内存存储消息,开启持久化功能可以保证缓存在MQ中的消息不丢失,在SpringAMQP中,交换机,队列,消息都是默认持久化的。


    1.3 消费者消息确认(解决消费者接收到消息后未消费就宕机)

    RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

    • manual:手动ack,需要在业务代码结束后,调用api发送ack。
    • auto:自动ack,由spring监测l代码是否出现异常,没有异常则返回ack;抛出异常则返回nack(推荐使用)
    • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除(不推荐使用)

    【配置】在消费者这个微服务的application.yml中添加配置:

    1. spring:
    2.         rabbitmq:
    3.                 listener:
    4.                         simple:
    5.                                 prefetch: 1
    6.                                 acknowledge-mode: none
    7. # none,关闭ack ; manual,手动ack ; auto:自动ack

    失败重试机制

    【缺点】消费者失败重试的弊端
    当消费者出现异常后,为了防止消息丢失,默认消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,MQ负载加大。
     

    我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

    【配置】在消费者这个微服务的application.yml中添加配置:

    1. spring:
    2.         rabbitmq:
    3.                 listener:
    4.                         simple:
    5.                                 prefetch: 1
    6.                                 retry:
    7.                                         enabled: true #开启消费者失败重试
    8.                                         initial-interval:1000#初识的失败等待时长为1
    9.                                         multiplier: 1 #下次失败的等待时长倍数 
    10.                                         max-attempts: 3 #最大本地重试次数
    11.                                         stateless: true # true无状态; false有状态。如果包含事务,改为false

    消费者失败消息处理策略

    在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

    1. RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式.
    2. ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
    3. RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(死信队列)

    RepublishMessageRecoverer实现图示


    二:如何完成消息的延迟接收

    2.1 什么是死信?

    当队列中的消息满足

    1.         消费者消费失败
    2.         消息是一个过期消息,超时无人消费
    3.         投递的队列中消息满了,最早的消息被丢弃

    三个条件之一,就可以成为死信(dead letter)既未被消费的信息。

    2.2 死信交换机

    如果该队列配置了 dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信会投递到这个交换机,这个交换机就是死信交换机

     注意:是队列中的消息(消费者失败消息处理策略不同)

    如何给队列绑定死信交换机?

    1. 给队列设置dead-letter-exchange属性,指定一个交换机
    2. 给队列设置dead-letter-routing-key属性,设置死信交换机和死信队列的routing key

    2.3 TTL(存活时间)

    TTL(Time to Live),如果一个队列中消息TTL时间结束后还没有消费,则成为死信

    TTL超时分为两种情况:

    1. 消息所在队列设置了存活时间
    2. 消息设置了存活时间

    当都设置了存活时间,则以最短的为标准

    【使用】可以利用TTL完成延迟队列(一段时间后消费者收到消息)

    1. 给消息的目标队列指定一个死信交换机
    2. 设置一个消费者监听死信交换机绑定的死信队列
    3. 给消息设置一个TTL的存活时间

    图示如下

    2.4 延迟队列

    延迟队列(Delay Queue)RabbitMQ的一个插件,插件名字为 DelayExchange ,可以在RabbitMQ官网下载完成消息发出后,消费者延迟收到消息的效果

    延迟队列的使用场景

    • 延迟发送短信
    • 用户下单,15分钟未支付取消
    • 20分钟后通知消息等

    【使用】MQ注解的使用方式

    1. RabbitListener(bindings = cQueueBinding(
    2. value = cQueue(name = "delay.queue",durable = "true"),
    3. exchange = @Exchange(name = "delay.direct", delayed = "true" //设置属性为delayed
    4. key = "delay"
    5. ))
    6. public void listenDelayedQueue ( String msg){
    7. log.info("接收到delay.queue的延迟消息:{",msg);
    8. }

    三:如何防止消息堆积

     3.1 消息堆积问题

    生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。

    【思路】解决消息堆积有三种种思路

    • 增加更多消费者,提高消费速度
    • 在消费者内开启线程池加快消息处理速度
    • 扩大队列容积,提高堆积上限

    3.2 惰性队列

    从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。
    惰性队列的特征如下:

    • 接收到消息后 直接存入磁盘而非内存
    • 消费者要消费消息时才会从磁盘中读取并加载到内存
    • 支持数百万条的消息存储

    设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。

    【使用】MQ注解的使用方式

    1. @RabbitListener (queuesToDeclare = @Queue(
    2. name = "lazy.queue",
    3. durable = "true",
    4. arguments = @Argument (name = "x-queue . -model", value = "lazy,")
    5. ))
    6. public void listenLazyQueue(String msg){
    7. log. info("接收到lazy.queue的消息: {}", msg);
    8. }

    惰性队列的优点有哪些?

    • 基于磁盘存储,消息上限高
    • 没有间歇性的page-out, 性能比较稳定

    惰性队列的缺点有哪些?

    • 基于磁盘存储,消息时效性会降低
    • 性能受限于 磁盘的I0

    四:如何防止消息重复发送

    4.1 幂等

    幂等:是一个数学概念,表示N次变换和1次变换的结果相同。在计算机中编程中,一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同

    4.2 消息重复的原因

    • 网络波动, 可能会引起重复请求
    • 页面重复刷新
    • 浏览器重复的HTTP请求
    • 定时任务重复执行
    • 用户双击提交按钮
    • 数据的重复推送等等

     4.3 解决方案

    A. 设计全局唯一消息id

    1. 推送过来的数据我们在业务逻辑层处理 主键id设计为 案件id+嫌疑人id,插入到数据库
    2. 后续推送的数据,我们先根据主键id 查询数据库中是否存在,如果存在则过滤掉此数据,如果不存在则插入数据库

    B.数据库唯一主键

    1. 数据库DB层面,主键也是不能冲突的,重复的数据是无法插入的

    引入Redis解决重复消费问题(完成幂等)

    1、利用Redis,首先系统生成全局唯一的 id,用set操作放入Redis中
    2、如订单信息id,消费后存储在Redis中,如果下次再来,先查看Redis中是否存在
    3、如果存在,即此消息已经被消费过(后续不做消费处理)
    4、如果不存在,即未消费,此时再将此id存入Redis中,进行后续的逻辑操作


     

     

  • 相关阅读:
    [ansible]playbook结合项目解释执行步骤
    springboot run方法执行流程分析
    conda常用命令
    基于内存的分布式NoSQL数据库Redis(三)常用命令
    【C语言实现内核链表】
    10分钟SkyWalking与SpringBoot融合并整合到Linux中
    linux驱动调试之printk
    【0230】PG内核底层事务(transaction)实现原理之基础篇
    UE4 粒子特效基础学习 (03-制作上升光线特效)
    K8s中集成Heketi使用Glusterfs
  • 原文地址:https://blog.csdn.net/m0_46628950/article/details/126235831