Source code project zookeeper-3.6.3: core workflow
Background on the election algorithm:
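1. Every node that has the right to vote and to be elected enters the LOOKING state as soon as it comes online. When the election ends, each of these voting servers switches to one of two other states: Leader or Follower.
2. During the election, servers exchange votes pairwise. More precisely, in every round each server broadcasts its own vote to all the other servers. A vote is a `Vote` object, and it is broadcast to every node.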
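- Round 1: every node votes for itself as leader and broadcasts that vote to all the other servers, so the first round cannot produce a leader.
- Round 2: by the end of round 1, every server has received the other servers' votes. It compares each received vote with its own self-nominating vote, finds the better one, and switches its vote to it. Votes are compared by epoch first, then zxid, then myid (the election round, tracked as logicalclock, is checked before these).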
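The round-2 comparison can be sketched in Java. This is a minimal model that assumes the order described above (epoch, then zxid, then myid), which is also the order used by `totalOrderPredicate` in `FastLeaderElection`. `VoteOrderSketch`, `isBetter` and `updateVote` are hypothetical names. The sketch ignores the election-round check and the quorum-weight check that the real code performs.

```java
import java.util.List;

/** Hypothetical, simplified model of how a server picks the better vote. */
final class VoteOrderSketch {

    /** Minimal vote: proposed leader's myid, its last zxid and its epoch. */
    static final class Vote {
        final long id;
        final long zxid;
        final long peerEpoch;

        Vote(long id, long zxid, long peerEpoch) {
            this.id = id;
            this.zxid = zxid;
            this.peerEpoch = peerEpoch;
        }

        @Override
        public String toString() {
            return "Vote(id=" + id + ", zxid=" + zxid + ", epoch=" + peerEpoch + ")";
        }
    }

    /** Returns true if the received vote should replace the current one. */
    static boolean isBetter(Vote received, Vote current) {
        if (received.peerEpoch != current.peerEpoch) {
            return received.peerEpoch > current.peerEpoch; // higher epoch wins first
        }
        if (received.zxid != current.zxid) {
            return received.zxid > current.zxid; // then the more up-to-date data
        }
        return received.id > current.id; // finally the larger myid breaks the tie
    }

    /** One "round 2" step: start from the self-vote and adopt any better received vote. */
    static Vote updateVote(Vote self, List<Vote> received) {
        Vote best = self;
        for (Vote v : received) {
            if (isBetter(v, best)) {
                best = v;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Vote self = new Vote(1, 5, 1);
        List<Vote> received = List.of(new Vote(2, 7, 1), new Vote(3, 7, 1));
        System.out.println(updateVote(self, received)); // Vote(id=3, zxid=7, epoch=1)
    }
}
```

Server 1 starts with its own vote (zxid 5). Both received votes carry zxid 7 and the same epoch, so it switches to the one with the larger myid, server 3.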
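In fact, there are four vote-related types:
- 1. Vote: the ballot. Every server first initializes one that nominates itself as leader, and the same type also holds the information about the leader that is finally elected.
- 2. Notification: a vote received from another server.
- 3. Message: what a vote becomes when it is put into the receive queue.
- 4. ToSend: a vote waiting to be sent.
- There is also a special intermediate object: ByteBuffer, from the Java NIO API.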
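The ByteBuffer step can be shown with a hypothetical round trip. The sender packs a vote's fields into a buffer (the role of ToSend), and the receiver reads them back in the same order (the role of Notification). The class `VoteBufferSketch`, the `Ballot` type and the five-field layout are assumptions for illustration only. The message that FastLeaderElection actually builds carries additional fields, such as a protocol version, so this is not its exact wire format.

```java
import java.nio.ByteBuffer;

/** Hypothetical vote <-> ByteBuffer round trip; the layout is illustrative only. */
final class VoteBufferSketch {

    /** Fields a vote carries on both sides (cf. ToSend / Notification). */
    static final class Ballot {
        final int state;          // sender's state; 0 stands for LOOKING here (assumed encoding)
        final long leader;        // myid of the proposed leader
        final long zxid;          // proposed leader's last zxid (high 32 bits: epoch, low 32 bits: counter)
        final long electionEpoch; // sender's logical clock, i.e. the election round
        final long peerEpoch;     // epoch of the proposed leader

        Ballot(int state, long leader, long zxid, long electionEpoch, long peerEpoch) {
            this.state = state;
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.peerEpoch = peerEpoch;
        }
    }

    /** Sending side: pack the fields into a 36-byte buffer (one int + four longs). */
    static ByteBuffer toBuffer(Ballot b) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + 4 * Long.BYTES);
        buf.putInt(b.state)
           .putLong(b.leader)
           .putLong(b.zxid)
           .putLong(b.electionEpoch)
           .putLong(b.peerEpoch);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    /** Receiving side: read the fields back in exactly the same order. */
    static Ballot fromBuffer(ByteBuffer buf) {
        // Java evaluates arguments left to right, so the reads happen in field order
        return new Ballot(buf.getInt(), buf.getLong(), buf.getLong(), buf.getLong(), buf.getLong());
    }

    public static void main(String[] args) {
        ByteBuffer wire = toBuffer(new Ballot(0, 3, 0x100000007L, 2, 1));
        System.out.println("bytes on the wire: " + wire.remaining()); // bytes on the wire: 36
        Ballot received = fromBuffer(wire);
        System.out.println("leader=" + received.leader + ", zxid=0x" + Long.toHexString(received.zxid));
        // prints: leader=3, zxid=0x100000007
    }
}
```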
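3. Once a ZooKeeper server has started, the first thing it does is start voting. Its first vote is really a search for the leader: if other ZooKeeper servers reply with information about an existing leader, the election ends immediately.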
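4. For sending votes, each ZooKeeper server creates a dedicated SendWorker, RecvWorker and BlockingQueue for every other server node:
BlockingQueue: holds the votes waiting to be sent
SendWorker: takes votes from the queue and sends them
RecvWorker: receives the votes sent by the other node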
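The per-peer plumbing can be modeled in memory. In this sketch, which is an illustration only, each remote server gets its own send queue, a "SendWorker" thread that drains that queue onto the wire, and a "RecvWorker" thread that moves incoming votes into one shared inbox. Sockets are replaced by `BlockingQueue`s and votes are plain strings. `PeerWorkersSketch`, `connect` and `pump` are hypothetical names, and `toSend` only borrows the name of the real QuorumCnxManager method.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

/** Hypothetical in-memory model: one send queue + SendWorker + RecvWorker per peer. */
final class PeerWorkersSketch {

    // sid -> outgoing queue for that peer (plays the role of queueSendMap)
    final Map<Long, BlockingQueue<String>> queueSendMap = new ConcurrentHashMap<>();
    // one inbox shared by all peers (plays the role of recvQueue)
    final BlockingQueue<String> recvQueue = new ArrayBlockingQueue<>(100);

    /** Sets up one remote peer: its send queue, a sender thread and a receiver thread. */
    void connect(long sid, BlockingQueue<String> toPeer, BlockingQueue<String> fromPeer) {
        BlockingQueue<String> sendQueue =
                queueSendMap.computeIfAbsent(sid, id -> new ArrayBlockingQueue<>(10));
        // "SendWorker": drain this peer's queue onto the wire
        pump("SendWorker-" + sid, sendQueue, toPeer, UnaryOperator.identity());
        // "RecvWorker": read from the wire, tag with the sender's sid, hand to the shared inbox
        pump("RecvWorker-" + sid, fromPeer, recvQueue, msg -> "from " + sid + ": " + msg);
    }

    /** Queues a vote for one peer; this sketch simply refuses unknown peers or a full queue. */
    boolean toSend(long sid, String vote) {
        BlockingQueue<String> q = queueSendMap.get(sid);
        return q != null && q.offer(vote);
    }

    /** Starts a daemon thread that forever moves items from one queue to another. */
    private static void pump(String name, BlockingQueue<String> from, BlockingQueue<String> to,
                             UnaryOperator<String> transform) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    to.put(transform.apply(from.take()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop quietly when interrupted
            }
        }, name);
        t.setDaemon(true); // do not keep the JVM alive
        t.start();
    }

    public static void main(String[] args) throws InterruptedException {
        PeerWorkersSketch me = new PeerWorkersSketch();
        BlockingQueue<String> toPeer2 = new ArrayBlockingQueue<>(10);   // simulated socket: me -> 2
        BlockingQueue<String> fromPeer2 = new ArrayBlockingQueue<>(10); // simulated socket: 2 -> me
        me.connect(2L, toPeer2, fromPeer2);

        me.toSend(2L, "vote(leader=1)");
        System.out.println("peer 2 received: " + toPeer2.poll(1, TimeUnit.SECONDS));

        fromPeer2.put("vote(leader=2)"); // pretend peer 2 answers
        System.out.println("inbox: " + me.recvQueue.poll(1, TimeUnit.SECONDS));
    }
}
```

Keeping a separate queue and sender thread per peer means a slow or unreachable server only stalls its own queue, while all received votes still funnel into one place for the election logic to consume.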
Election call flow
Method call path:
```
QuorumPeerMain.java
  main
    main.initializeAndRun(args);
  initializeAndRun
    runFromConfig(config);
  runFromConfig
    quorumPeer.start();
QuorumPeer.java
  private volatile Vote currentVote;
  start()
    startLeaderElection()
      currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
      this.electionAlg = createElectionAlgorithm(electionType);
        switch (electionAlgorithm)
          QuorumCnxManager qcm = createCnxnManager();
            return new QuorumCnxManager(...)
              QuorumCnxManager(...)   (constructor, see QuorumCnxManager.java)
          QuorumCnxManager.Listener listener = qcm.listener;
          listener.start();
          FastLeaderElection fle = new FastLeaderElection(this, qcm);
            FastLeaderElection(QuorumPeer self, QuorumCnxManager manager)
          fle.start();
QuorumCnxManager.java
  QuorumCnxManager(...)
    1. this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
    2. this.queueSendMap = new ConcurrentHashMap<>();
    3. this.senderWorkerMap = new ConcurrentHashMap<>();
    4. this.lastMessageSent = new ConcurrentHashMap<>();
    5. initializeConnectionExecutor(mySid, quorumCnxnThreadsSize);
    6. listener = new Listener();
  Listener
    run()
      listenerHandlers = addresses.stream().map(address -> new ListenerHandler(address, self.shouldUsePortUnification(), self.isSslQuorum(), latch)).collect(Collectors.toList());
      ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
      listenerHandlers.forEach(executor::submit);
  ListenerHandler
    run()
      acceptConnections();
        serverSocket = createNewServerSocket();
        while (!shutdown)
          client = serverSocket.accept();
FastLeaderElection.java
  FastLeaderElection(QuorumPeer self, QuorumCnxManager manager)
    starter(self, manager);
      sendqueue = new LinkedBlockingQueue<ToSend>();
      recvqueue = new LinkedBlockingQueue<Notification>();
      this.messenger = new Messenger(manager);
        this.ws = new WorkerSender(manager);
        this.wr = new WorkerReceiver(manager);
```
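The Listener part of the path follows a common concurrency pattern: one handler per election address, all submitted to a fixed-size pool sized to the number of addresses, with a shared `CountDownLatch`. The sketch below reproduces only that shape. It opens no sockets, and `ListenerSketch` and `Handler` are hypothetical stand-ins for Listener and ListenerHandler.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/** Hypothetical sketch of "one handler per address on a fixed pool + latch". */
final class ListenerSketch {

    /** Stand-in for ListenerHandler: the real one binds a ServerSocket and loops on accept(). */
    static final class Handler implements Runnable {
        private final String address;
        private final Set<String> handled;
        private final CountDownLatch latch;

        Handler(String address, Set<String> handled, CountDownLatch latch) {
            this.address = address;
            this.handled = handled;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                handled.add(address); // placeholder for acceptConnections()
            } finally {
                latch.countDown(); // always signal that this handler is done
            }
        }
    }

    /** Same shape as the Listener.run() excerpt: build handlers, size the pool, submit all. */
    static Set<String> listen(List<String> addresses) throws InterruptedException {
        Set<String> handled = ConcurrentHashMap.newKeySet();
        CountDownLatch latch = new CountDownLatch(addresses.size());
        List<Handler> handlers = addresses.stream()
                .map(address -> new Handler(address, handled, latch))
                .collect(Collectors.toList());
        ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        handlers.forEach(executor::submit); // one pool thread per address
        latch.await();                      // block until every handler has finished
        executor.shutdown();
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(listen(List.of("192.168.1.1:3888", "192.168.1.2:3888")));
    }
}
```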
startLeaderElection();
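```java
public synchronized void startLeaderElection() {
    try {
        // Only a server that is still LOOKING nominates itself:
        // (myid, last logged zxid, current epoch)
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        // Rethrow as an unchecked exception, keeping the original stack trace
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // Build the election machinery (QuorumCnxManager + FastLeaderElection)
    this.electionAlg = createElectionAlgorithm(electionType);
}
```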
```java
@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        // Type 3 (FastLeaderElection) is the only case here that builds an algorithm
        QuorumCnxManager qcm = createCnxnManager();
        // Install the new manager; halt the old one if the election is being restarted
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // Start accepting election connections from other servers
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            // Start the WorkerSender / WorkerReceiver threads
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
```
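The `qcmRef.getAndSet(qcm)` step is an atomic swap: it installs the new manager and returns whatever was installed before, and that old manager is then halted. A minimal sketch of the idiom follows, using a hypothetical `Manager` class in place of QuorumCnxManager. `SwapManagerSketch` and `restart` are also illustrative names.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of "swap in a new manager, halt the previous one". */
final class SwapManagerSketch {

    /** Stand-in for QuorumCnxManager. */
    static final class Manager {
        final int generation;
        volatile boolean halted;

        Manager(int generation) {
            this.generation = generation;
        }

        void halt() {
            halted = true; // a real manager would stop its listener and worker threads here
        }
    }

    private final AtomicReference<Manager> ref = new AtomicReference<>();

    /** Installs a fresh manager and halts the previous one, if any. */
    Manager restart(int generation) {
        Manager fresh = new Manager(generation);
        Manager old = ref.getAndSet(fresh); // returns the previous value (null on first call)
        if (old != null) {
            System.out.println("Clobbering manager #" + old.generation);
            old.halt();
        }
        return fresh;
    }

    public static void main(String[] args) {
        SwapManagerSketch peer = new SwapManagerSketch();
        Manager first = peer.restart(1);  // nothing to halt yet
        Manager second = peer.restart(2); // prints "Clobbering manager #1"
        System.out.println(first.halted + " " + second.halted); // true false
    }
}
```

Because the read of the old value and the write of the new one happen in a single atomic operation, two concurrent restarts cannot both see the same old manager and leave one manager running unnoticed.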
```java
public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.QuorumServer> view,
        QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs,
        int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
    // 1. Shared queue of messages received from all peers
    this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
    // 2. sid -> send queue for that peer
    this.queueSendMap = new ConcurrentHashMap<>();
    // 3. sid -> SendWorker for that peer
    this.senderWorkerMap = new ConcurrentHashMap<>();
    // 4. sid -> last message sent to that peer
    this.lastMessageSent = new ConcurrentHashMap<>();

    // Optional override of the connection timeout
    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if (cnxToValue != null) {
        this.cnxTO = Integer.parseInt(cnxToValue);
    }

    this.self = self;
    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;
    this.authServer = authServer;
    this.authLearner = authLearner;
    this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;

    // 5. Thread pool used while setting up connections
    initializeConnectionExecutor(mySid, quorumCnxnThreadsSize);

    // 6. Listener thread that accepts connections from other servers (started later)
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}

private void initializeConnectionExecutor(final long mySid, final int quorumCnxnThreadsSize) {
    final AtomicInteger threadIndex = new AtomicInteger(1);
    SecurityManager s = System.getSecurityManager();
    final ThreadGroup group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();

    // Name threads "QuorumConnectionThread-[myid=<sid>]-<n>"
    final ThreadFactory daemonThFactory = runnable -> new Thread(group, runnable,
        String.format("QuorumConnectionThread-[myid=%d]-%d", mySid, threadIndex.getAndIncrement()));

    // 3 core threads, up to quorumCnxnThreadsSize, 60 s keep-alive, no task buffering
    this.connectionExecutor = new ThreadPoolExecutor(3, quorumCnxnThreadsSize, 60, TimeUnit.SECONDS,
        new SynchronousQueue<>(), daemonThFactory);
    this.connectionExecutor.allowCoreThreadTimeOut(true);
}
```
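The executor built by `initializeConnectionExecutor` pairs a `SynchronousQueue` with a bounded maximum, so a task is never buffered: it either gets a thread right away or is rejected. The sketch below reuses the same pool parameters (3 core threads, 60 s keep-alive, core-thread timeout, the same thread-name format) to demonstrate that behaviour. The thread-group selection is omitted. `ConnectionExecutorSketch`, `build` and `countAccepted` are hypothetical names, and myid = 1 with a maximum of 4 threads are arbitrary example values (the maximum must be at least the core size of 3).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo of a ThreadPoolExecutor backed by a SynchronousQueue. */
final class ConnectionExecutorSketch {

    /** Same parameters as initializeConnectionExecutor, minus the thread-group logic. */
    static ThreadPoolExecutor build(long mySid, int maxThreads) {
        AtomicInteger threadIndex = new AtomicInteger(1);
        ThreadFactory factory = runnable -> new Thread(runnable,
                String.format("QuorumConnectionThread-[myid=%d]-%d", mySid, threadIndex.getAndIncrement()));
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, maxThreads, 60, TimeUnit.SECONDS,
                new SynchronousQueue<>(), factory);
        executor.allowCoreThreadTimeOut(true); // idle core threads also exit after 60 s
        return executor;
    }

    /** Submits `tasks` blocking tasks and returns how many the pool accepted. */
    static int countAccepted(int maxThreads, int tasks, Set<String> threadNames) throws InterruptedException {
        ThreadPoolExecutor executor = build(1, maxThreads);
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    executor.execute(() -> {
                        threadNames.add(Thread.currentThread().getName());
                        try {
                            release.await(); // keep this thread busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // all threads busy and the SynchronousQueue holds nothing: task refused
                }
            }
        } finally {
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        System.out.println("accepted " + countAccepted(4, 6, names) + " of 6"); // accepted 4 of 6
        System.out.println(names);
    }
}
```

Once 4 threads are blocked, the 5th and 6th tasks find neither an idle thread nor queue capacity, so the pool's default `AbortPolicy` throws `RejectedExecutionException` for them.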