• 我是如何将一个老系统的kafka消费者服务的性能提升近百倍的


    大家好,又见面了~

    kafka作为一种高吞吐量的分布式发布订阅消息系统,在业务系统中被广泛的使用。

    如果问你,如何提高kafka队列中的消息消费速度呢? 答案很简单,topic多分几个分片,然后使用消费者组(Consumer Group)去消费topic即可

    如果加个条件,对同一个对象的操作请求必须要严格按照顺序进行处理呢? 答案也不难,topic分片之后,生产者定制分发策略,保证同一对象的操作请求都分发到同一个分片中,这样每个消费者就都是在按照顺序消费各自分片中的数据啦~

    如果再加上一些条件:

    • 这个消费者消费速度极慢、慢到需要100ms才能处理完一条消息,即使topic分100片也不满足不了要求;
    • 每个对象的操作请求数量存在严重倾斜的现象,有的分片消息数量很大,有的分片消息量很少,可能有的分片一直积压、有的分片却很闲;
    • 请求操作很重要,需要确保每条请求都被可靠消费,要保证事务最终一致性;
    • 数十年的老系统,业务错综复杂,项目方不允许涉及业务逻辑以及整体架构的大改...

    当上述各种条件叠加到一起,要求将消费性能明显提升,如果是你,会如何破局呢

    前段时间,应业务部门的要求,给他们的一个线上历史系统做个并发性能提升的方案,就遇到了上述各种要求叠加在一起的棘手情况。

    先简单说下遇到的业务场景:

    一个互动类的论坛的帖子评论处理场景,要求每个帖子的评论请求操作都必须要严格遵循一定的顺序(比如可能会有评论删除、引用评论、回复评论等操作,所以请求顺序必须要严格按照顺序处理),帖子评论的操作请求发送到kafka里面,然后评论服务消费kafka处理各个请求,这个评论消费者服务消费太慢,需要提升下并发效率。

    增加分片与消费者数量

    正式开始着手去整改优化。

    首先是常规调整根据kafka自身的机制,将topic进行分片调整,拆分为N个分片,然后增设消费者组,在消费者组内部署与分片数相等的消费者服务节点,这样每个消费者可以处理一个分片,这样整个评论的消费性能就会提升N倍。

    那么,这里为什么要强调消费者组里的服务节点数要等于topic分片数呢?这里提一下kafka中Consumer Group中消费者数量与topic分片数之间的相关逻辑。

    看一下不同的消费者数量与topic分片数对应的处理消费场景:

    所以说,消费者组里面的消费者数量并不是越多越好,而是受到了topic的分片数量的限制的:

    • 消费者数量太少,会导致一个消费者需要消费多个分片的数据,造成某一个消费者消费压力提升;
    • 消费者数量太多,会导致有的消费者并不会消费任何数据,浪费部署资源。

    也是基于这一点,上述我们的方案中,规划消费者组里的消费者数量与topic的分片数一致,这样可以保证每个消费者消费1个分片,达到最大效率协调。

    再补充个知识点:为什么kafka要限制每个分片最多只能有1个消费者组里的消费者在处理呢

    因为消费者拉取消息需要提供offset, limit。如果offset放在broker端,那么一定会产生额外的通信开销;如果offset放在Consumer端,如果在一个组有多个消费者,就需要有一个协调者,集中式的管理,解决锁冲突,如果不解决冲突,那么势必会产生重复消费、无用的消费,从而导致资源浪费。 所以说,从性能与复杂度的取舍上,Kafka采用了相对简单的一种解决策略。

    保证分片内写入顺序

    通过上一章的方式,增加了topic分区数以及消费者组中消费者数量,对kafka中消息并行消费的效率是提升了,但是问题又来了:顺序问题!

    前面说过,由于业务明确要求确保顺序消费,而kafka只是保证分片内的消费顺序是固定的,但是不同分片之间的消费顺序是无法保证的。

    对业务进行分析发现,业务要求的顺序处理,其实是有条件的顺序处理。即对于同一个帖子的所有评论相关的操作必须要同步处理,对于不同帖子的评论相关操作并没有顺序的要求。那么问题就简单了,只要保证同一个帖子的所有评论相关操作请求都被分发到同一个topic分区内即可

    生产者写入消息到kafka的topic时,kafka将依据不同的策略将数据分配到不同的分区中:

    1. 轮询分区策略
    2. 随机分区策略
    3. 按key分区分配策略
    4. 自定义分区策略

    这里采用自定义分区策略,因为每个评论操作请求中都携带有一个原始帖子ID字段,所以分发策略也很简单,直接帖子ID % 分片数将消息进行分发,这样同一个帖子ID的评论操作就都可以到同一个分片中,这样顺序的问题就解决了。

    所以,对上一环节给定的初步方案进行优化,补充下生产者端的定制化分发策略的要求,保证同一个帖子的评论操作都会到同一个Topic分片中:

    方案设计到这里,似乎已经是解决了并发消费的问题了。但是后来实际压测之后,结果令人大跌眼镜。

    单消费者速度提升

    按照前面给出的方案,部署了DEMO环境进行压测(拆分成4个分片,部署4个消费者),最终发现集群消费速度的确是翻了4倍、但是整体并发量依旧是低的可怜,4台机器最终消费并发量甚至不到100!

    心灵受到暴击之后,去分析下单个消费者节点的运行情况,发现压测过程中整个机器CPU、IO、MEM、线程数都非常低、毫无任何波动。问业务方要了代码权限,下载了代码并走读了一遍Consumer服务的代码逻辑才发现其中玄机。

    其实该业务整体交互逻辑其实很简单,从kafka获取一个消息,然后进行消费。但是这个消费逻辑,是需要按顺序调用10余个周边系统的HTTP接口! 这也难怪CPU、内存、IO都非常低了,整个进程中只有一个线程在处理业务、而这个线程大部分时间都是处于IO等待状态。

    所以要想提升整体集群的消费能力,要么无限扩机器、要么就提升单节点的消费能力 —— 显然前者是不可能的,只能选择后者。而对于单线程、多IO操作的场景,提升并发性能,首先想到的就是改为多线程并发处理。但是多线程并发的时候,又会涉及到如何保证顺序消费的问题。

    对前面的方案进行优化,给出如下方案:

    在前面方案的基础上,主要是对消费者端的实现逻辑进行了调整:

    • 在消费者内部,区分Consumer ThreadWork ThreadConsumer Thread负责从kafka拉取消息,而Work Thread负责真正的消费逻辑处理。
    • 单机内存中维护若干个队列,每个队列对应一个Work Thread,负责消费该队列中的数据;
    • Consumer Thread基于亲缘性分发策略对消息进行二次分发,保证相同帖子ID的请求分发到相同的内部队列中。

    再进行压测,设置单个消费者服务Work Thread数量为100,集群内4个消费者服务,整体消费速度达到了7000。单节点的消费性能从原来的20提升到1700,提升了近80倍!

    如何保证消息不丢失

    经过将单机的消费模式改为多线程的方式,目前并发消费性能的问题是解决了,可是可靠性的问题又出现了。

    原先的时候,消费者从kafka拉取一条消息,然后消费完成后,给kafka一个ack应答,然后去拉取下一条消息,这样即使消费者中途宕机了,kafka依旧可以将消息分发给下一个可用的消费者去处理,可以保证请求消息不会丢失掉。

    而前面的方案,消费者服务从kafka拉取到消息之后,并没有等待处理完成,就继续从kafka拉取消息然后缓存在本机内存中等待work thread慢慢消费,这个时候,如果机器宕机,所有缓存的消息将全部丢失

    为了解决上述问题,考虑将kafka应答机制改为手动提交ack。但是由于多个线程之间乱序的处理kafka上的数据,各个线程已经处理的offset值是不一样的。如下示意图:

    为了保证消息可靠不丢失,采用如下策略:定期手动提交当前的offset信息,提交的offset值,选择当前节点已处理的最小offset值(对于上面示意图,即提交1002这个offset值),可以通过在内存中缓存下处理的offset列表的方式实现,如下如实现策略:

    正常情况下,提交的offset值不会有什么作用或影响,但是一旦出现异常情况,导致当前节点进程不可用,kafka重平衡将当前分片分给另一个消费者进行消费的时候,另一个消费者会从最后一次提交的offset位置开始继续往后消费。这样便解决了数据丢失的问题,保证了数据可靠。

    但是,另一个问题又出现了:重复消费。好在,虽然这个业务系统是十多年前构建的,但是至少分布式消费者该有的一个关键特性还是具备的,那就是幂等,所以这个问题就不用考虑了。

    数据积压不可控场景兜底

    到这里,总该一切都没问题了吧?

    是,也不是。正常情况下是没问题了,但是作为一个"核心"系统,极端的异常情况的保命策略还需要考虑下。

    举个例子,如果突然有一条帖子爆火,这条帖子的评论量远超其余帖子的评论量,甚至远超整个系统的额定最大负载请求量,这样会出现个问题:

    • kafka某一个分片数据量积压严重,其余分片很空闲
    • 该条火爆的帖子的相关评论请求,阻塞了与该帖子分配到同一分区的其余帖子的评论处理。

    这个原计划做一个动态伸缩的分片分发策略,但考虑到此场景过于极端,当前系统实施起来性价比不高,所以本着适当设计的原则,放弃了原先方案,改为了简单的手动处理 + 补偿服务方式,如下:

    一旦出现未预料到的异常,导致系统积压已经超过正常的处理范围了,且已经远超系统可以正常恢复的限度,为了保证现有业务尽快的恢复正常,可以先跳过积压的请求,先保证新过来的请求正常被处理,然后启动补偿进程,慢慢消费之前积压的消息。

    有一说一:

    这个地方是整个方案里面我自己不太满意的一个实现,属于迁就现实的一种妥协方案,写这篇文档的时候,自己还是打算近期将这部分按照一个更优的方案进行实现。如果您也有兴趣了解或者有更好的建议思路,欢迎联系我,我们一起掰扯下。

    总结梳理

    至此呢,为了解决kafka消费者消费能力太慢场景的集群并发性能提升方案就全部设计完成了,业务要求的各种要求约束也都可以满足了,最终实现了在业务逻辑没有变的情况下,整体集群的性能提升了上百倍。整体的改动内容如下:

    回顾

    身为架构师的这些年中,做过很多个从0构建的大型项目的整体架构方案,也给很多业务部门针对具体问题出过一些解决策略。过程中一直在反复思考一个问题,面向实际业务问题场景的架构本质是什么

    面向问题的架构面向业务或者系统的架构不同点在于:前者的诉求很明确、目的也比较单一、且现实约束会比较多、可发挥的余地有限。而架构师需要做的,就是在有限的范围内,围绕一个既定目标、确定一个主线策略、再针对主线可能存在的弊端或不足进行弥补与调和,最终促成整个架构方案的最终落地并达成既定目标

    这不是秋招快到了吗?我特地给各位小伙伴准备了近两年的大厂面试题!面经!有需要的小伙伴可在我工众号【不脱发有志青年】中免费货区1

     

  • 相关阅读:
    python学习笔记3-dictionary和分词
    2024.4.24
    VS 配置 OpenCV (亲测可用)
    有什么免费python安装包?
    24. [Python GUI] PyQt5中的模型与视图框架-表格部件QTableWidget
    dart中final和const的区别
    第42节——路由知识额外扩展
    CentOS8 ModelScope的安装与使用
    kotlin coroutine源码解析之Dispatchers协程调度器
    Centos Sendmail
  • 原文地址:https://blog.csdn.net/Trouvailless/article/details/126049867