• Kafka3.x核心速查手册三、服务端原理篇-3、Broker故障恢复机制


    4、Leader Partition自动平衡机制

    ​ 在一组Partiton中,Leader Partition通常是比较繁忙的节点,因为他要负责与客户端的数据交互,以及向Follower同步数据。默认情况下,Kafka会尽量将Leader Partition分配到不同的Broker节点上,用以保证整个集群的性能压力能够比较平均。

    ​ 但是,经过Leader Partition选举后,这种平衡就有可能会被打破,让Leader Partition过多的集中到同一个Broker上。这样,这个Broker的压力就会明显高于其他Broker,从而影响到集群的整体性能。

    ​ 为此,Kafka设计了Leader Partition自动平衡机制,当发现Leader分配不均衡时,自动进行Leader Partition调整。这个机制涉及到Broker中server.properties配置文件中的几个重要参数:

    #1 自平衡开关。默认true
    auto.leader.rebalance.enable
    Enables auto leader balancing. A background thread checks the distribution of partition leaders at regular intervals, configurable by `leader.imbalance.check.interval.seconds`. If the leader imbalance exceeds `leader.imbalance.per.broker.percentage`, leader rebalance to the preferred leader for partitions is triggered.
    Type:	boolean
    Default:	true
    Valid Values:	
    Importance:	high
    Update Mode:	read-only
    #2 自平衡扫描间隔
    leader.imbalance.check.interval.seconds
    The frequency with which the partition rebalance check is triggered by the controller
    Type:	long
    Default:	300
    Valid Values:	[1,...]
    Importance:	high
    Update Mode:	read-only
    #3 自平衡触发比例
    leader.imbalance.per.broker.percentage
    The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage.
    
    Type:	int
    Default:	10
    Valid Values:	
    Importance:	high
    Update Mode:	read-only
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    这些信息截取自官网。

    这几个参数可以到broker的server.properties文件中修改。但是注意要修改集群中所有broker的文件,并且要重启Kafka服务才能生效。

    ​ Kafka在进行Leader Partition自平衡时的逻辑是这样的:他会认为AR当中的第一个节点就应该是Leader节点。这种选举结果成为preferred election 理想选举结果。Controller会定期检测集群的Partition平衡情况,在开始检测时,Controller会依次检查所有的Broker。当发现这个Broker上的不平衡的Partition比例高于leader.imbalance.per.broker.percentage阈值时,就会触发一次Leader Partiton的自平衡。

    ​ 另外,你也可以通过手动调用kafka-leader-election.sh脚本,触发一次自平衡。例如:

    # secondTopic的partion2不是理想状态
    [oper@worker1 bin]$ ./kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic secondTopic
    Topic: secondTopic      TopicId: GluwugzmQV26zeqndtbGPA PartitionCount: 4       ReplicationFactor: 3    Configs: segment.bytes=1073741824
            Topic: secondTopic      Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 1,2,0
            Topic: secondTopic      Partition: 1    Leader: 1       Replicas: 1,0,2 Isr: 1,2,0
            Topic: secondTopic      Partition: 2    Leader: 1       Replicas: 0,2,1 Isr: 1,2,0
            Topic: secondTopic      Partition: 3    Leader: 2       Replicas: 2,0,1 Isr: 1,2,0
    # 手动触发所有Topic的Leader Partitoin自平衡        
    [oper@worker1 bin]$ ./kafka-leader-election.sh --bootstrap-server worker1:9092 --election-type preferred --all-topic-partitions
    Successfully completed leader election (PREFERRED) for partitions disTopic-3, secondTopic-2
    # 自平衡后secondTopic的partition2就变成理想状态了。
    [oper@worker1 bin]$ ./kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic disTopic
    Topic: disTopic TopicId: vX4ohhIER6aDpDZgTy10tQ PartitionCount: 4       ReplicationFactor: 2    Configs: segment.bytes=1073741824
            Topic: disTopic Partition: 0    Leader: 2       Replicas: 2,1   Isr: 1,2
            Topic: disTopic Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0
            Topic: disTopic Partition: 2    Leader: 0       Replicas: 0,2   Isr: 0,2
            Topic: disTopic Partition: 3    Leader: 2       Replicas: 2,0   Isr: 0,2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    ​ 但是要注意,这样Leader Partition自平衡的过程是一个非常重的操作,因为要涉及到大量消息的转移与同步。并且,在这个过程中,会有丢消息的可能。所以在很多对性能要求比较高的线上环境,会选择关闭Kafka的这个Leader Partiton自平衡操作,而用其他运维的方式手动进行Leader Partiton自平衡,尽量减少自平衡过程。。

    ​ 至于为什么会丢消息。下一章节就会给出答案。

    5、Partition故障恢复机制

    ​ 当一组Partition中选举出了一个Leader节点后,这个Leader节点就会优先写入并保存Producer传递过来的消息,然后再同步给其他Follower。当Leader Partition所在的Broker服务发生宕机时,Kafka就会触发Leader Partition的重新选举。但是,在选举过程中,原来Partition上的数据是如何处理的呢?

    ​ Kafka为了保证消息能够在多个Parititon中保持数据同步,内部记录了两个关键的数据:

    • LEO(Log End Offset): 每个Partition的最后一个Offset
    • HW(High Watermark): 一组Partiton中最小的LEO。

    在这里插入图片描述

    ​ 这两个参数的作用非常大。在所有服务都正常的情况下,当一个消息写入到Leader Partition后,并不会立即让消费者感知。而是会等待其他Follower Partition同步。当HW超过当前消息时,才会让消费者感知。比如在上图中,4号往后的消息,虽然写入了Leader Partition,但是消费者是消费不到的。

    ​ 当服务出现故障时,如果是Follower发生故障。Kafka会做如下处理:

    1. 将故障的Follower节点临时提出ISR集合。而其他Leader和Follower继续正常接收消息。
    2. 出现故障的Follower节点恢复后,不会立即加入ISR集合。该Follower节点会读取本地记录的上一次的HW,将自己的日志中高于HW的部分信息全部删除掉,然后从HW开始,向Leader进行消息同步。
    3. 等到该Follower的LEO大于等于整个Partiton的HW后,就重新加入到ISR集合中。这也就是说这个Follower的消息进度追上了Leader。

    在这里插入图片描述

    ​ 如果是Leader节点出现故障,Kafka为了保证消息的一致性,处理就会相对复杂一点。

    1. Leader发生故障,会从ISR中进行选举,将一个原本是Follower的Partition提升为新的Leader。这时,消息有可能没有完成同步,所以新的Leader的LEO会低于之前Leader的LEO。
    2. Kafka中的消息都只能以Leader中的备份为准。其他Follower会将各自的Log文件中高于HW的部分全部清理掉,然后从新的Leader中同步数据。
    3. 旧的Leader恢复后,将作为Follower节点,进行数据恢复。

    在这里插入图片描述

    ​ 在这个过程当中,Kafka注重的是保护多个副本之间的数据一致性。但是这样,消息的安全性就得不到保障。例如在上述示例中,原本Partition0中的4,5,6,7号消息就被丢失掉了。

    6、HW一致性保障-Epoch更新机制

    ​ 有了HW机制后,各个Partiton的数据都能够比较好的保持统一。但是,这个HW值是否安全呢?

    ​ Leader Partition需要支持其他Follower Partition拉取最新的消息副本,就需要在Broker上保留所有Follower Partition的LEO值。

    ​ 但是,对于Follower Partition,他需要先将消息从Leader Partition拉取到本地,才能向Leader Partition上报LEO值。所有Follower Partition上报后,Leader Partition才能更新HW的值,然后Follower Partition在下次拉取消息时,才能更新HW值。所以,Leader Partiton的LEO更新和Follower Partition的LEO更新,在时间上是有延迟的。这也导致了Leader Partition上更新HW值的时刻与Follower Partition上跟新HW值的时刻,是会出现延迟的。

    ​ 这样,如果有多个Follower Partition,这些Partition保存的HW的值是不统一的。当Leader Partition出现切换,所有的Follower Partition都按照自己的HW进行数据恢复,就会出现数据不一致的情况。

    ​ 因此,Kafka还设计了Epoch机制,来保证HW的一致性。

    1. Epoch是一个单调递增的版本号,每当Leader Partition发生变更时,该版本号就会更新。所以,当有多个Epoch时,只有最新的Epoch才是有效的,而其他Epoch对应的Leader Partition就是过期的,无用的Leader。
    2. 每个Leader Partition在上任之初,都会新增一个新的Epoch记录。这个记录包含更新后端的epoch版本号,以及当前Leader Partition写入的第一个消息的偏移量。例如(1,100)。表示epoch版本号是1,当前Leader Partition写入的第一条消息是100. Broker会将这个epoch数据保存到内存中,并且会持久化到本地一个leader-epoch-checkpoint文件当中。
    3. 这个leader-epoch-checkpoint会在所有Follower Partition中同步。当Leader Partition有变更时,新的Leader Partition就会读取这个Epoch记录,更新后添加自己的Epoch记录。
    4. 接下来其他Follower Partition要更新数据时,就可以不再依靠自己记录的HW值判断拉取消息的起点。而可以根据这个最新的epoch条目来判断。

    在这里插入图片描述

    ​ 这一部分整个过程都是从Zookeeper的数据着手一路梳理下来,逐个问题逐步深入,这样才能将这些零散复杂的机制串联起来。

  • 相关阅读:
    LeetCode 454 四数相加II 383. 赎金信 15. 三数之和 18. 四数之和
    算法进修Day-38
    MTK APP实现动态修改logo和开机动画
    QPainter、QPen 、QBrush(概念)
    uni-app 折叠自定义
    【node进阶】在node.js中优雅的使用Socket.IO模块
    达梦数据库的名词解释
    乘积数量(冬季每日一题 14)
    二叉树算法
    C++switch语句
  • 原文地址:https://blog.csdn.net/roykingw/article/details/126896504