为什么使用消息队列?
1. 异步
有些业务场景对实时性要求没有那么高,可以通过消息队列实现业务处理上的异步化,提高系统的并发处理能力。
2. 解耦
如果A、B、C均关注D的数据变动,如果采用推送的方式,我们需要逐个推送到A、B、C,这些系统过于依赖D的推送,当D服务出现异常,也会直接影响A、B、C。与此同时,如果C不再订阅D的变动,或者新增E订阅D的变动,采用推送的方式都会有一定的局限性,不够灵活。而消息队列可以完成业务上的解耦,D不再关注需要给谁推送,而是生产出消息即可,谁关注谁就订阅消息,实现了解耦合。
3. 削峰填谷
消息队列,特别是pull模式下的消息队列,可以由消费方自己控制业务处理的速度,数据在中间件中存储,也不会丢失,处理能力强就频繁pull,处理能力差就慢点pull,可以将蜂拥的请求异步化处理,达到防洪的效果。
消息队列带来的问题?
1. 系统数据的一致性
数据的一致性不再强实时,而是弱实时或最终一致
2. 系统复杂度变大
系统内多了一个MQ,维护起来复杂度变高
3. 重复消费、重入考虑
很多系统可以通过持久化解决消息丢失的问题,但不可避免的出现重复消息的问题,业务设计上需要考虑全面重入的处理。
几种常见的消息队列技术选型
Kafka的高可用
1. 数据冗余备份
Kafka的topic下分多个partition,partition之间的数据是没有交集的(图1)。每个partition在多个实例上均存储备份数据(图2)
2. leader节点靠选举产生
对于指定的partition的所有备份中只有一个是leader,生产者和消费者均与leader直接交互,而不是任意的副本,非leader节点通过数据同步的方式形成副本。leader节点挂掉了,还可以重新选举,选择新的leader,这个leader要求数据与原leader完全同步,可以接替挂掉的节点继续工作。
如何解决重复消费问题?
先来说说为何会产生重复消费问题,Kafka采用offset记录消费者的数据处理进度,在单个topic的单个partition中,数据是有序的。因此,对partition会采用一个偏移量记录数据处理的进度,进而防止重复消费。当时偏移量是需要更新的,偏移量的更新依赖消费者的提交,消费者提交嘞偏移量意味着数据已经处理完毕,但如果数据已经处理完毕,但消息提交尚未完成时就挂掉了,此时会造成偏移量没有更新,但是数据已经处理的问题。
那么如何解决呢?
1. 利用数据库主键的唯一性。
在消费方记录消息的唯一键,当数据处理成功就会存储一条数据,这样,即便是消息再次处理,由于已经存在相同的数据,数据库会帮我们解决重复的问题。
2. 利用业务上的不可逆解决
通过状态机、业务数据的唯一性,来确保消息消费的唯一性,避免重复处理。
如何确保消息的可靠传输?
1. 消费方
消费方通过手动提交偏移量的方式,解决自动提交偏移量导致的数据丢失问题,用重复消费换避免消息丢失。
2. 生产方
先做持久化,写入数据库,再通过异步确认结果的方式发送消息。
3. kafka
数据配置两个以上的副本(replication.factor);
存在一个以上的follower(min.insync.replicas);
所有副本写入后才算提交成功(acks=all);
生产方失败重试(retries=MAX);
如何避免消息积压?
1. 新增消费者,增加并行处理能力
并行处理能力也是有限制的,当消费者数量大于partition的数量时,新增消费者是无效的
2. 提高单条数据的处理速度
消息积压本质上是处理速度的问题,对单个消息处理采用fail-fast的思路实现,可以尽可能快速地判断消息的有效性等。
3. 增大批处理能力
单次pull一条数据和批量pull数据的IO此时不一样,增大批处理能力可以减少网络IO,提高处理能力。