kafka中的broker要选举Controller角色来管理整个kafka集群中的分区和副本状态。一个Topic下多个partition要选举Leader角色和客户端进行交互数据
Zookeeper客户端工具: prettyZoo。 下载地址:https://github.com/vran-dev/PrettyZoo/releases
对于kafka还有个问题,就是kafka集群的每个broker都会注册在zookeeper的临时节点/broker/ids/{BrokerId} ,如果集群节点服务器是非正常关机,zookeeper上面对应broker的节点不会删除,再次启动broker往zookeeper会报错
通过在zookeeper上创建一个/controller的临时节点,写入当前启动的broker信息,其它的服务器无法写入了,写入成功的作为Controller,写入的内容如下:
{"version":1,"brokerid":0,"timestamp":"1661492503848"}
Controller会与zookeeper保持一个长连接,如果属于Controller角色的broker宕机,zookeeper长时间检测不到心跳就会删除/controller节点,其它broker就会监听到并重新竞争/controller
选举产生的Controller节点,就会负责监听Zookeeper中的其他一些关键节点,触发集群的相关管理工作。例如:
另外,Controller还需要负责将元数据推送给其他Broker。
其中,AR和ISR比较关键,可以通过kafka-topics.sh的--describe指令查看。
- [oper@worker1 kafka_2.13-3.2.0]$ bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic disTopic
- Topic: disTopic TopicId: vX4ohhIER6aDpDZgTy10tQ PartitionCount: 4 ReplicationFactor: 2 Configs: segment.bytes=1073741824
- Topic: disTopic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
- Topic: disTopic Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
- Topic: disTopic Partition: 2 Leader: 2 Replicas: 0,2 Isr: 2,0
- Topic: disTopic Partition: 3 Leader: 2 Replicas: 2,0 Isr: 2,0
这个结果中,AR就是Replicas列中的Broker集合。而这个指令中的所有信息,其实都是被记录在Zookeeper中的。
接下来,Kafka设计了一套非常简单高效的Leader Partition选举机制。在选举Leader Partition时,会按照AR中的排名顺序,靠前的优先选举。只要当前Partition在ISR列表中,也就是是存活的,那么这个节点就会被选举成为Leader Partition。
例如,我们可以设计一个实验来验证一下LeaderPartiton的选举过程。
- #1、创建一个备份因子为3的Topic,每个Partition有3个备份。
- [oper@worker1 kafka_2.13-3.2.0]$ bin/kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 3 --partitions 4 --topic secondTopic
- Created topic secondTopic.
- #2、查看Topic的Partition情况 可以注意到,默认的Leader就是ISR的第一个节点。
- [oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic secondTopic
- Topic: secondTopic TopicId: W3mXDtj1RsWmsEhQrZjN5g PartitionCount: 4 ReplicationFactor: 3 Configs:
- Topic: secondTopic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
- Topic: secondTopic Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
- Topic: secondTopic Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 1,0,2
- Topic: secondTopic Partition: 3 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
- #3、在worker3上停掉brokerid=2的kafka服务。
- [oper@worker3 kafka_2.13-3.2.0]$ bin/kafka-server-stop.sh
- #4、再次查看SecondTopic上的Partiton分区情况
- [oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic secondTopic
- Topic: secondTopic TopicId: W3mXDtj1RsWmsEhQrZjN5g PartitionCount: 4 ReplicationFactor: 3 Configs:
- Topic: secondTopic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0
- Topic: secondTopic Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,1
- Topic: secondTopic Partition: 2 Leader: 1 Replicas: 2,1,0 Isr: 1,0
- Topic: secondTopic Partition: 3 Leader: 1 Replicas: 1,2,0 Isr: 1,0
从实验中可以看到,当BrokerId=2的kafka服务停止后,2号BrokerId就从所有Partiton的ISR列表中剔除了。然后,Partition2的Leader节点原本是Broker2,当Broker2的Kafka服务停止后,都重新进行了Leader选举。Parition2预先评估的是Replicas列表中Broker2后面的Broker1,Broker1在ISR列表中,所以他被最终选举成为Leader。
当Partiton选举完成后,Zookeeper中的信息也被及时更新了。
- /brokers/topics/secondTopic: {"partitions":{"0":[1,0,2],"1":[0,2,1],"2":[2,1,0],"3":[1,2,0]},"topic_id":"W3mXDtj1RsWmsEhQrZjN5g","adding_replicas":{},"removing_replicas":{},"version":3}
- /brokers/topics/secondTopic/partitions/0/state: {"controller_epoch":20,"leader":1,"version":1,"leader_epoch":2,"isr":[1,0]}
Leader Partitoin选举机制能够保证每一个Partition同一时刻有且仅有一个Leader Partition,这样分配Leader Partition是有问题的,Leader Partition用于接收客户端请求,这样分配显然是分配不均,导致Broker1过于繁忙
Kafka会尽量将Leader Partition分配到不同的Broker节点上
Kafka在进行Leader Partition自平衡时的逻辑是这样的:他会认为AR当中的第一个节点就应该是Leader节点。这种选举结果成为preferred election 理想选举结果。Controller会定期检测集群的Partition平衡情况,在开始检测时,Controller会依次检查所有的Broker。当发现这个Broker上的不平衡的Partition比例高于leader.imbalance.per.broker.percentage阈值时,就会触发一次Leader Partiton的自平衡。
这是官方文档的部分截图。
这个机制涉及到Broker中server.properties配置文件中的几个重要参数:
- #1 自平衡开关。默认true
- auto.leader.rebalance.enable
- Enables auto leader balancing. A background thread checks the distribution of partition leaders at regular intervals, configurable by `leader.imbalance.check.interval.seconds`. If the leader imbalance exceeds `leader.imbalance.per.broker.percentage`, leader rebalance to the preferred leader for partitions is triggered.
- Type: boolean
- Default: true
- Valid Values:
- Importance: high
- Update Mode: read-only
- #2 自平衡扫描间隔
- leader.imbalance.check.interval.seconds
- The frequency with which the partition rebalance check is triggered by the controller
- Type: long
- Default: 300
- Valid Values: [1,...]
- Importance: high
- Update Mode: read-only
- #3 自平衡触发比例
- leader.imbalance.per.broker.percentage
- The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage.
-
- Type: int
- Default: 10
- Valid Values:
- Importance: high
- Update Mode: read-only
这几个参数可以到broker的server.properties文件中修改。但是注意要修改集群中所有broker的文件,并且要重启Kafka服务才能生效。
另外,你也可以通过手动调用kafka-leader-election.sh脚本,触发一次自平衡。例如:
- # 启动worker3上的Kafka服务,Broker2上线。
- # secondTopic的partion2不是理想状态。理想的leader应该是2
- [oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic secondTopic
- Topic: secondTopic TopicId: W3mXDtj1RsWmsEhQrZjN5g PartitionCount: 4 ReplicationFactor: 3 Configs:
- Topic: secondTopic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
- Topic: secondTopic Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
- Topic: secondTopic Partition: 2 Leader: 1 Replicas: 2,1,0 Isr: 1,0,2
- Topic: secondTopic Partition: 3 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
- # 手动触发所有Topic的Leader Partition自平衡
- [oper@worker1 bin]$ ./kafka-leader-election.sh --bootstrap-server worker1:9092 --election-type preferred --topic secondTopic --partition 2
- Successfully completed leader election (PREFERRED) for partitions secondTopic-2
- # 自平衡后secondTopic的partition2就变成理想状态了。
- [oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic secondTopic
- Topic: secondTopic TopicId: W3mXDtj1RsWmsEhQrZjN5g PartitionCount: 4 ReplicationFactor: 3 Configs:
- Topic: secondTopic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
- Topic: secondTopic Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
- Topic: secondTopic Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 1,0,2
- Topic: secondTopic Partition: 3 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
但是要注意,这样Leader Partition自平衡的过程是一个非常重的操作,因为要涉及到大量消息的转移与同步。并且,在这个过程中,会有丢消息的可能。所以在很多对性能要求比较高的线上环境,会选择将参数auto.leader.rebalance.enable设置为false,关闭Kafka的Leader Partition自平衡操作,而用其他运维的方式,在业务不繁忙的时间段,手动进行Leader Partiton自平衡,尽量减少自平衡过程对业务的影响。
当Leader Partition对应broker宕机了如何选举新的Leader Partition接收客户端请求
先理解如下两个参数
这个参数比较好理解,每个Partition都会记录自己保存的消息偏移量。leader partition收到并记录了生产者发送的一条消息,就将LEO加1。而接下来,follower partition需要从leader partition同步消息,每同步到一个消息,自己的LEO就加1。通过LEO值,就知道各个follower partition与leader partition之间的消息差距。
follower partition每次往leader partition同步消息时,都会同步自己的LEO给leader partition。这样leader partition就可以计算出这个HW值,并最终会同步给各个follower partition。leader partition认为这个HW值以前的消息,都是在所有follower partition之间完成了同步的,是安全的。这些安全的消息就可以被消费者拉取过去了。而HW值之后的消息,就是不安全的,是可能丢失的。这些消息如果被消费者拉取过去消费了,就有可能造成数据不一致。
也就是说,在所有服务都正常的情况下,当一个消息写入到Leader Partition后,并不会立即让消费者感知。而是会等待其他Follower Partition同步。这个过程中就会推进HW。当HW超过当前消息时,才会让消费者感知。比如在上图中,4号往后的消息,虽然写入了Leader Partition,但是消费者是消费不到的。
这跟生产者的acks应答参数是不一样的
当服务出现故障时,如果是Follower发生故障,这不会影响消息写入,只不过是少了一个备份而已。处理相对简单一点。Kafka会做如下处理:
如果是Leader节点出现故障,Kafka为了保证消息的一致性,处理就会相对复杂一点。
在这个过程当中,Kafka注重的是保护多个副本之间的数据一致性。但是这样,消息的安全性就得不到保障。例如在上述示例中,原本Partition0中的4,5,6,7号消息就被丢失掉了。
有了HW机制后,各个Partiton的数据都能够比较好的保持统一。但是,实际上,HW值在一组Partition里并不是总是一致的。
Leader Partition需要计算出HW值,就需要保留所有Follower Partition的LEO值。
但是,对于Follower Partition,他需要先将消息从Leader Partition拉取到本地,才能向Leader Partition上报LEO值。所有Follower Partition上报后,Leader Partition才能更新HW的值,然后Follower Partition在下次拉取消息时,才能更新HW值。所以,Leader Partiton的LEO更新和Follower Partition的LEO更新,在时间上是有延迟的。这也导致了Leader Partition上更新HW值的时刻与Follower Partition上跟新HW值的时刻,是会出现延迟的。这样,如果有多个Follower Partition,这些Partition保存的HW的值是不统一的。当然,如果服务一切正常,最终Leader Partition还是会正常推进HW,能够保证HW的最终一致性。但是,当Leader Partition出现切换,所有的Follower Partition都按照自己的HW进行数据恢复,就会出现数据不一致的情况。
因此,Kafka还设计了Epoch机制,来保证HW的一致性。
这个关键的leader-epoch-checkpoint文件保存在Broker上每个partition对应的本地目录中。这是一个文本文件,可以直接查看。他的内容大概是这样样子的:
- [oper@worker1 disTopic-0]$ cat leader-epoch-checkpoint
- 0
- 1
- 29 2485991681
其中
第一行版本号
第二行表示下面的记录数。这两行数据没有太多的实际意义。
从第三行开始,可以看到两个数字。这两个数字就是epoch 和 offset。epoch就是表示leader的epoch版本。从0开始,当leader变更一次epoch就会+1。offset则对应该epoch版本的leader写入第一条消息的offset。可以理解为用户可以消费到的最早的消息offset。