「 Kafka 」 作为我们开发中常常选取采用的消息中间件,它的使用我们一定或多或少的有所了解。可是你真的了解它的常用名词,比如 「 brocker 」、「 consumer 」 或者说 「 producer 」 等等一系列的名词含义吗?
不了解没有关系,接下来我们来简单了解一下这些小知识点,相信对你学习掌握 「 Kafka 」 这套优秀的消息中间件,一定会有很大的帮助。
并且在后续还会有 「 Kafka 」 一些常见的问题阐述,以及我们如何解决这些问或者说如何避免这些问题。
在介绍这些名词之前,我们先从宏观角度看一下,「 Kafka 」 系统中所常用的名词都有哪些,了解了这些,我们在针对某个名词做一一解释,逐个击破。
如下图所示,我们所介绍的kafka名词下图包圆了:
对kafka常见名词有了一个整体的概念,我们还缺少一样东西,我们不能够对名词直接介绍,还需要对他们的整体结构做一定的了解。不然就犹如是空中楼阁,虚无缥缈的东西,容易让人云里雾里。
下图,就是一个kafka系统所大致包含的一些结构,当然了,此图并不会面面俱到,只是针对全局做一些了解。
kafka 架构简图:
Topic(主题)和 Partition(分区)
「 topic 」主题,「 Kafka 」按照「 topic 」 对消息进行分类,从而来决定将消息放到那个 「 topic 」下。比如用户消息,我们可以发送到名为 「 user_message 」 的 「 topic 」 上,类似的支付消息我们可以发送到 「 pay_message 」 的 「 topic **」**上面。
这里值得注意的是,topic 主题概念是逻辑上的概念,是为了方便消费者订阅以及生产的的消息发布,实际上消息在结构中是按照 「partition」(分区)存在的。
「partition」 是物理上的概念,它也是真正存放消息的地方,同一个 「 topic 」 中可能存在一个或多个分区。然后每一个分区的消息是有序的。
Broker、Leader 和 Follower
「 Broker 」 解释起来就是一个单独的 「 Kafka 」 节点,一个 「 Kafka 」集群是由一个或者多个 「 Broker 」 节点所组成的。
我们都知道在分布式系统中会有单节点故障问题。所以 「 Kafka 」 针对于这个问题,将数据同步到了多个节点中,这样就算一个节点出现故障,也能够立即启用备份好数据的节点,重新启用。
所以针对于多 「 Broker 」 的情况,需要去选择出一个主要使用的节点,其他节点变成备份节点,这就是 「 Kafka 」集群节点的选举机制。在此选举机制的引导下,「 Kafka 」的多节点将演变为一主多从的情况。
一主在 「 Kafka 」 中被称为 「 Leader 」 (领导者) ,多从称为 「 Follower 」 (跟随者) 。他们之间的数据是由主节点所同步的。
在实际工作使用中,所有的消息操作都是针对于 「 Leader 」 主节点进行操作的,这样能够提升消息处理的速度。
其次为了保证高可用,每个分区都会有一定数量的副本(replica) 。这样如果有部分服务器不可用,副本所在的服务器就会接替上来,保证应用的持续性。
Producer(生产者) 、 Consumer(消费者)和 Consumer Group(消费者组)
「 Producer 」生产者顾名思义是「 Kafka 」 消息的实际的推送者,由此组件将消息发送到对应 「 topic 」的分区上。
「 Consumer 」消费者是去消费指定 「 topic 」 下的某个分区中的消息,消费完成后,将这一次消息偏移量 「 offset 」 记录下来,方便下一次读取能快速确定消费位置。
那么Consumer Group 消费者组又是什么呢?
单个消费者消费消息并且还要处理消息的情况下,消费速度是达不到我们的预期速度的。「 Kafka 」 中为我们提供了消费组的新概念来帮助我们将消费速度达到预期速率。
同一个消费组中的多个消费者,每个消费者所消费的分区都是不相同的。同一个分区对于一个消费组来说,只能够有一个消费者来消费。
对于消费组中的某一个消费者来说,可以消费一个或者多个分区。这样才能保证消息不会被重复消费。
消费组里面加入新的消费者或者有消费者下线了,都会触发 「 重平衡 」。
请记住这个名字,它是 「 Kafka 」 最大的诟病,这个问题我们下面会在 「 Kafka 」 问题中详细解释以及抛出解决问题。
在使用 「 Kafka 」 的过程中,也会产生各种各样的坑。下面我们着重来讲解一下我在使用 kafka 的过程中所遇到的问题,以及遇到相应的问题是如何解决的。
问题背景:
使用过 「 Kafka 」 的都应该知道,「 Kafka 」中消息 ** 被消费之后是默认会自动提交 「 offset 」 (偏移量)的。
但是之前我们有个需求是,消费者在消费完消息之后不能够自动提交 「 offset 」,需要等待其他异步业务的完成,根据返回结果或者通知来将消息偏移量 「 offset 」 提交。
这就会有一个问题,对于同一个消费者,有可能存在多个线程同时在拉取「pull」和提交「ack」操作。这时 「 Kafka 」 就百分之百会抛出多线程修改异常。
业务场景提交流程如下图所示:
抛出异常如下述代码所示:
- throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
- 复制代码
问题溯源:
我们知道了问题的表象,那么我们还需要去知道 「 Kafka 」 为什么会报错,这样我们才能够针对这个问题去给出相应的解决方法。
我们先来看一下 「 Kafka 」 拉取消息和提交消息的 api 内部都具体做了什么!
相应kafka内部源码如下:
- // 拉取消息
- public ConsumerRecords
poll(long timeout) { - acquireAndEnsureOpen();
- try {
- ......
- } finally {
- release();
- }
- }
- // 提交偏移量
- public void commitSync() {
- acquireAndEnsureOpen();
- try {
- coordinator.commitOffsetsSync(subscriptions.allConsumed(), Long.MAX_VALUE);
- } finally {
- release();
- }
- }
- 复制代码
可以看出,不论是拉取消息还是提交偏移量都会调用两个相同的方法,分别是acquireAndEnsureOpen
和 release
方法 。那么具体来看一下两个方法中的具体实现,相信可以发现一些有趣的东西。
- private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
-
- private void acquireAndEnsureOpen() {
- acquire();
- if (this.closed) {
- release();
- throw new IllegalStateException("This consumer has already been closed.");
- }
- }
- private void acquire() {
- long threadId = Thread.currentThread().getId();
- if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
- throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
- refcount.incrementAndGet();
- }
-
- private void release() {
- if (refcount.decrementAndGet() == 0)
- currentThread.set(NO_CURRENT_THREAD);
- }
- 复制代码
我们可以看到,我们遇到的错误存在于acquire
方法的判断中,当判断为true
就会抛出相应的多线程修改异常,那么这个判断条件我们就需要去好好研究一番了。
可以看出消费者中的currentThread
(当前线程) 变量是 (原子类型) 默认值是 NO_CURRENT_THREAD【-1】 (没有线程状态) ,不论是拉取消息还是提交偏移量,都会去判断当前操作的线程 id 是否和当前消费者所占用的线程 id 保持一致。
不一致就会去原子设置compareAndSet
消费者占用线程为当前线程,设置交换成功正常执行,失败后,就会抛出我们之前所见到的多线程修改异常了。
看到这里我们发现,好像kafka消费者只支持单个线程操作啊!!
那么我们的异步多线程提交偏移量需求怎么实现呢?
别急,我们上述除了acquire方法之外,在代码的 finally代码块中我们可以看出都调用了 release
方法,从名字来说 release
是释放的意思。
可以理解为当前线程业务执行完成后,会将消费者的占用线程置为初始值 NO_CURRENT_THREAD【-1】 ,这样就可以正常的支持其他线程再次操作当前消费者。
问题解决:
知道了问题的原因,本质上是因为,有多个线程同时去操作同一个消费者对象,导致底层原子设置失败就会抛出对应异常。
那么问题就回到了,怎么保证多个线程对同一个消费者对象操作能够按照先后顺序来执行?
我们很容易想到的两种可以将多个线程变成顺序执行的方法,分别是:
加同步锁:
第一种方法用加同步锁的方式可以实现,可以做到多个线程下单线程执行。
但是加锁方式有一定的缺点,有先后乱序的问题,因为加锁和唤醒都是非公平状态,最终可以保证单线程执行,但是无法确保等待线程唤醒之后的执行顺序。有顺序问题,所以一般不采取此种方式去做处理。
消息入队列:
第二种通过队列的方式,多个线程的业务消息,全部放入一个队列中,然后队列另一端由单个线程去做消费处理。这样能确保多条消息最终是单线程处理,就可以避免并发修改问题。
同时对比于加锁方式来说,解决了加锁后所产生的的顺序不一致问题。
问题二:kafka 重平衡(rebanlence)
「rebalance」(重平衡)在我们使用 「 Kafka 」组件的过程中是非常常见的问题。它其实就是重新进行「partition」 (分区) 的分配,从而使得 「partition」 的分配重新达到平衡状态。
问题背景:
当我们针对 「 Kafka 」 消费者做了一定配置时,比如我们每次处理要拉取几百条消息数据,并且我们消息处理时间很长,就很容易触发 「 Kafka 」 针对于我们当前消费者组的 「rebalance」,重新就会对我们的消费组重分配分区。
问题溯源:
那么由上述我们知道当消息处理过于慢,会导致 「 Kafka 」 产生 「rebalance」 问题。
那到底什么情况下会出现呢?
根据使用经验以及参考 「 Kafka 」 的官方文档可以知道 , 消费组内的消费者共同消费一个 「topic 」 下的消息。
而当消费组内成员个数发生变化,例如某个消费者离开,或者新消费者加入,都会导致消费组内成员个数发生变化,从而导致重平衡。
以下三种情况都是组内成员变化的情况:
对于 「新成员加入」、 「组成员主动离开」 都是我们主动触发的,能比较好地控制。但是**「组成员崩溃」**则是我们预料不到的,遇到问题的时候也比较不好排查。
但对于 「组成员崩溃」 也是有一些通用的排查思路的,下面我们就来聊聊 「rebalance」 问题的处理思路。
要学会处理 rebalance 问题,我们需要先搞清楚 「 Kafka 」 消费者配置的四个参数:
「心跳时间间隔」heartbeat.interval.ms:
「heartbeat.interval.ms」 配置表示 consumer 向kafka 的 broker发送心跳的时间间隔,需要定期向 brocker 发送心跳的原因是告知 brocker 自己当前消费组还是存活着。
「heartbeat.interval.ms = 60000」 表示 consumer 每 60 秒向 broker 发送一次心跳。
「心跳超时时间」session.timeout.ms :
「session.timeout.ms」 表示 consumer 向 broker 发送心跳的超时时间。表示 brocker 所能够接受 consumer 发送心跳消息的最大间隔时间,超过这个最大时间,会产生我们上述的 「rebalance」 问题。
「session.timeout.ms = 180000」 表示在最长 180 秒内 broker 没收到 consumer 的心跳,那么 broker 就认为该 consumer 死亡了,会进行 rebalance。
一般来说,「session.timeout.ms」 的值是 「heartbeat.interval.ms」值的 3 倍以上。
「消费最大的处理时间」max.poll.interval.ms:
「max.poll.interval.ms」 表示 consumer 每两次消息拉取的最大时间间隔。简单可以理解为,从kafka 拉取消息之后,处理这批消息所能占用的最大时长。
如果说,消费处理消息过程时间耗费很长,那么就需要相应的增加消息的最大处理时间。否则,当最大处理时间到达之后,还是没有完成消息处理,没有进行新一轮的拉取操作,那么 broker 就认为该 consumer 死亡了,会进行 rebalance。
「消费最大的拉取消息数」max.poll.records:
「max.poll.records」 表示 consumer 每次消息拉取的最大条数。 获取的消息条数越多,需要处理的时间越长。所以每次拉取的消息数不能太多,需要保证在 「max.poll.interval.ms」 设置的时间内能消费完,否则会发生 rebalance。
简单来说,会导致崩溃的几个点是:
问题解决:
想要解决这个问题,主要就是针对于上述心跳超时和消息处理时间过长做相应的处理。
心跳超时问题:
对于心跳超时来说,我们需要调整 「session.timeout.ms」 和 「heartbeat.interval.ms」 参数,使得消费者与协调者能保持心跳。
一般来说,超时时间应该是心跳间隔的 3 倍时间。即 「session.timeout.ms」 如果设置为 180 秒,那么 「heartbeat.interval.ms」 最多设置为 60 秒。
为什么要这么设置超时时间应该是心跳间隔的 3 倍时间?
因为这样的话,在一个超时周期内就可以有多次心跳,避免网络问题导致偶发失败。
阿里云官方文档建议超时时间(session.timeout.ms)设置成 25s,最长不超过 30s。那么心跳间隔时间(heartbeat.interval.ms)就不超过 10s。
消息处理时间过长:
对于消息处理时间过长问题,对于这种情况,一般来说就是增加消费者处理的时间(即提高 max.poll.interval.ms 的值),减少每次处理的消息数(即减少 max.poll.records 的值) 。
除此之外,超时时间参数(session.timeout.ms)与 消费者每次处理的时间(max.poll.interval.ms)也是有关联的。max.poll.interval.ms 时间不能超过 session.timeout.ms 时间。
本文总结
有了这些精美的流程图和讲解,我相信大家对 kafka 有了一个全新的认识。