视频推荐:https://www.youtube.com/watch?v=-G0AsrOripo
引入MQ消息中间件,最直接的目的是:做系统解耦和流量控制。最基本的原因还是解决互联网系统的高可用和高性能问题。
带来的问题:
推荐文档:https://learn.lianglianglee.com/专栏/消息队列高手课/01 为什么需要消息队列?.md
通常来说,使用消息队列能为我们的系统带来下面三点好处:
将用户的请求数据存储到消息队列之后就立即返回结果。随后,系统再对消息进行消费。
因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此,使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。
先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。
举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。如下图所示:
使用消息队列还可以降低系统耦合性。我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。还是直接上图吧:
生产者(客户端)发送消息到消息队列中去,接受者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可而不需要和其他系统有耦合,这显然也提高了系统的扩展性。
消息队列使用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。 从上图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。
消息接受者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消息接受者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程。
另外,为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。
备注: 不要认为消息队列只能利用发布-订阅模式工作,只不过在解耦这个特定业务环境下是使用发布-订阅模式的。除了发布-订阅模式,还有点对点订阅模式(一个消息只有一个消费者),我们比较常用的是发布-订阅模式。另外,这两种消息模型是 JMS 提供的,AMQP 协议还提供了 5 种消息模型。
推荐文档:https://learn.lianglianglee.com/专栏/消息队列高手课/05 如何确保消息不会丢失.md)
由于网络中的数据传输是不可靠的,想要解决不丢失消息的问题,首先需要分析:1. 在传输消息的过程中哪些环节可能会丢失消息?2. 如何知道消息丢失了?最后才是如何确保消息不丢失。
就好比架构设计,架构体现了架构师的思维过程,而设计才是最后的解决方案。
最尴尬的不是丢失消息,而是丢失了消息你并不知道。
如果是 IT 基础设施比较完善的公司,一般都有分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息。如果没有这样的追踪系统,这里我提供一个比较简单的方法,来检查是否有消息丢失的情况。
我们可以利用消息队列的有序性来验证是否有消息丢失。原理非常简单,在生产端( Producer
),我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。
如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1。如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。
大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中,待你的系统稳定后,也方便将这部分检测的逻辑关闭或者删除。
如果是在一个分布式系统中实现这个检测方法,有几个问题需要你注意。
首先,像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。
如果你的系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。
Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。
讲完了检测消息丢失的方法,接下来我们一起来看一下,整个消息从生产到消费的过程中,哪些地方可能会导致丢消息,以及应该如何避免消息丢失。
你可以看下这个图,一条消息从生产到消费完成这个过程,可以划分三个阶段,为了方便描述,我给每个阶段分别起了个名字。
在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。
只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。
RabbitMQ 提供了事务功能,生产者发送数据之前开启 RabbitMQ 事务channel.txSelect
,然后发送消息,如果消息没有成功被 Broker 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback
,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit
。
//伪代码
//开启事务
channel.txSelect
try{
//在这里发送消息
.........
}catch(Exception e){
//如果捕获到异常
channel.txRollBack;
}
//提交事务
channel.txCommit;
在生产者那里设置开启 Confirm
模式之后,生产者每次发送的消息都会分配一个唯一的 ID,如果消息成功写入 Broker 中,Broker 会给生产者回传一个 ack
消息,告诉你说这个消息 ok 了。
如果 Broker 没能力处理这个消息,会回调你的一个 nack
接口,告诉你这个消息接收失败,生产者可以进行消息的补偿机制。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么可以重发(补偿)。
需要注意的是:RabbitMQ的事务机制是同步的,很耗型能,会降低RabbitMQ的吞吐量。Confirm机制是异步的,生成者发送完一个消息之后,不需要等待RabbitMQ的回调,就可以发送下一个消息,当RabbitMQ成功接收到消息之后会自动异步的回调生产者的一个接口返回成功与否的消息。
在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。
开启RabbitMQ的持久化。**当生产者把消息成功写入RabbitMQ之后,RabbitMQ就把消息持久化到磁盘。结合上面的说到的Confirm机制,只有当消息成功持久化磁盘之后,才会回调生产者的接口返回ack消息,否则都算失败,生产者会重新发送。**存入磁盘的消息不会丢失,就算RabbitMQ挂掉了,重启之后,他会读取磁盘中的消息,不会导致消息的丢失。
持久化的配置:
Queue
的元数据,但是它是不会持久化 Queue
里的数据的。DeliveryMode
设置为 2
,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。注意:持久化要起作用必须同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 Queue
,恢复这个 Queue
里的数据。
如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。
对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType
配置为 SYNC_FLUSH
同步刷盘。
TODO:如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。
在集群模式下,消息队列是如何通过消息复制来确保消息的可靠性的?
消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。
你在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
主要分析了一条消息从发送到消费整个流程中,消息队列是如何确保消息的可靠性,不会丢失的。这个过程可以分为分三个阶段,每个阶段都需要正确的编写代码并且设置正确的配置项,才能配合消息队列的可靠性机制,确保消息不会丢失。
你在理解了这几个阶段的原理后,如果再出现丢消息的情况,应该可以通过在代码中加一些日志的方式,很快定位到是哪个阶段出了问题,然后再进一步深入分析,快速找到问题原因。
其实就是防止消费端重复消费消息的问题。也就是我消息发一次,你消费者只能消费一次。
在业务高峰期最容易产生消息重复消费问题,当Con(消费者)
消费完消息时,在给Pro(生产者)
返回 ACK
时由于网络中断,导致Pro(生产端)
未收到确认信息,该条消息就会(被我们的定时任务)重新发送,并被Con(消费者)
消费,但实际上该消费者已成功消费了该条消息,这就造成了重复消费。
实际上面的这个案例,是由于网络问题,导致了消息重发(补偿),出现了消息重复消费的问题。但是网络是人为不可控制的,谁也没有办法。实际上所有MQ的产品其实都没有提供主动解决消费幂等性的问题,所以就需要由消费者自行控制。
「幂等」是一个数学与计算机学概念,在数学中某一元运算为幂等时,其作用在任一元素两次后会和其作用一次的结果相同。
也就是说,请求多次执行和一次执行的结果或者影响是一样的。
推荐文章:https://juejin.cn/post/6906290538761158670
# 消息被生产者生产之后,将消息的的信息,保存一份到我们的数据库中,保存的时候根据机制,先去查询。
# 如果 COUNT(1) > 0 ,说明消息被消费过了。否则 COUNT(1) = 0 ,说明消息没有被消费,直接insert操作
SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID +指纹码;
好处就是实现简单,但是在高并发下会产生数据的写入的性能瓶颈,解决方案:根绝ID进行分库分表的策略,然后根据路由算法(Hash算法等),去做分压分流的这么一个机制。
实际上就是:把单库的幂等,利用的ID的分库分表策略,变成多库的幂等。这样就可以分摊数据的访问的流量的压力,保障我们数据库可以抗的住这个流量就可以了。
需要注意的是:
这里只提用Redis的原子性去解决MQ幂等性重复消费的问题。
不管是方案一还是方案二,前提条件是要有一个全局生成唯一ID的技术。
另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
比如,“将账户 X 的余额增加 100 元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为:“如果账户 X 当前的余额为 500 元,将余额加 100 元”,这个操作就具备了幂等性。对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。
但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
如果上面提到的两种实现幂等方法都不能适用于你的场景,我们还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。
具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
原理和实现是不是很简单?其实一点儿都不简单,在分布式系统中,这个方法其实是非常难实现的。首先,给每个消息指定一个全局唯一的 ID 就是一件不那么简单的事儿,方法有很多,但都不太好同时满足简单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。
比如说,对于同一条消息:“全局 ID 为 8,操作为:给 ID 为 666 账户增加 100 元”,有可能出现这样的情况:
这样就会导致账户被错误地增加了两次 100 元,这是一个在分布式系统中非常容易犯的错误,一定要引以为戒。
对于这个问题,当然我们可以用事务来实现,也可以用锁来实现,但是在分布式系统中,无论是分布式事务还是分布式锁都是比较难解决问题。
推荐文章:https://pdai.tech/md/arch/arch-z-id.html
如果你想同时满足简单、高可用和高性能,就要有有取舍,所以你要站在实际的业务中,说明你业务考虑的平衡点是什么。
主要介绍了通过幂等消费来解决消息重复的问题,然后我重点讲了几种实现幂等操作的方法,你可以利用数据库的约束来防止重复更新数据,也可以为数据更新设置一次性的前置条件,来防止重复消息,如果这两种方法都不适用于你的场景,还可以用“记录并检查操作”的方式来保证幂等,这种方法适用范围最广,但是实现难度和复杂度也比较高,一般不推荐使用。
这些实现幂等的方法,不仅可以用于解决重复消息的问题,也同样适用于,在其他场景中来解决重复请求或者重复调用的问题。比如,我们可以将 HTTP 服务设计成幂等的,解决前端或者 APP 重复提交表单数据的问题;也可以将一个微服务设计成幂等的,解决 RPC 框架自动重试导致的重复调用问题。这些方法都是通用的,希望你能做到触类旁通,举一反三。
https://cloud.tencent.com/developer/article/1844222
思考过程:
如果出现积压,那一定是性能问题,想要解决消息从生产者到消费者上的性能问题,就首先要知道哪些环节可能出现消息积压,然后再考虑解决。
因为消息在发送之后,才会出现积压的问题,显然主要不是生产端的问题。又因为绝大部分消息队列单节点,都能达到每秒几万的处理能力,相对于业务逻辑来说,问题并不会在消息队列的存储上面。毫无疑问,出现问题只可能在消费端。
使用消息队列的时候,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压。如果这种性能倒挂的问题只是暂时的,那问题不大,只要消费端的性能恢复之后,超过发送端的性能,那积压的消息是可以逐渐被消化掉的。
要是消费速度一直比生产速度慢,时间长了,整个系统就会出现问题,要么,消息队列的存储被填满无法提供服务,要么消息丢失,这对于整个系统来说都是严重故障。
所以,我们在设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。
如果是线上突发问题,就要临时扩容,增加消费者的数量,与此同时,还可以降级一些非核心业务,通过扩容和降级来扛住流量。
这点表明你应对线上问题的应急处理能力。
其次才是,排查解决异常问题,比如通过查看监控,日志等分析手段分析是否消费端的业务逻辑代码出现了问题,进而优化消费端的业务处理逻辑代码。
最后 ,如果是消费端的处理能力不足,可以通过水平扩容,来提高消费端消费消息的并发能力。
**在扩容消费者的实例数的同时,必须同步扩容主题(Topic
)中的分区(artition
)(也叫队列)数量,确保Consumer的实例数和分区数量是相等的。**如果Consumer的实例数量超过分区数量,这样的扩容是无效的。原因是:因为对于消费者来说,在每个分区上实际上只能支持单线程消费。
很多消费程序,他们是这样来解决消费慢的问题的:
它收消息处理的业务逻辑可能比较慢,也很难再优化了,为了避免消息积压,在收到消息的 OnMessage 方法中,不处理任何业务逻辑,把这个消息放到一个内存队列里面就返回了。然后它可以启动很多的业务线程,这些业务线程里面是真正处理消息的业务逻辑,这些线程从内存队列里取消息处理,这样它就解决了单个 Consumer 不能并行消费的问题。
这个方法是不是很完美地实现了并发消费?请注意,这是一个非常常见的错误方法! 为什么错误?因为会丢消息。如果收消息的节点发生宕机,在内存队列中还没来及处理的这些消息就会丢失。
还有一种消息处理积压的情况是,日常系统正常运转的时候,没有积压或者只有少量积压很快就消费掉了,但是某一个时刻,突然就开始积压消息并且积压持续上涨。这种情况下需要你在短时间内找到消息积压的原因,迅速解决问题才不至于影响业务。
导致突然积压的原因肯定是多种多样的,不同的系统、不同的情况有不同的原因,不能一概而论。但是,我们排查消息积压原因,是有一些相对固定而且比较有效的方法的。
能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了。
大部分消息队列都内置了监控的功能,只要通过监控数据,很容易确定是哪种原因。如果是单位时间发送的消息增多,比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。
如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。
还有一种不太常见的情况,你通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度。
如果监控到消费变慢了,你需要检查你的消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。
我们主要讨论了 2 个问题,一个是如何在消息队列的收发两端优化系统性能,提前预防消息积压。另外一个问题是,当系统发生消息积压了之后,该如何处理。
优化消息收发性能,预防消息积压的方法有两种,增加批量或者是增加并发,在发送端这两种方法都可以使用,在消费端需要注意的是,增加并发需要同步扩容分区数量,否则是起不到效果的。
对于系统发生消息积压的情况,需要先解决积压,再分析原因,毕竟保证系统的可用性是首先要解决的问题。快速解决积压的方法就是通过水平扩容增加 Consumer 的实例数量。