• 【ZooKeeper】zookeeper源码6-FastLeaderElection选举算法


    源码项目zookeeper-3.6.3:核心工作流程


    选举算法背景知识:
    1、所有的节点(有选举权和被选举权),一上线,就是LOOKING状态,当选举结束了之后,有选举权中的服务器会变成另外两种:Leader、Follower
    2、在选举过程中,两两服务器之间,都会交换选票(更严格的说:在每一轮选举中,每个服务器节点,都会把自己的选票广播给其他的所有服务器),选票是vote对象,广播到所有节点

    • 第一轮:每个节点都选举自己当leader,然后广播这个选票给其他所有服务器,第一轮必然选举不出来
    • 第二轮:因为当第一轮结束之后,每个服务器都会接收到其他服务器的反馈,基于接收到的选票和推举自己当leader的选票做对比就能得出一个更优的选票,当前节点,就更换选票了(epoch/logicalLock zxid myid)
      事实上,关于选票和投票的类型有四种:
      • 1、Vote 选票,任何一台服务器,最开始初始化的推举自己当leader的信息,也是保存最后选举出来的leader的信息
      • 2、Notification 接收到的投票
      • 3、Message 放入投票队列的时候会变成
      • 4、ToSend 待发送的选票
      • 还有一个特殊的中间对象:ByteBuffer NIO的一个API
        3、当每个zookeeper服务器启动好了之后,第一件事就是发起投票,如果是第一次发起投票都是去找leader,如果发现其他zookeeper返回leader的信息,选举直接结束
        4、在进行选票发送的时候,每个zookeeper都会为其他的zookeeper服务节点生成一个对应的SendWorker、ReceiveWorker和一个ArrayBlockingQueue
        ArrayBlockingQueue 存放待发送的选票
        SendWorker 从队列中,获取选票执行发送
        ReceiveWorker 从其他节点接收发送过来的投票

    执行选举调用过程

    //寻找执行选举
    方法的调用路径过程
    -QuorumPeerMain.java
    	-main//此处传入arg[0]=zoo.cfg
    		-main.initializeAndRun(args);//此处main是main方法内QuorumPeerMain main = new QuorumPeerMain()构建的实例
    	-initializeAndRun//1.解析zoo.cfg和myid配置 2.启动一个关于旧快照数据文件的定期删除任务 3.启动QuorumPeer
    		-runFromConfig(config);//QuorumPeerConfig config = new QuorumPeerConfig()解析zoo.cfg得到的配置信息,都被放置再了QuorumPeerConfig中
    	-runFromConfig//1.启动真正意义的Server,监听2181端口的NIO服务器 
    				  //2.获取QuorumPeer实例对象,把QuorumPeerConfig中成员变量值复制到QuorumPeer里 
    				  //3.启动QuorumPeer
        	-quorumPeer.start();
        	//quorumPeer = getQuorumPeer();//构造了一个QP实例对象,从QuorumPeerConfig对象中取出配置存到quorumPeer中(其实从开始的zoo.cfg经历了多个存储对象)
        	//进入QuorumPeer.java
    -QuorumPeer.java
    	-private volatile Vote currentVote;//成员变量,票,存储谁当Leader信息
    	-start()
    		//1.loadDataBase();//冷启动数据恢复
    		//2.startServerCnxnFactory();//启动NIO服务端,未发送读写请求客户端提供服务
    		//3.adminServer.start();//启动AdminServer 就是web界面那个
    		//4.startLeaderElection();//准备选举环境
    		//5.startJvmPauseMonitor();//启动一个JVM监视器gc
    		//6.super.start();//进入ZAB工作状态:首先执行选举
    		-startLeaderElection()
    		//1.创建QuorumCxnManager	选举网络环境,选票送达和接收的动作
    		//2.创建FastLeaderElection	选举的逻辑抽象
    			-currentVote = new Vote(myid,getLastLoggedZxid(),getCurrentEpoch());
    			//先准备票,三个条件myid、zxid、epoch,均为当前server的
    			//由此代码实现,每个节点的第一轮选举,必然是推举自己
    			-this.electionAlg = createElectionAlgorithm(electionType);
    			//创建选举算法实例
    			//electionType = 默认3 = 代表实例是FastLeaderElection
    				-switch(electionAlgorithm)
    				//1和2不支持了,只有3,没有0
    				//在3.4.14中0是LeaderElection,1是AuthFastLeaderElection,2是AuthFastLeaderElection
    				//在3中:
    				//1.创建QuorumCxnManager
    				//2.创建FastLeaderElection
    					-QuorumCnxManager qcm = createCnxnManager();
    						-return new QuorumCnxManager//是一个构造类
    							-QuorumCnxManager
    							//进入QuorumCnxManager.java
    					-QuorumCnxManager.Listener listener = qcm.listener;
    					//.Listener为内部类,在qcm创建的时候listener已经创建好,这里赋值就是引用传递
    					//listener是负责管理选举过程中的网络服务端,启动一个BIO服务端绑定3888端口
    					//ZooKeeper中由三套网络通信架构,一套选举,一套网络同步,一套为客户端提供服务
    					-listener.start();//启动
    					-FastLeaderElection fle = new FastLeaderElection(this,qcm);
    						-FastLeaderElection(QuorumPeer self,QuorumCnxManager manager)
    						//进入FastLeaderElection.java
    					-fle.start();//对上面的线程进行启动
    					
    -QuorumCnxManager.java//选举的网络环境,选票发送和接收的动作
    	-QuorumCnxManager
    		-1.this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
    		//队列,存放其他服务器发送过来的选票
    		-2.this.queueSendMap = new ConcurrentHashMap<>();
    		//key=myid	value=SendQueue
    		-3.this.senderWorkerMap = new ConcurrentHashMap<>();
    		//key=myid value=SendWorker线程
    		-4.this.lastMessageSent = new ConcurrentHashMap<>();
    		//key=myid value=Vote最近发送的一次票,包括你给他,他给你,他给他等
    		-5.initializeConnectionExecutor(mySid,quorumCnxnThreadsSize);
    		//初始化一个线程池,初始线程数量是3,最大线程数量,默认是20
    		//旧版本:对方有多少个服务器,就有多少个线程
    		-6.listener = new Listener();
    		//创建一个BIO服务端ServerSocket,绑定3888选举端口
    			-Listener()//是一个线程
    			-run()
    				-listenerHandlers = addresses.stream().map(address->new ListenerHandler(address,self.shouldUsePortUnification(),self.isSslQuorum(),latch)).collect(Collectors.toList());
    				//每个地址生成一个ListenerHandler线程,会被提交到exector线程池中
    				//正常选举端口就一个,只要启动一个服务端就可以了,即通过一个线程做
    					-run() //核心逻辑必然在线程的run()
    						-acceptConnections();
    						//接收连接,另外申请服务端阻塞在accept方法中
    							-serverSocket = createNewServerSocket();//创建Server服务端
    							-while(!shutdown)//不停的接收连接
    								-client = serverSocket.accept();//对申请的连接,返回
    				-ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
    				-listenerHandlers.forEach(executor::submit);
    									
    -FastLeaderElection.java//选举的逻辑抽象
    	-FastLeaderElection(QuorumPeer self,QuorumCnxManager manager)//两个成员变量
    		-starter(self,manager);
    		//四个组件两个队列两个线程
    			-sendqueue = new LinkedBlockingQueue<ToSend>();
    			-recvqueue = new LinkedBlockingQueue<Notification>();
    			-this.messenger = new Messenger(manager);
    			//内部两个线程
    				-this.ws = new WorkerSender(manager);
    				-this.wr = new WorkerReceiver(manager);
    				//消费ArrayBlockingQueue recvQueue队列,
    				//如果是合法的票,拿到Message,放入recvqueue,变成Notification
    				//如果不是合法的票,需要重发,会把票重新构建为ToSend,放到sendqueue
    				//由WorkSender去消费这个sendqueue队列
    				//如果WorkerSender把票发给自己,会把ToSend对象变成Message放回ArrayBlockingQueue
    				//如果WorkerSender把票发给别人,会把ToSend对象变成ByteBuffer放到SendQueue队列
    				//因为SendWorker线程在工作,会去消费SendQueue的ByteBuffer,把票发送出去
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    /*************************************************
    *  注释: 准备选举算法实例及相关环境
    *  1、准备选举环境
    *      启动了一个 BIO 服务端
    *  2、创建选举算法实例
    *      创建 FastLeaderElection 实例对象,极其复杂!
    *  3、执行选举:FastLeaderElection.lookForLeader();
    */
    startLeaderElection();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    	/*************************************************
         *  注释: 核心动作:
         *  1、创建 QuorumCxnManager   选举的网络环境,选票发送和接收的动作
         *  2、创建 FastLeaderElection     选举的逻辑抽象
         */
        public synchronized void startLeaderElection() {
    
            /*************************************************
             *  注释: 初始化推举选票
             */
            try {
                if(getPeerState() == ServerState.LOOKING) {
                    // TODO 注释: 这些信息,都是当前这个 server 的
                    // TODO 注释: 三个条件:myid,zxid,epoch
                    // TODO 注释: 由这个代码实现可以看出,每个节点,的第一轮选举,必然都是推举自己
                    currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
                }
                // TODO 注释: 当开始选举的时候,这个票的信息,时候自己的信息
                // TODO 注释: 当选举结束的是偶,这个票的信息,就是被推举成为leader的节点信息
            } catch(IOException e) {
                RuntimeException re = new RuntimeException(e.getMessage());
                re.setStackTrace(e.getStackTrace());
                throw re;
            }
    
            /*************************************************
             *  注释: 创建选举算法实例
             *  electionType = 默认3 = FastLeaderElection
             *  electionType =
             */
            this.electionAlg = createElectionAlgorithm(electionType);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    /*************************************************
         *  注释: 核心做两件事
         *  1、初始化 QuorumCnxManager
         *  2、初始化 FastLeaderElection
         */
        @SuppressWarnings("deprecation")
        protected Election createElectionAlgorithm(int electionAlgorithm) {
            Election le = null;
    
            //TODO: use a factory rather than a switch
            switch(electionAlgorithm) {
                case 1:
                    throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
                case 2:
                    throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
                case 3:
    
                    /*************************************************
                     *  注释: 事实上是创建 QuorumCnxManager.Listener 这个对象
                     */
                    QuorumCnxManager qcm = createCnxnManager();
                    QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
                    if(oldQcm != null) {
                        LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
                        oldQcm.halt();
                    }
                    
                    // TODO 注释: 负责管理选举过程中的,网络服务端
                    // TODO 注释: 启动一个 BIO 服务端,绑定 3888 端口
                    QuorumCnxManager.Listener listener = qcm.listener;
                    if(listener != null) {
    
                        /*************************************************
                         *  注释: 其实就是启动了一个 BIO 的服务端,绑定了选举端口
                         *  等待客户端发起链接请求过来
                         */
                        listener.start();
    
                        /*************************************************
                         *  注释: 初始化了两个线程,两个队列
                         */
                        FastLeaderElection fle = new FastLeaderElection(this, qcm);
    
                        /*************************************************
                         *  注释: 启动这两个线程
                         */
                        fle.start();
    
                        le = fle;
                    } else {
                        LOG.error("Null listener when initializing cnx manager");
                    }
                    break;
                default:
                    assert false;
            }
            return le;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    /*************************************************
         *  注释:
         *  1、his.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
         *      队列,存放其他服务器发送过来的选票
         *  2、this.queueSendMap = new ConcurrentHashMap<>();
         *      key = myid
         *      value = SendQueue
         *  3、this.senderWorkerMap = new ConcurrentHashMap<>();
         *      key = myid
         *      value = SendWorker
         *  4、this.lastMessageSent = new ConcurrentHashMap<>();
         *      kye = myid
         *      value = Vote
         *  5、initializeConnectionExecutor(mySid, quorumCnxnThreadsSize);
         *      初始化一个线程池
         *      初始线程数量是3
         *      最大线程数量,默认是20
         *      老版本:对方有多少个服务器, 就有多少个线程
         *  6、listener = new Listener();
         *      创建一个 BIO 服务端 ServerSocket
         *      板顶 3888 这个选举端口
         */
        public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.QuorumServer> view,
                                QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs,
                                int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
            
            this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
            
            this.queueSendMap = new ConcurrentHashMap<>();
            
            this.senderWorkerMap = new ConcurrentHashMap<>();
    
            // TODO 注释: 记录最近一次发送 Message 的 Map
            this.lastMessageSent = new ConcurrentHashMap<>();
    
            String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
            if (cnxToValue != null) {
                this.cnxTO = Integer.parseInt(cnxToValue);
            }
            this.self = self;
            this.mySid = mySid;
            this.socketTimeout = socketTimeout;
            this.view = view;
            this.listenOnAllIPs = listenOnAllIPs;
            this.authServer = authServer;
            this.authLearner = authLearner;
            this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;
    
            // TODO 注释: 启动 3 个线程用来完成选票交换的事儿,相当于 BIO 客户端
            initializeConnectionExecutor(mySid, quorumCnxnThreadsSize);
            // TODO 注释: 原来的实现:对方节点有多少个,则有多少个客户端
            // TODO 注释: 此时做了优化:提供一个线程池来完成这个工作:初始化线程数量 = 3, 最大线程数量是 20
    
            /*************************************************
             *  注释: 启动监听,监听选举端口
             */
            // Starts listener thread that waits for connection requests
            listener = new Listener();
            listener.setName("QuorumPeerListener");
        }
    
        // we always use the Connection Executor during connection initiation (to handle connection
        // timeouts), and optionally use it during receiving connections (as the Quorum SASL authentication
        // can take extra time)
        private void initializeConnectionExecutor(final long mySid, final int quorumCnxnThreadsSize) {
            final AtomicInteger threadIndex = new AtomicInteger(1);
            SecurityManager s = System.getSecurityManager();
            final ThreadGroup group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
    
            final ThreadFactory daemonThFactory = runnable -> new Thread(group, runnable,
                    String.format("QuorumConnectionThread-[myid=%d]-%d", mySid, threadIndex.getAndIncrement()));
    
            /*************************************************
             *  注释: 初始化 3 个线程,用来监听 选举链接请求
             */
            this.connectionExecutor = new ThreadPoolExecutor(3, quorumCnxnThreadsSize, 60, TimeUnit.SECONDS,
                    new SynchronousQueue<>(), daemonThFactory);
            this.connectionExecutor.allowCoreThreadTimeOut(true);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
  • 相关阅读:
    【Linux】firewall-cmd之防火墙简介及命令详解+实例
    Docker--容器挂载
    kafka 消费者的消费策略以及再平衡1
    MobileNetV2架构解析
    基于Unity3D开发的3D小游戏牧师与魔鬼
    Java --- SpringMVC的RESTFul风格
    C编译器01-扫描器
    spring面试整理
    python实现微信新版v3的jsapi支付
    学习node.js & WS&服务器设置SFTP
  • 原文地址:https://blog.csdn.net/qq_36679460/article/details/127717905