目录
在上一篇文章中,我们介绍了如何确保生产者的可靠性,确保消息一定可以到达MQ。
但是MQ自己也是会丢失消息的,比如MQ的突然宕机或者消息过多造成的阻塞,因此我们这篇文章来介绍一下如何确保MQ的可靠性和消费者可靠性
默认情况下,RabbitMQ会把接收到的消息保存在内存中来降低消息收发的延迟,但这样会导致两个问题:
想要保证MQ的可靠性,主要依赖于两种方式:
RabbitMQ实现持久化主要有三个部分:
持久化队列(Durable Queues): 持久化队列指的是队列本身被存储在磁盘上,这样即使RabbitMQ服务器重启,队列也不会丢失。要创建持久化队列,需要在声明队列时设置durable
属性为true
。持久化队列中的消息默认不是持久化的,需要单独设置每条消息为持久化。
持久化消息(Persistent Messages): 持久化消息意味着消息本身被存储在磁盘上,因此即使RabbitMQ服务器重启,消息也不会丢失。在发送消息时,需要设置消息的deliveryMode
属性为2
(即PERSISTENT
),这样RabbitMQ就会将消息存储到磁盘上。
持久化交换机(Durable Exchanges): 持久化交换器与持久化队列类似,指的是交换器被存储在磁盘上,这样服务器重启后交换器依然存在。声明交换器时,也需要设置durable
属性为true
。持久化交换器确保了消息路由的结构在服务器重启后能够保留。
当我们把消息持久化到磁盘中的时候,就避免了消息挤压造成的阻塞。但是当我们把消息持久化到磁盘中的时候,这个时候是不能接收新的消息的。
因此实际上把消息持久化到磁盘中不是一个很好的解决方案,在RabbitMQ的3.6.0版本后,增加了Lazy Queue这个概念。其实就是懒惰队列:
懒惰队列的特性如下:
在3.12版本后,所有的队列都是Lazy Queue模式,无法更改。
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息的结束之后,将会向RabbitMQ发送一个回执,告知自己消息的处理状态 ,消费者一共可以向Rabbitmq回执三种处理状态,分别是:
而消费者确认机制并不用我们手写,Spring AMQP已经为我们实现了消费者消息确认机制,并允许我们通过配置文件的形式选择ACK处理方式,有三种:
当业务异常的时候,根据异常不同返回不同的结果:
Spring AMQP的消费者确认机制默认开启auto。
如果我们把消费者确认机制配置为auto的时候,当消费者出现异常之后,消息会不断的重新入队到队列,再重新发送给消费者。同样的消息再次引发异常,再次入队,再次发送给消费者。
这样会导致MQ的消息处理数量飙升,带来不必要的压力,因此除了把处理失败的消息重新进行处理之外,我们还应该有其他的处理方案。
我们可以利用Spring 的retry机制,在消费者出现异常的时候使用本地重试,而不是无限制的requeue到MQ队列中。
- listener:
- simple:
- prefetch: 1
- retry:
- enabled: true #开启超时重试机制
- initial-interval: 1000ms #失败后的初始等待时间
- multiplier: 1 #失败后下次等待时长倍数,下次等待时长= Initial - interval * multiplier
- max-attempts: 3 #最大重试次数
- stateless: true #true无状态,false有状态,如果业务中包含事务,就改为fasle
当我们开启重试模式之后,如果重试次数耗尽并且消息依然失败,那么就需要有MessageRecoverer接口来处理,这个接口包含三种不同的实现:
如果消息丢失不会对业务产生重大影响,可以选择RejectAndDontRequeueRecoverer
。
如果希望消息有机会被重新处理,可以选择ImmediateRequeueMessageRecoverer
。
而RepublishMessageRecoverer
则适用于需要对失败消息进行进一步分析或记录的情况。
通过上述的各种保证措施,我们基本上可以确保业务至少被执行一次。那对于一些需要保证幂等性的业务,我们要如何保证这个消息只被消费一次呢?
给每一个消息都设置一个唯一ID,用ID区分是否被消费过。当我们消费一个消息的时候,先在数据库中查询是否存在这个数据的ID,如果不存在就消费。如果存在就说明这个消息之前被消费过。
其实他和我们上面黏贴的文章中介绍的token机制的思想很像。而在MQ中也不需要我们手动去设置唯一ID,可以在消费者的消息转换器中开启:
我们可以看一看这个setCreateMessageIds这个方法中是如何构造id的:
这里逻辑就很清晰了,实际上就是在发消息的时候做一个if判断,如果我们设置了构造id,并且当前消息配置类的id为空,我们就为这个消息构造一个id。
但是这种方法也有自己的缺点,也就是对业务逻辑有侵入性,而且还有额外的数据库操作。
在本文中,我们探讨了RabbitMQ作为领先的开源消息代理,如何通过一系列高级特性和策略来确保消息队列和消费者的高度可靠性。
首先,我们讨论了RabbitMQ的持久化机制,它允许消息和队列在服务器重启后依然保持不变。通过将消息标记为持久化,并在队列上启用持久化选项,我们可以确保关键数据不会因系统故障而丢失。
最后,我们介绍了消息确认机制,这是确保消息被成功处理的关键。消费者在处理完消息后发送确认回执,RabbitMQ只有在收到确认后才会从队列中移除消息。如果消费者在处理过程中失败,未确认的消息将重新入队,供其他消费者处理。
如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!