目录
1.3 消费者消息确认(解决消费者接收到消息后未消费就宕机)
几种消息丢失的情况
1. 发送时丢失
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
2. MQ宕机,MQ基于内存,宕机后queue消息丢失
3. consumer接收到消息后未消费就宕机
生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
结果有两种请求:
发送者确认消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack
发送者回执消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
调用 ReturnCallback ,做好后序方案,重发消息或者记录消息
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一的ID,避免ACK冲突
【配置】在publisher这个微服务的application.yml中添加配置:
- spring:
- rabbitmq:
- publisher-confirm-type: correlated
- correlatedpublisher-returns: true
- template:
- mandatory : true
publish-confirm-type:开启publisher-confirm
simple:同步等待confirm结果,直到超时
correlated:异步回调,定义Confirmcallback,MQ返回结果时会回调这个Confirmcallback
publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义Returncallback
template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback; false:则直接丢弃消息
MQ默认是内存存储消息,开启持久化功能可以保证缓存在MQ中的消息不丢失,在SpringAMQP中,交换机,队列,消息都是默认持久化的。
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用api发送ack。
- auto:自动ack,由spring监测l代码是否出现异常,没有异常则返回ack;抛出异常则返回nack(推荐使用)
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除(不推荐使用)
【配置】在消费者这个微服务的application.yml中添加配置:
- spring:
- rabbitmq:
- listener:
- simple:
- prefetch: 1
- acknowledge-mode: none
-
- # none,关闭ack ; manual,手动ack ; auto:自动ack
【缺点】消费者失败重试的弊端
当消费者出现异常后,为了防止消息丢失,默认消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,MQ负载加大。
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
【配置】在消费者这个微服务的application.yml中添加配置:
- spring:
- rabbitmq:
- listener:
- simple:
- prefetch: 1
- retry:
- enabled: true #开启消费者失败重试
- initial-interval:1000#初识的失败等待时长为1秒
- multiplier: 1 #下次失败的等待时长倍数
- max-attempts: 3 #最大本地重试次数
- stateless: true # true无状态; false有状态。如果包含事务,改为false
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式.
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(死信队列)
RepublishMessageRecoverer实现图示
当队列中的消息满足
三个条件之一,就可以成为死信(dead letter)既未被消费的信息。
如果该队列配置了 dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信会投递到这个交换机,这个交换机就是死信交换机
注意:是队列中的消息(消费者失败消息处理策略不同)
如何给队列绑定死信交换机?
- 给队列设置dead-letter-exchange属性,指定一个交换机
- 给队列设置dead-letter-routing-key属性,设置死信交换机和死信队列的routing key
TTL(Time to Live),如果一个队列中消息TTL时间结束后还没有消费,则成为死信
TTL超时分为两种情况:
- 消息所在队列设置了存活时间
- 消息设置了存活时间
当都设置了存活时间,则以最短的为标准
【使用】可以利用TTL完成延迟队列(一段时间后消费者收到消息)
- 给消息的目标队列指定一个死信交换机
- 设置一个消费者监听死信交换机绑定的死信队列
- 给消息设置一个TTL的存活时间
图示如下
延迟队列(Delay Queue)RabbitMQ的一个插件,插件名字为 DelayExchange ,可以在RabbitMQ官网下载完成消息发出后,消费者延迟收到消息的效果
延迟队列的使用场景
【使用】MQ注解的使用方式
- RabbitListener(bindings = cQueueBinding(
- value = cQueue(name = "delay.queue",durable = "true"),
- exchange = @Exchange(name = "delay.direct", delayed = "true" //设置属性为delayed
- key = "delay"
- ))
- public void listenDelayedQueue ( String msg){
- log.info("接收到delay.queue的延迟消息:{",msg);
- }
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。
【思路】解决消息堆积有三种种思路
- 增加更多消费者,提高消费速度
- 在消费者内开启线程池加快消息处理速度
- 扩大队列容积,提高堆积上限
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。
惰性队列的特征如下:
- 接收到消息后 直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。
【使用】MQ注解的使用方式
- @RabbitListener (queuesToDeclare = @Queue(
- name = "lazy.queue",
- durable = "true",
- arguments = @Argument (name = "x-queue . -model", value = "lazy,")
- ))
- public void listenLazyQueue(String msg){
- log. info("接收到lazy.queue的消息: {}", msg);
- }
惰性队列的优点有哪些?
惰性队列的缺点有哪些?
幂等:是一个数学概念,表示N次变换和1次变换的结果相同。在计算机中编程中,一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
A. 设计全局唯一消息id
B.数据库唯一主键
引入Redis解决重复消费问题(完成幂等)
1、利用Redis,首先系统生成全局唯一的 id,用set操作放入Redis中
2、如订单信息id,消费后存储在Redis中,如果下次再来,先查看Redis中是否存在
3、如果存在,即此消息已经被消费过(后续不做消费处理)
4、如果不存在,即未消费,此时再将此id存入Redis中,进行后续的逻辑操作