1.位置主题&写调整复习 (TODO)
2.重平衡是通过心跳线程的心跳请求来通知的,需要重平衡则请求响应REBALANCE_IN_PROGRESS。0.10.1.0版本后心跳线程和消息消费线程分离,避免了消息消费时间长影响心跳。heartbeat.interval.ms参数控制心跳请求频率也控制重平衡频率。
1.empty状态的消费者组,才会执行过期位置删除
2.协调者通过什么判断所有消费者都已经上报了,或者说怎么知道有多少消费者客户端。如果上报信息后,消费者客户端崩溃了,这能等待下次心跳才能知道吗?
评论区回答:如果在这次 Rebalance 期间,有 消费者 超时没有上报信息,那么这个消费者会被排除在这轮 Rebalance 之外。协调者在收到第一个joingroup请求后会等一段时间。
1.消费者组内每个成员都会定期汇报位移给协调者。重平衡开启,协调者会给消费者组成员一个缓冲时间上报自己的消费位移。如果没法上报唯一,就会重新消费一遍?
没提交位移就是没有消费,重平衡完成后这个分区会被可能分给其他消费者实例消费(个人理解),导致重复消费,在业务做去重更保险
2.消费者已经崩溃了,不会发送心跳,协调者这时候怎么做到能到session.timeout.ms感知并发起重平衡的?
每次consumer加入组就会发送session.timeout.ms这个参数,这样Coordinator收到心跳请求后会根据这个session timeout时间计算下次deadline时间,如果过了deadline还没有收到直接fail掉该consumer
3.再平衡前,要求在规定时间内提交位移,这个规定时间如何设置?默认值是什么?
rebalance timeout,默认是max.poll.intervals的值
https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java#L53
4.使用consumer消费,发现consumer会出现commit offset failed,coordinator is not available 的错误,导致consumer卡住,无法消费。根据错误信息,是由于找不到groupcoordinator导致的,但coordinator不可用是在什么情况下发生的?
commit失败先看看是不是消息处理慢导致的吧。比如增加max.poll.interval.ms的值或降低max.poll.records的值试试看。Client端报出Coordinator不可用不一定表示Coordinator真的不可用
5.订阅主题发送变化之后触发reblance的流程是什么样子的?
consumer会定期更新元数据,如果发现了新增的订阅分区,会主动触发rebalance
6.消费者组创建的过程中,协调者何时判断所有消费者已加入,还是说它在收到第一个joingroup请求后等一段时间?
嗯,是的。你的理解是对的~
7.消费者是如何找到broker端的协调者的
coordinator的确定原则是寻找__consumer_offsets对应分区的leader副本所在的broker
8.Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果消费者组停掉了很长时间,那么 Kafka 很可能就把该组的位移数据删除了。不是 empty 状态,过期的就不删除吗?
是的
9.消费位移信息是存储在_consumer_offsets topic中,会有过期清理机制的。可是这里又说只在Empty状态才进行位移信息删除,有些不明白了。难道过期清理是compact,所以不叫删除?
不矛盾。首先Kafka的确会定期删除_consumer_offsets topic的数据,其次,Kafka只会为Empty状态的消费者组清理__consumer_offests中相关数据。
10.消费者主动离组
①消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。
② Consumer消费时间过长导致超过了max.poll.interval.ms时间,导致Consumer主动发起离开Group请求。
11.我记得之前在讲Consumer建立TCP连接那一节讲到:“如果某个 Socket 连接上连续 9 分钟都没有任何请求“过境”的话,那么消费者会强行“杀掉”这个 Socket 连接”。按照这个说法,是不是说消费端此时也会被协调者给fail掉,因为连接都不存在了,更不会发心跳给协调者了吧?此时也会触发新一轮的Reblance吧?
它们使用的不是一个TCP连接。Coordinator有专属的TCP连接