同样地这篇文章主要学习胡夕老师的相关文章。其中的一些截图也是来自于它。
消费组(Consumer Group
)是Kafka
提供的可拓展并且具有容错性的消费者机制。 有三个特性:
Consumer
实例。Group ID
,是一个字符串。订阅
了一个主题的单个分区
,同时该分区也只能被同一个消费组下的一个实例消费。
因为消费组机制,Kafka
可以同时实现了传统的消息队列模型和发布订阅模型:
Group
。一条消息只能够被一个Consumer
消费。Group
。即允许消息被多个Consumer
消费Rebalance
本质上是一种协议,规定了一个 Consumer Group
下的所有 Consumer
如何达成一致,来分配订阅 Topic
的每个分区。消费组发生Rebalance
的触发条件有三种:
Rebalance
。Rebalance
。Rebalance
其实也就是重新分配分区,让其均匀分布在各个Broker
上。这是其一个重要的功能,但是它也有很多缺点,例如:
Rebalance
过程中,所有 Consumer
实例都会停止消费,等待 Rebalance
完成。Rebalance
过程中,所有 Consumer
实例共同参与,在协调者组件Coordinator
的帮助下,全部重新分配所有分区。前者好理解,后者其实并不需要将所有的实例和分区之间的分配关系给打乱重排。因为我们从上一章Kafka复习计划 - 客户端实践及原理(连接器/TCP的管理/幂等性和事务)知道,实例必须和分区里副本所在的Broker
建立TCP
连接,才可以进行后续的相关操作。那么将所有实例全部重新分配分区,可能就会造成不必要的TCP
连接的创建过程。
诸如以上缺点,Rebalance
过程所消耗的时间是很久的。因此,能避免就去避免Rebalance
的发生。也就是避免触发Rebalance
的三种情况。
在将如何避免之前,先来说下上文提到的Coordinator
协调者。
Coordinator
专门为消费者组服务:负责为其发生重平衡Rebalance
的时候提供位移管理和组成员管理。 比如:
Consumer
端在提交位移的时候,就是向Coordinator
所在的Broker
提交位移。Consumer
应用启动的时候,也是向Coordinator
所在的Broker
发送各种请求。由Coordinator
负责执行消费者组的注册、成员管理记录等元数据的管理。每个Broker
都有自己的Coordinator
组件。而消费者组确定自身对应的Coordinator
在哪一台Broker
上,有两个步骤:
Leader
副本所在的Broker
,即为对应的Coordinator
。Coordinator
协调者的算法流程:
假设某个Group ID
为“test-group
”,然后计算到它的hashCode
值。
计算__consumer_offsets
位移主题的分区数(默认50个),然后用哈希值对分区数做取模运算。
此时意味着位移主题的分区12负责保存该Group
的数据。
找到位移主题分区12的Leader
副本所在的Broker
。
首先我们需要知道Rebalance
后会发生什么事情:每个 Consumer
实例都会定期地向 Coordinator
发送心跳请求,表明它还存活着。
而关于心跳请求以及认定Consumer
是否挂掉的过程,这里有三个重要的参数至关重要:
session.timeout.ms
:默认10s
,意思是,如果10s
内没有收到心跳请求,就会认为这个实例挂了。其决定了 Consumer
存活性的时间间隔heartbeat.interval.ms
:意思是Consumer
发送心跳请求的频率大小。max.poll.interval.ms
:用于控制Consumer
实际消费能力对Rebalance
的影响。默认是5分钟
,即Consumer
程序如果在5分钟之内无法消费完poll()
方法返回的消息,那么Consumer
会主动发起离开组的请求,那么自然而然会导致一次Rebalance
的发生。其次上文提到了触发Rebalance
的三个时机:组成员数量、分区主题数量、主题的分区数量发生变化。这里来重点说下组成员数量变化的方面。有些情况下,Consumer
实例会被Coordinator
错误地认为它已经停止了,而被踢出Group
,那么就会导致Rebalance
的发生。
那么结合心跳请求的机制,我们可以看出,这一块是一个避免消费者组发生重平衡的一个切入点。我们可以明确哪一些Rebalance
操作是不必要发生的。
1.Consumer
未能及时发送心跳导致被踢出消费者组。 对于这样的现象,可以规定Consumer在被判定为 挂掉 之前,至少发送3轮的心跳请求。即session.timeout.ms >= 3 * heartbeat.interval.ms
,例如前者设置6s,后者设置2s。
2.Consumer
消费时间过长导致主动发送离开消费者组请求。 将max.poll.interval.ms
设置的阈值稍微大一点,可以做个统计,Consumer
端程序运行的最大时长是多少,如果是10分钟,可以将这个值改为11分钟。
当Rebalance
发生之后,由协调者决定发起,并且指挥其他的Consumer
实例参与重平衡。而指挥的这一个操作则是通过心跳机制来完成。
REBALANCE_IN_PROGRESS
封装到心跳请求中,发送给消费者实例。REBALANCE_IN_PROGRESS
,就知道此时重平衡开始了。消费者组在重平衡阶段,一共有5个状态:
而这五个状态之间的关系如下:
从消费者端角度来看,重平衡分为两个步骤:
加入组:会向协调者发送JoinGroup
请求,主要告知协调者自己订阅了什么主题,这样协调者就可以收集到所有成员的订阅信息。
等待领导者消费者分配具体的方案:普通成员和消费者领导者都会向协调者发送SyncGroup
请求。普通成员发送的请求体没有实际的内容,领导者发送的请求体则包含做好的分配方案。
注意:
JoinGroup
请求的成员将自动成为消费者领导者。它的任务就是收集所有成员的订阅信息,然后根据这些信息去制定具体的分区消费分配方案。SyncGroup
请求,同样地协调者也会以SyncGroup
请求的方式返回给各个成员,告知他们重平衡的最终结果(分配方案)。所有成员接收成功后,消费者组进入到Stable
状态。场景一:新成员入组。当协调者接收到新的 JoinGroup
请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。
场景二:组成员主动离开组。即Consumer
端实例主动close()
了,也就是向协调者发送LeaveGroup
请求。
场景三:组成员崩溃离组。
注意区分主动离组:
Kafka
中有一个内部主题:__consumer_offsets
,叫位移主题,用作位移数据的管理,即保存消费者的位移消息。 当Kafka
集群中的第一个 Consumer
程序启动时,Kafka
会自动创建位移主题。其中相关的主题参数为:
offsets.topic.num.partitions
,默认50。offsets.topic.replication.factor
,默认3。Kafka
的位移管理机制用一句话表示就是:将Consumer
的位移数据作为一条普通的Kafka
消息提交到主题__consumer_offsets
中。
注意:
Kafka
主题,可以手动创建、删除、修改。但一般我们无需去管理它。Kafka
自定义的,用户不能够随意修改。这里我们可以来验证下:(我的Kafka
和Zookeeper
都是Docker
上的)
Zookeeper
的容器ID
,并进入其中:docker exec -it 0b1db99556e0 bash
到目录cd /opt/zookeeper-3.4.13/bin/
中,使用命令./zkCli.sh
启动Zookeeper
客户端。
查看对应主题下的分区:ls /brokers/topics/__consumer_offsets/partitions
,结果如下:确实是50个,[0,49]
消息格式简单可以理解为Key-Value
格式,Key
包含三个重要的内容:Group ID
,主题名,分区号。 Value
主要保存位移值以及元数据。另外,位移主题还会保存以下两种信息:
Consumer Group
信息的消息,用来注册消费者组的。Group
过期位移甚至是删除 Group
的消息。主要来说一下第三种消息,他也叫墓碑消息
,主要特点是它的消息体是null
,即空消息体。他主要是标明某个消费者组需要被删除。它的出现有两个前提:
Consumer Group
下的所有Consumer
实例都停止了。Consumer
实例的位移数据已经被删除。我们知道,Kafka
中Consumer
提交位移的方式有两种:由参数enable.auto.commit
来控制。
Consumer
在后台定期地提交位移,时间间隔由参数auto.commit.interval.ms
来控制。默认是5s。Consumer
端的程序来控制,consumer.commitSync()
。如果选择的是自动提交位移,那么只要Consumer
一直启动着,就会无期限地向位移主题中写入消息,而文章到这里为止讲的基本上都是主题__consumer_offsets
中有关信息的写入操作。那么相反的,Kafka
必定会考虑到这点:
Kafka
使用Compact
策略来删除位移主题中的过期消息,避免该主题无限期膨胀。 而Compact
策略的定义又简单易懂:
Key
(大前提),发送了两条消息,Msg1
和Msg2
。Msg1
的发送时间早于Msg2
,那么Msg1
就是过期消息。Kafka
提供了叫Log Cleaner
的后台线程取定期地巡检待 Compact
的主题,看看是否存在满足条件的可删除数据。Compact
过程的先后对比如下:
从用户角度来看,提交位移的方式有两种:
enable.auto.commit = false
,然后调用API
提交位移。首先来说下自动提交的问题:
poll()
方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。重复消费的一个情景:
M
之后的第三秒钟发生了Rebalance
操作。Rebalance
结束后,所以Consumer
将会从上一次提交的位移M
处继续消费。M
是3秒前的位移数据了,因此再Rebalance
发生的前3秒消费的数据全部要重新消费一次。也因此提供了手动提交的API
,可以让使用者自己控制提交的时机以及频率。从Consumer
端角度来看,手动提交位移的方式也有两种。
commitSync()
,会提交Consumer
端poll()
返回的最新位移。会阻塞。能够自动重试。commitAsync()
。立刻返回结果,不会阻塞。通过回调函数来做后续操作。不能够自动重试。可以看到同步和异步提交各有各的优缺点,那么我们可以将它们俩结合在一起。手动提交下推荐的位移提交编码方式如下:
Properties properties = new Properties();
properties.put("bootstrap.servers", "你的服务器地址:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "test-consumer");
// 1.创建KafkaConsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 2.订阅主题
consumer.subscribe(Collections.singletonList("test"));
// 3.轮询消费
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
// 4.处理消息体逻辑
processMsg(records);
// 5.异步提交代码
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 6.先同步阻塞提交位移
consumer.commitSync();
} finally {
// 7.最后关闭
consumer.close();
}
}
优点如下:
commitAsync()
避免程序阻塞。Consumer
端程序关闭前,调用 commitSync()
方法执行同步阻塞式的位移提交,确保能够保存正确的位移数据。但是这段代码也有一个问题:poll()
方法是一批一批的返回消息的,而commit
提交位移的时候,是会将poll
返回的所有消息的位移都提交。 试想一下,倘若消息有1W条,那么就会一次性提交1W条数据的位移。如果中间出现问题了,那么之前处理的岂不是全部要重新来一遍?
因此我们可以从更细的粒度上出发去提交位移,比如可以每处理完100条数据就提交一次位移。我们以同步提交的API
为例,来看下这个方法的重载函数:
public void commitSync() {}
/**
* TopicPartition:消费的分区
* OffsetAndMetadata:位移数据
*/
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {}
案例代码如下(以每100条异步提交一次位移为例):
Properties properties = new Properties();
properties.put("bootstrap.servers", "你的服务器地址:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "test-consumer");
// 1.创建KafkaConsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 2.订阅主题
consumer.subscribe(Collections.singletonList("test"));
// 3.轮询消费
try {
HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
// 4.处理消息体逻辑
for (ConsumerRecord<String, String> record : records) {
// 处理逻辑....
System.out.println("topic = " + record.topic() +
", partition = " + record.partition() +
", offset = " + record.offset() +
", customer = " + record.key() +
", country = " + record.value());
// 将当前处理的消息放到位移集合中。届时一并提交其位移
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
// 5.异步提交代码
if (count != 0 && count % 100 == 0) {
consumer.commitAsync(offsets, null);
}
count++;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 6.先同步阻塞提交位移
consumer.commitSync();
} finally {
// 7.最后关闭
consumer.close();
}
}
实际上,无论是是手动提交还是自动提交,都不能避免消息的重复消费。只要Consumer
端满足以下两个条件,那么在下次重启之后依旧会出现消息的重复消费。
Consumer
消费了一定的数据。Consumer
还没提交位移。既然仅仅靠Kafka
的位移提交机制无法避免重复消费,我们可以从编码角度来解决:如果保证了消息的幂等性,那么就不会重复消费。大概做法如下:
Producer
端在发送消息的时候,往消息体里面塞一个唯一标识符。Consumer
端在消费消息的时候,可以先去Redis
(或者Mysql
这类数据库)观察下这个消息是否被消费过。若消费过,则跳过。Redis
中。Kafka
有一套自己的请求协议,用于实现各式各样的交互操作,例如:
PRODUCE
:生产消息用的。FETCH
:消费消息用的。METADATA
:请求Kafka
集群元数据信息用的。不过有一个相同点就是:所有的请求都是通过TCP
网络以Socket
的方式进行通讯的。
我们知道,Kafka
对于一个分区下的消息,是顺序处理的,那么这个特性会引来几个问题:
而Kafka
是利用Reactor
模式来处理请求的。
Reactor
模式是事件驱动架构的一种实现方式,适合应用于处理多个客户端并发向服务器端发送请求的场景。大概的模式架构图如下:
解释如下:
SocketServer
组件中。这个组件有一个Acceptor
线程还有一个网络线程池。Acceptor
线程负责请求的分发,主要通过轮询的方式将入站请求均匀地发送到网络线程中。其中Kafka
提供相关参数控制网络线程池:
num.network.threads
:调整网络线程池的线程数,默认是3。即每台Broker
启动的时候都会创建3个网络线程,用于专门处理客户端发送的请求。Kafka
又对网络线程池的工作做进一步的划分,主要做异步线程池的处理:
Broker
端的一个IO
线程池负责将其从队列中取出来做处理。根据请求的类型做对应的操作。Purgatory
组件,用来缓存延时请求(一些暂时因不满足条件而立刻处理的请求),例如当ack
机制设置为all
的时候,当生产一条消息,该请求必须等待所有副本都接收到消息后才能够返回。那么等待的过程中,该IO
请求就会暂存到Purgatory
组件中。其中Kafka
提供相关参数控制IO
线程池:
num.io.threads
:调整IO线程池的线程数,默认是8。即每台Broker
启动的时候都会创建8个网络线程,用于处理请求的真实底层逻辑,例如消息写入磁盘,或者是从磁盘中读取消息。总的来说,Kafka
对于请求的处理用到了四大角色(正好也是请求的一个先后顺序):
Acceptor
线程:所有客户端的请求都会先经过此 ,由其通过轮询的方式,将请求均匀地分配到网络线程池中。IO
线程池(默认8个线程):处理控制类请求,从共享请求队列中取出请求,执行真正的业务逻辑。Purgatory
组件:用来缓存延时请求。备注(请求的分类):
PRODUCE
和 FETCH
这类请求。ISR
集合的 LeaderAndIsr
请求,负责勒令副本下线的 StopReplica
请求等。