• Zookeeper中leader选举原理分析



    Leader选举中涉及的概念

    服务器角色

    • leader 处理读写请求
    • follower 参与leader选举,只能处理读请求并可将事务请求转交给leader来处理
    • observer 不参与leader选举,只处理读请求

    服务器状态

    • LOOKING 服务器正处在选举状态 说明集群还在投票选举 还没有选出leader
    • LEADING 服务器处于leading状态,当前server角色是leader
    • FOLLOWING 服务器处于following状态,表示当前server的角色是follower
    • OBSERVING 服务器处于observing状态,当前server角色是observer

    几个属性

    myid

    这个代表服务器ID,其值例如1、2、3等,在$ZK_HOME/data/myid文件中配置。myid值越大 在leader选举过程中的权重越大

    zxid

    最近一次处理成功的事务ID,zxid越大 说明数据越新,在leader选举过程中的权重越大
    例如下面例子中的 mZxid

    [zk: localhost:2181(CONNECTED) 3] create /wojiushiwo 123
    Created /wojiushiwo
    [zk: localhost:2181(CONNECTED) 7] stat /wojiushiwo
    cZxid = 0x1400000002
    ctime = Wed Aug 03 10:55:58 CST 2022
    mZxid = 0x1400000002
    mtime = Wed Aug 03 10:55:58 CST 2022
    pZxid = 0x1400000002
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 3
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 8] set /wojiushiwo 1234
    cZxid = 0x1400000002
    ctime = Wed Aug 03 10:55:58 CST 2022
    mZxid = 0x1400000003
    mtime = Wed Aug 03 10:56:59 CST 2022
    pZxid = 0x1400000002
    cversion = 0
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 4
    numChildren = 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    epoch

    代表投票轮数,从0开始,每投完一次票 投票轮数+1。一般而言 同一轮投票的epoch是相同的,当然也有不同的情况,那就是有节点宕机后重新参与leader选举。

    源码分析

    以zookeeper3.6.1版本为例 分析下Leader选举过程

    zookeeper3.6.1 Leader选举算法的实现类为FastLeaderElection

    在这里插入图片描述

    对流程图一些调用进行分析

    1、QuorumPeer是个线程实现类,其run方法中有leader选举的操作

    2、WorkerSender#run 将当前节点票据从sendqueue队列取出 广播给集群中的节点

    3、WorkerReceiver#run 收到集群中其他节点的票据 将其放到recvqueue队列 用于PK投票

    FastLeaderElection
        // 与集群中其他节点进行通信
    		QuorumCnxManager manager;
        //发送票据的队列
        LinkedBlockingQueue<ToSend> sendqueue;
    		//接受票据的队列
        LinkedBlockingQueue<Notification> recvqueue;
    		//当前节点
        QuorumPeer self;
        Messenger messenger;
    		//逻辑时钟 代表选举轮数 
        AtomicLong logicalclock = new AtomicLong(); /* Election instance */
    		//数据id 对应myid
        long proposedLeader;
    		//事务id
        long proposedZxid;
    		//选举轮数
        long proposedEpoch;
        volatile boolean stop;
        private SyncedLearnerTracker leadingVoteSet;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    public Vote lookForLeader() throws InterruptedException {
           //省略无关代码
    
            self.start_fle = Time.currentElapsedTime();
            try {
                /*
                 * 存储接收到的投票 key是节点id,即myid
                 */
                Map<Long, Vote> recvset = new HashMap<Long, Vote>();
               
                int notTimeout = minNotificationInterval;
    
                synchronized (this) {
                    //逻辑时钟+1 表示开启新一轮投票
                    logicalclock.incrementAndGet();
                    //更新当前节点的票据
                    //getInitId 从myid中取值
                    //getInitLastLoggedZxid 获得上一次成功执行的事务id
                    // getPeerEpoch 从currentEpoch文件中取值
                    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                }
    
               //省略日志
               //发送票据 这里是自己给自己发 先投自己一票
                sendNotifications();
    
                SyncedLearnerTracker voteSet;
    
                /*
                 * Loop in which we exchange notifications until we find a leader
                 */
    						//当集群刚启动时,集群节点状态是LOOKING 正忙着leader选举
                while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
    
                    //前面说过 recvqueue队列存储其他节点广播过来的票据
                    //这里 取出其他节点的票据
                    Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
                   
                    /*
                     * Sends more notifications if haven't received enough.
                     * Otherwise processes new notification.
                     */
                    //当没有取到票据时 
                    if (n == null) {
                      	//如果queueSendMap集合为空 说明票据都发送出去了
                        if (manager.haveDelivered()) {
                          //发送一次票据
                          //笔者不太理解这里,为什么要再发一次票据 而且这票据还是当前节点的票据,前面不是发过了吗?
                            sendNotifications();
                        } else {
                          //如果queueSendMap集合中还有数据 说明还有票据没有发送出去 可能集群连接断开了 需要重新连接
                            manager.connectAll();
                        }
    
                        /*
                         * 由于没有从recvqueue队列取到票据 这里适当延长超时时间,再次尝试
                         */
                        int tmpTimeOut = notTimeout * 2;
                        notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                        LOG.info("Notification time out: {}", notTimeout);
                    } else if (validVoter(n.sid) && validVoter(n.leader)) {
                      //当取到了票据后 ,这里else if中的逻辑判断是:当前节点的myid是否属于集群节点
                        
                        switch (n.state) {
                            //集群启动时 只有节点状态是LOOKING的 才有资格参与选举
                            case LOOKING:
                                if (getInitLastLoggedZxid() == -1) {
                                    LOG.debug("Ignoring notification as our zxid is -1");
                                    break;
                                }
                                if (n.zxid == -1) {
                                    LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                                    break;
                                }
                                //当接收到的选票 轮次高于当前节点选票轮次 说明当前节点"落后了"
                                if (n.electionEpoch > logicalclock.get()) {
                                    System.out.println(">");
                                    //重新设置当前节点选票轮次
                                    logicalclock.set(n.electionEpoch);
                                    //并清空接受到的票据集合(改朝换代了 从头开始)
                                    recvset.clear();
                                    //比较当前票据与其他节点的票据 比出高低后 重新为当前票据赋值
                                    if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                                    } else {
                                        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                                    }
                                    //然后重新发送最新票据
                                    sendNotifications();
                                } else if (n.electionEpoch < logicalclock.get()) {
                                    //当接收到的选票 轮次低于当前节点选票轮次 说明别的那个节点"落后了",选票直接丢弃不管了
                                    //省略日志
                                    break;
                                } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    //当前节点与别的节点在同一个选举轮次 大多数情况下 会走到这个循环里
                                    //比出高低后 重新为当前票据赋值
                                    updateProposal(n.leader, n.zxid, n.peerEpoch);
                                    //然后重新发送最新票据
                                    sendNotifications();
                                }
    
                               //省略日志
                             
                                //记录接收到的票据
                                recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                                //构造 SyncedLearnerTracker 查看收到的票据中 与当前机器票据一样的票据 并存储在SyncedLearnerTracker里
                                //统计选票
                                voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                                //判断投票结果 如果当前节点获取了一半以上的投票
                                if (voteSet.hasAllQuorums()) {
    
                                    //如果在finalizeWait时间后再没有投票出现 则认为本轮选举结束 会设置LEADER
                                    //如果依然有投票出现 即下面n不为null,则再次执行上面的大while循环 再次比较票据、PK等
                                    while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                            recvqueue.put(n);
                                            break;
                                        }
                                    }
    
    
                                    if (n == null) {
                                        //统计的选票中 当前节点票据超过总节点数量票据的一半 则当前节点是LEADER 状态是LEADING
                                        setPeerState(proposedLeader, voteSet);
                                        Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                      	 //清空接受到的票据集合
                                        leaveInstance(endVote);
                                        return endVote;
                                    }
                                }
                                break;
                            //省略无关代码
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    票据比较
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
            //省略无关代码
          
            return ((newEpoch > curEpoch)
                    || ((newEpoch == curEpoch)
                    && ((newZxid > curZxid)
                    || ((newZxid == curZxid)
                    && (newId > curId)))));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    票据PK的比较流程:

    1、选举轮数较大的直接获胜

    2、选举轮数相同的,事务id较大的 直接获胜

    3、选举轮数、事务id都相同的,myid值较大的 获胜

    投票结果 确认

    zk 有两种leader选举衡量方式:基于权重和基于投票数,默认基于投票数

    public QuorumMaj(Map<Long, QuorumServer> allMembers) {
        this.allMembers = allMembers;
        for (QuorumServer qs : allMembers.values()) {
            //参与投票的节点
            if (qs.type == LearnerType.PARTICIPANT) {
                votingMembers.put(Long.valueOf(qs.id), qs);
            } else {
                observingMembers.put(Long.valueOf(qs.id), qs);
            }
        }
        //half=参与投票节点数的一半
        half = votingMembers.size() / 2;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    public boolean containsQuorum(Set<Long> ackSet) {
            //ackSet 是 投给当前节点的节点数 
            return (ackSet.size() > half);
        }
    
    • 1
    • 2
    • 3
    • 4

    针对前面的代码分析基础,这里简述下3个节点的zk集群启动时选主流程

    zk节点1 myid=1、zk节点2 myid=2、zk节点3 myid=3

    集群启动时 假设其投票轮数是一致的,并且由于数据同步的存在 其zxid也是一致的

    1、节点1启动后 先给自己投一票

    2、节点2启动后 先给自己投一票,并且与节点1 交换票据。

    • 节点1 收到节点2的票据后 由于自己的myid较小 pk失败 更新自己的票据(投节点2一票)广播票据
    • 节点2 收到节点1的票据后 由于自己的myid较大 完胜 广播票据。
    • 此时节点2 获得两票 > (3/2=1) 因此节点2 晋升为leader,节点1成为follower

    3、节点3启动后 先给自己投一票,并且与节点1、节点2交换票据。虽然节点3的myid比较大,但是节点2已经是leader了,因此节点3也就变成follower了。

  • 相关阅读:
    MybatisPlus 常用注解
    haproxy keepalive实践
    python+深度学习+opencv实现植物识别算法系统 计算机竞赛
    jenkins 原理篇——pipeline流水线 声明式语法详解
    客户端版socket,完成消息广播,详解Websocket即时通信,点名系统
    Spring注解开发
    excle中数据分析,excle导入用sql简单处理
    【PMO项目管理】深入了解项目管理 | Stakeholder 利益相关者 | 利益相关者之间的立场差异
    OSG文字-显示汉字 (1)
    springboot Logback 不同环境,配置不同的日志输出路径
  • 原文地址:https://blog.csdn.net/zyxwvuuvwxyz/article/details/126139495