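| Role | Description |
|---|---|
| LeaderContender (contender) | The component that needs a leader election, e.g. the dispatcher or the resource manager. It is called back when leadership is granted or revoked. |
| LeaderElectionService (election service) | The service responsible for the election. (a) It is associated with a LeaderElectionDriver implementation (the concrete driver can be ZooKeeper, Kubernetes, etc.) and initializes the driver on startup; (b) it implements LeaderElectionEventHandler to receive the driver's callbacks. |
| LeaderElectionDriver (election driver) | The concrete leader election implementation. Taking ZooKeeperLeaderElectionDriver as an example, it implements the LeaderLatchListener callback interface, is notified when the leader or the latch nodes change, and then calls the LeaderElectionEventHandler. |
| LeaderElectionEventHandler (election event handler) | Leader election callbacks. When a contender is elected leader, LeaderContender's grantLeadership method is called so that each implementation can run its follow-up logic; for example, DispatcherRunner's grantLeadership creates the Dispatcher service and starts the DispatcherBootstrap. |
| LeaderRetrievalService (leader retrieval service) | Watches for leader changes and passes them on to listeners. Its implementation is similar to LeaderElectionService but serves a different purpose: it receives the leader information and notifies the LeaderRetrievalListener. |
| LeaderRetriever (listener) | A listener for leader changes that implements the LeaderRetrievalListener interface. A concrete implementation is RpcGatewayRetriever, whose callback receives the leader information, creates the Akka connection and builds an AOP-style proxy instance. |
| RpcGateway | RPC gateway. Implementations hide the Akka-layer details behind an AOP-style proxy, so calling a method on the gateway directly performs the Akka communication. |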
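The LeaderContender interface is used in leader election and represents a participant competing for leadership. Its implementations include the components mentioned in the table above, such as DispatcherRunner and the resource manager, as well as WebMonitorEndpoint, which is discussed at the end of this article.
The interface contains two important methods:
1. grantLeadership: the callback invoked when the contender wins the election.
2. revokeLeadership: the callback invoked when the contender goes from leader to non-leader.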
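As a minimal sketch of these two callbacks, the Java below models a contender that simply tracks whether it currently leads. SimpleLeaderContender and RecordingContender are hypothetical names; Flink's real LeaderContender has further methods (for example getDescription(), which appears later in this article).

```java
import java.util.UUID;

// Hypothetical, simplified stand-in for Flink's LeaderContender:
// only the two callbacks discussed above are modeled.
interface SimpleLeaderContender {
    // Called when this contender wins the election, with the new leader session ID.
    void grantLeadership(UUID leaderSessionId);

    // Called when this contender loses leadership.
    void revokeLeadership();
}

// Illustrative contender that only records its current leadership state.
class RecordingContender implements SimpleLeaderContender {
    private UUID currentSessionId; // null while not leader

    @Override
    public synchronized void grantLeadership(UUID leaderSessionId) {
        currentSessionId = leaderSessionId;
    }

    @Override
    public synchronized void revokeLeadership() {
        currentSessionId = null;
    }

    synchronized boolean isLeader() {
        return currentSessionId != null;
    }

    synchronized UUID sessionId() {
        return currentSessionId;
    }
}
```

A real contender would start or stop its services in these callbacks (as DispatcherRunner does) instead of just recording a flag.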
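A service that needs a leader election registers itself as the contender with its leaderElectionService on startup. For example, a job's leadership runner does this in its start method:

```java
@Override
public void start() throws Exception {
    LOG.debug("Start leadership runner for job {}.", getJobID());
    // register this runner as the LeaderContender
    leaderElectionService.start(this);
}
```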
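Next comes the leader election service, using its implementation DefaultLeaderElectionService as an example.

LeaderElectionService mainly provides methods for joining the election (start(LeaderContender)) and for confirming or querying the election result (confirmLeadership and hasLeadership).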


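When the service starts taking part in the election (DefaultLeaderElectionService::start), it creates a leaderElectionDriver through the election driver factory and passes itself in as the LeaderElectionEventHandler. Through this driver factory, Flink decouples the details of the ZooKeeper-based CuratorFramework from Flink itself.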
```java
@Override
public final void start(LeaderContender contender) throws Exception {
    checkNotNull(contender, "Contender must not be null.");
    Preconditions.checkState(leaderContender == null, "Contender was already set.");

    synchronized (lock) {
        running = true;
        leaderContender = contender;
        // `this` is passed in as the LeaderElectionEventHandler
        leaderElectionDriver =
                leaderElectionDriverFactory.createLeaderElectionDriver(
                        this,
                        new LeaderElectionFatalErrorHandler(),
                        leaderContender.getDescription());
        LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
    }
}
```
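The wiring above (the service keeps the contender, builds its driver through a factory and hands itself over as the event handler) can be sketched in plain Java. All type names here (SketchContender, SketchEventHandler, SketchDriver, SketchDriverFactory, SketchElectionService) are hypothetical, simplified stand-ins, not Flink's actual interfaces.

```java
import java.util.UUID;

// Hypothetical, simplified counterparts of LeaderContender, LeaderElectionEventHandler,
// LeaderElectionDriver and the driver factory (not Flink's real signatures).
interface SketchContender {
    void grantLeadership(UUID leaderSessionId);

    void revokeLeadership();
}

interface SketchEventHandler {
    void onGrantLeadership(UUID newLeaderSessionId);

    void onRevokeLeadership();
}

interface SketchDriver {
    void close();
}

interface SketchDriverFactory {
    // The service passes itself in as the event handler, like createLeaderElectionDriver(this, ...).
    SketchDriver createDriver(SketchEventHandler eventHandler);
}

// Mirrors the shape of DefaultLeaderElectionService#start: remember the contender,
// then let the factory build a driver that will call back into this service.
class SketchElectionService implements SketchEventHandler {
    private final Object lock = new Object();
    private final SketchDriverFactory driverFactory;
    private SketchContender contender;
    private SketchDriver driver;
    private boolean running;

    SketchElectionService(SketchDriverFactory driverFactory) {
        this.driverFactory = driverFactory;
    }

    void start(SketchContender newContender) {
        if (newContender == null) {
            throw new NullPointerException("Contender must not be null.");
        }
        synchronized (lock) {
            if (contender != null) {
                throw new IllegalStateException("Contender was already set.");
            }
            running = true;
            contender = newContender;
            driver = driverFactory.createDriver(this); // `this` is the event handler
        }
    }

    void stop() {
        synchronized (lock) {
            running = false;
            if (driver != null) {
                driver.close();
            }
        }
    }

    @Override
    public void onGrantLeadership(UUID newLeaderSessionId) {
        synchronized (lock) {
            if (running) {
                contender.grantLeadership(newLeaderSessionId); // forward to the contender
            }
        }
    }

    @Override
    public void onRevokeLeadership() {
        synchronized (lock) {
            if (running) {
                contender.revokeLeadership();
            }
        }
    }
}
```

Because the service only sees the factory interface, replacing ZooKeeper with another backend (or with a fake in a test) means supplying a different factory; this is the decoupling the driver factory provides.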
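LeaderElectionDriver is the concrete election implementation. Taking ZooKeeperLeaderElectionDriver as an example, it implements Curator's LeaderLatchListener callback interface, is notified when the leader or the latch nodes change, and then calls the LeaderElectionEventHandler: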
```java
public interface LeaderLatchListener {
    // called when this latch acquires leadership
    void isLeader();

    // called when this latch loses leadership
    void notLeader();
}
```

The driver is created by a factory (for ZooKeeper, ZooKeeperLeaderElectionDriverFactory):

```java
@Override
public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
        LeaderElectionEventHandler leaderEventHandler, // the DefaultLeaderElectionService passes itself in here
        FatalErrorHandler fatalErrorHandler,
        String leaderContenderDescription)
        throws Exception {
    return new ZooKeeperLeaderElectionDriver(
            client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
}
```
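ZooKeeperLeaderElectionDriver wraps CuratorFramework as its election framework (using Curator's LeaderLatch recipe). Its constructor sets up the latch and a cache for the leader information, then starts the election: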
```java
/**
 * Creates a ZooKeeperLeaderElectionDriver object.
 *
 * @param client Client which is connected to the ZooKeeper quorum
 * @param path ZooKeeper node path for the leader election
 * @param leaderElectionEventHandler Event handler for processing leader change events
 * @param fatalErrorHandler Fatal error handler
 * @param leaderContenderDescription Leader contender description
 */
public ZooKeeperLeaderElectionDriver(
        CuratorFramework client,
        String path,
        LeaderElectionEventHandler leaderElectionEventHandler, // the DefaultLeaderElectionService
        FatalErrorHandler fatalErrorHandler,
        String leaderContenderDescription)
        throws Exception {
    checkNotNull(path);
    this.client = checkNotNull(client);
    this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
    this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

    leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
    leaderLatch = new LeaderLatch(client, leaderLatchPath);
    this.cache =
            ZooKeeperUtils.createTreeCache(
                    client,
                    connectionInformationPath,
                    this::retrieveLeaderInformationFromZooKeeper);

    running = true;

    // start the election
    leaderLatch.addListener(this); // register this driver as a LeaderLatchListener
    leaderLatch.start();

    cache.start();

    client.getConnectionStateListenable().addListener(listener);
}
```
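The steps of the constructor are covered one by one below. First, the LeaderLatch is created:

```java
// create the leaderLatch
leaderLatch = new LeaderLatch(client, leaderLatchPath);
```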
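Next, the listener on the connection-information node is set up. Curator's listener APIs wrap the native ZooKeeper API and add features such as automatic re-registration, so a listener keeps firing instead of being triggered only once like a native ZooKeeper watch. Curator offers three types of these cache APIs (NodeCache, PathChildrenCache and TreeCache); Flink uses a TreeCache here: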
```java
this.cache =
        ZooKeeperUtils.createTreeCache(
                client,
                connectionInformationPath,
                this::retrieveLeaderInformationFromZooKeeper);
```

When the node changes, the cache invokes retrieveLeaderInformationFromZooKeeper, which only acts while this driver holds the leadership:

```java
private void retrieveLeaderInformationFromZooKeeper() throws Exception {
    if (leaderLatch.hasLeadership()) {
        ChildData childData = cache.getCurrentData(connectionInformationPath);
        // call back onLeaderInformationChange; leaderElectionEventHandler is the DefaultLeaderElectionService
        leaderElectionEventHandler.onLeaderInformationChange(
                childData == null
                        ? LeaderInformation.empty()
                        : ZooKeeperUtils.readLeaderInformation(childData.getData()));
    }
}
```
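This calls back onLeaderInformationChange on the leaderElectionEventHandler, which is the DefaultLeaderElectionService. If this contender has already confirmed its leadership but the information stored in ZooKeeper is missing or differs from it, the service writes the confirmed information back: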
```java
// org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#onLeaderInformationChange
@Override
@GuardedBy("lock")
public void onLeaderInformationChange(LeaderInformation leaderInformation) {
    synchronized (lock) {
        if (running) {
            if (!confirmedLeaderInformation.isEmpty()) {
                final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation;
                if (leaderInformation.isEmpty()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Writing leader information by {} since the external storage is empty.",
                                leaderContender.getDescription());
                    }
                    // main logic: write the confirmed leader information to ZooKeeper
                    leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
                } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
                    // the data field does not correspond to the expected leader information
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Correcting leader information by {}.",
                                leaderContender.getDescription());
                    }
                    leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
                }
            }
        }
    }
}
```
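The branching in onLeaderInformationChange boils down to a small decision: given what this contender has confirmed and what is currently stored, should anything be written? The sketch below isolates that decision. LeaderInfoSketch and LeaderInfoReconciler are hypothetical names, and the running flag, locking and debug logging of the real method are left out.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical, simplified stand-in for Flink's LeaderInformation.
final class LeaderInfoSketch {
    static final LeaderInfoSketch EMPTY = new LeaderInfoSketch(null, null);

    final UUID sessionId;
    final String address;

    LeaderInfoSketch(UUID sessionId, String address) {
        this.sessionId = sessionId;
        this.address = address;
    }

    boolean isEmpty() {
        return sessionId == null && address == null;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof LeaderInfoSketch)) {
            return false;
        }
        LeaderInfoSketch other = (LeaderInfoSketch) o;
        return Objects.equals(sessionId, other.sessionId) && Objects.equals(address, other.address);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sessionId, address);
    }
}

final class LeaderInfoReconciler {
    // Returns the information that should be (re)written, or null if nothing needs writing.
    // Mirrors the branches of onLeaderInformationChange.
    static LeaderInfoSketch toWrite(LeaderInfoSketch confirmed, LeaderInfoSketch observed) {
        if (confirmed.isEmpty()) {
            return null; // leadership not confirmed yet: nothing to enforce
        }
        if (observed.isEmpty() || !observed.equals(confirmed)) {
            return confirmed; // node missing or overwritten: restore our confirmed information
        }
        return null; // storage already holds the confirmed information
    }
}
```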
Writing the leader information to ZooKeeper (ZooKeeperLeaderElectionDriver#writeLeaderInformation):
```java
/** Writes the current leader's address as well the given leader session ID to ZooKeeper. */
@Override
public void writeLeaderInformation(LeaderInformation leaderInformation) {
    assert (running);
    // This method does not have to be synchronized because the curator framework client
    // is thread-safe. We do not write empty data to ZooKeeper here, because
    // check-leadership-and-update is not a transactional operation: we might wrongly
    // clear the data written by a new leader.
    if (leaderInformation.isEmpty()) {
        return;
    }

    try {
        ZooKeeperUtils.writeLeaderInformationToZooKeeper(
                leaderInformation,
                client,
                leaderLatch::hasLeadership,
                connectionInformationPath);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Successfully wrote leader information: {}.", leaderInformation);
        }
    } catch (Exception e) {
        fatalErrorHandler.onFatalError(
                new LeaderElectionException(
                        "Could not write leader address and leader session ID to "
                                + "ZooKeeper.",
                        e));
    }
}
```
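The guard in writeLeaderInformation can be modeled as follows: empty information is never written, and the write only goes ahead while the leadership check (leaderLatch::hasLeadership in the real code) passes. GuardedLeaderWriter, its in-memory map and the path used in the usage are hypothetical stand-ins for the ZooKeeper node; whatever else the real ZooKeeperUtils.writeLeaderInformationToZooKeeper does (serialization, creating the node) is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

// Hypothetical in-memory stand-in for the ZooKeeper connection-information node.
final class GuardedLeaderWriter {
    private final Map<String, String> store = new ConcurrentHashMap<>();
    private final BooleanSupplier hasLeadership; // plays the role of leaderLatch::hasLeadership
    private final String path;

    GuardedLeaderWriter(BooleanSupplier hasLeadership, String path) {
        this.hasLeadership = hasLeadership;
        this.path = path;
    }

    // Returns true if the information was written.
    boolean write(String leaderInfo) {
        // Never write empty information: the leadership check and the write are not atomic,
        // so a stale ex-leader could otherwise wipe the data just written by the new leader.
        if (leaderInfo == null || leaderInfo.isEmpty()) {
            return false;
        }
        // Only the current latch holder may write.
        if (!hasLeadership.getAsBoolean()) {
            return false;
        }
        store.put(path, leaderInfo);
        return true;
    }

    String read() {
        return store.get(path);
    }
}
```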
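Back in the constructor, the driver registers itself with the LeaderLatch and starts it:

```java
// add the ZooKeeperLeaderElectionDriver to the leaderLatch's listeners
leaderLatch.addListener(this);
leaderLatch.start();
```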
LeaderLatch#start does not contend immediately; it schedules internalStart to run once the client is connected:

```java
// org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch#start
public void start() throws Exception {
    Preconditions.checkState(
            this.state.compareAndSet(LeaderLatch.State.LATENT, LeaderLatch.State.STARTED),
            "Cannot be started more than once");

    this.startTask.set(
            AfterConnectionEstablished.execute(
                    this.client,
                    new Runnable() {
                        @Override
                        public void run() {
                            try {
                                LeaderLatch.this.internalStart();
                            } finally {
                                LeaderLatch.this.startTask.set(null);
                            }
                        }
                    }));
}
```
```java
public class AfterConnectionEstablished {
    private static final Logger log = LoggerFactory.getLogger(AfterConnectionEstablished.class);

    public static Future<?> execute(final CuratorFramework client, final Runnable runAfterConnection)
            throws Exception {
        final ExecutorService executor =
                ThreadUtils.newSingleThreadExecutor(
                        ThreadUtils.getProcessName(runAfterConnection.getClass()));
        Runnable internalCall = new Runnable() {
            @Override
            public void run() {
                try {
                    // wait for the ZooKeeper connection, then run the task
                    client.blockUntilConnected();
                    runAfterConnection.run();
                } catch (Exception e) {
                    ThreadUtils.checkInterrupted(e);
                    AfterConnectionEstablished.log.error(
                            "An error occurred blocking until a connection is available", e);
                } finally {
                    executor.shutdown();
                }
            }
        };
        return executor.submit(internalCall);
    }

    private AfterConnectionEstablished() {
    }
}
```
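The idea behind AfterConnectionEstablished (a one-off single-thread executor that blocks until the client is connected, runs the task and then shuts itself down) can be reproduced with the JDK alone. In this sketch a CountDownLatch is a hypothetical stand-in for client.blockUntilConnected(), and RunAfterConnected is an illustrative name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class RunAfterConnected {
    // Same shape as AfterConnectionEstablished.execute: a dedicated single-thread executor
    // waits for the "connection", runs the task, then shuts itself down.
    // `connected` is a hypothetical stand-in for client.blockUntilConnected().
    static Future<?> execute(CountDownLatch connected, Runnable runAfterConnection) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        return executor.submit(() -> {
            try {
                connected.await(); // block until the connection is available
                runAfterConnection.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            } finally {
                executor.shutdown(); // the executor is used for this one task only
            }
        });
    }
}
```

Returning the Future lets the caller (LeaderLatch keeps it in startTask) cancel the pending start if the latch is closed before the connection comes up.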
Once the connection is established, internalStart is called:
```java
private synchronized void internalStart() {
    if (this.state.get() == LeaderLatch.State.STARTED) {
        this.client.getConnectionStateListenable().addListener(this.listener);
        try {
            // main logic: create our latch node and check leadership
            this.reset();
        } catch (Exception e) {
            ThreadUtils.checkInterrupted(e);
            this.log.error("An error occurred checking resetting leadership.", e);
        }
    }
}
```
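internalStart delegates to reset(), which relies on Curator's asynchronous API. Curator provides asynchronous variants of its operations and introduces the BackgroundCallback interface to handle the result the server returns once an asynchronous call completes. The key argument passed to BackgroundCallback is a CuratorEvent, which contains the event type, the result code and details about the node. reset() uses this to create an EPHEMERAL_SEQUENTIAL node with the prefix latch- under the latch path and, once the node exists, to call getChildren():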
```java
@VisibleForTesting
void reset() throws Exception {
    // reset the leadership state before (re)joining the election
    this.setLeadership(false);
    this.setNode(null);

    BackgroundCallback callback = new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            if (LeaderLatch.this.debugResetWaitLatch != null) {
                LeaderLatch.this.debugResetWaitLatch.await();
                LeaderLatch.this.debugResetWaitLatch = null;
            }

            if (event.getResultCode() == Code.OK.intValue()) {
                // remember the name of the node we just created
                LeaderLatch.this.setNode(event.getName());
                if (LeaderLatch.this.state.get() == LeaderLatch.State.CLOSED) {
                    LeaderLatch.this.setNode(null);
                } else {
                    // main logic: list all latch nodes and check our position
                    LeaderLatch.this.getChildren();
                }
            } else {
                LeaderLatch.this.log.error("getChildren() failed. rc = " + event.getResultCode());
            }
        }
    };

    // asynchronously create an ephemeral sequential node "latch-<sequence>" under latchPath
    this.client.create()
            .creatingParentContainersIfNeeded()
            .withProtection()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
            .inBackground(callback)
            .forPath(ZKPaths.makePath(this.latchPath, "latch-"), LeaderSelector.getIdBytes(this.id));
}
```
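The EPHEMERAL_SEQUENTIAL mode is what makes the latch work: ZooKeeper appends a monotonically increasing, 10-digit zero-padded counter (kept per parent node) to each new child name, and removes ephemeral nodes when the owning session ends. The in-memory model below (SequentialParentNode is a hypothetical name) shows both properties; the prefix added by withProtection() is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one ZooKeeper parent node, such as the latch path.
final class SequentialParentNode {
    private int nextSequence = 0; // one counter per parent node
    private final Map<String, Long> childOwner = new HashMap<>(); // child name -> owning session

    // EPHEMERAL_SEQUENTIAL: a 10-digit, zero-padded counter is appended to the requested name.
    synchronized String createEphemeralSequential(String prefix, long sessionId) {
        String name = prefix + String.format("%010d", nextSequence++);
        childOwner.put(name, sessionId);
        return name;
    }

    // Ephemeral nodes disappear when their session ends, e.g. when a JobManager dies.
    synchronized void expireSession(long sessionId) {
        childOwner.values().removeIf(owner -> owner == sessionId);
    }

    synchronized List<String> getChildren() {
        return new ArrayList<>(childOwner.keySet());
    }
}
```

Because a crashed leader's node vanishes with its session, the next contender in sequence order can take over without any explicit hand-off.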
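Next, let's look at the main logic, LeaderLatch.this.getChildren(). It reads the children of the latch path in the background and hands them to checkLeadership: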
```java
private void getChildren() throws Exception {
    BackgroundCallback callback = new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            if (event.getResultCode() == Code.OK.intValue()) {
                // check whether we hold the leadership
                LeaderLatch.this.checkLeadership(event.getChildren());
            }
        }
    };
    this.client.getChildren()
            .inBackground(callback)
            .forPath(ZKPaths.makePath(this.latchPath, (String) null));
}
```
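Inside checkLeadership, the line to focus on is this.setLeadership(true). The latch nodes are sorted by sequence number; if our node comes first, we become the leader. Otherwise we set a watch on the node immediately before ours and call getChildren() again when that node is deleted (if it has already disappeared when the watch is set, reset() is called):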
```java
private void checkLeadership(List<String> children) throws Exception {
    if (this.debugCheckLeaderShipLatch != null) {
        this.debugCheckLeaderShipLatch.await();
    }

    final String localOurPath = this.ourPath.get();
    List<String> sortedChildren = LockInternals.getSortedChildren("latch-", sorter, children);
    int ourIndex =
            localOurPath != null
                    ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath))
                    : -1;
    if (ourIndex < 0) {
        this.log.error("Can't find our node. Resetting. Index: " + ourIndex);
        this.reset();
    } else if (ourIndex == 0) {
        // our node has the smallest sequence number: we are the leader
        this.setLeadership(true);
    } else {
        // watch only the node directly before ours
        String watchPath = sortedChildren.get(ourIndex - 1);
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (LeaderLatch.this.state.get() == LeaderLatch.State.STARTED
                        && event.getType() == EventType.NodeDeleted
                        && localOurPath != null) {
                    try {
                        LeaderLatch.this.getChildren();
                    } catch (Exception e) {
                        ThreadUtils.checkInterrupted(e);
                        LeaderLatch.this.log.error("An error occurred checking the leadership.", e);
                    }
                }
            }
        };
        BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                if (event.getResultCode() == Code.NONODE.intValue()) {
                    // the predecessor is already gone
                    LeaderLatch.this.reset();
                }
            }
        };
        this.client.getData()
                .usingWatcher(watcher)
                .inBackground(callback)
                .forPath(ZKPaths.makePath(this.latchPath, watchPath));
    }
}
```
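The ordering rule in checkLeadership can be isolated as a pure function: sort the children by the sequence suffix after latch-, lead if our node is first, otherwise watch only the immediate predecessor. Watching just one node, instead of every contender watching the current leader, keeps a leader change from waking up all contenders at once. LatchDecision and the node names in the usage are illustrative; Curator itself does the sorting with LockInternals.getSortedChildren.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative, pure-function version of the decision made in LeaderLatch#checkLeadership.
final class LatchDecision {
    enum Kind { LEADER, WATCH_PREDECESSOR, RESET }

    final Kind kind;
    final String watchNode; // only set for WATCH_PREDECESSOR

    private LatchDecision(Kind kind, String watchNode) {
        this.kind = kind;
        this.watchNode = watchNode;
    }

    // Part of the name after "latch-": ZooKeeper's zero-padded sequence number,
    // so plain string order equals numeric order.
    private static String sequenceOf(String node) {
        int i = node.lastIndexOf("latch-");
        return i < 0 ? node : node.substring(i + "latch-".length());
    }

    static LatchDecision check(String ourNode, List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing(LatchDecision::sequenceOf));
        int ourIndex = ourNode == null ? -1 : sorted.indexOf(ourNode);
        if (ourIndex < 0) {
            return new LatchDecision(Kind.RESET, null); // our node is missing: start over
        }
        if (ourIndex == 0) {
            return new LatchDecision(Kind.LEADER, null); // smallest sequence number wins
        }
        return new LatchDecision(Kind.WATCH_PREDECESSOR, sorted.get(ourIndex - 1));
    }
}
```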
setLeadership notifies the registered LeaderLatchListeners only when the leadership state actually changes:

```java
private synchronized void setLeadership(boolean newValue) {
    boolean oldValue = this.hasLeadership.getAndSet(newValue);
    if (oldValue && !newValue) {
        // the listeners include the ZooKeeperLeaderElectionDriver
        this.listeners.forEach(LeaderLatchListener::notLeader);
    } else if (!oldValue && newValue) {
        this.listeners.forEach(LeaderLatchListener::isLeader);
    }
    this.notifyAll();
}
```
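setLeadership is edge-triggered: listeners are called only when the stored value flips, so repeated setLeadership(true) calls do not grant leadership twice. A minimal JDK-only sketch of the same pattern, using the hypothetical names SketchLatchListener and EdgeTriggeredLatch:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical counterpart of Curator's LeaderLatchListener.
interface SketchLatchListener {
    void isLeader();

    void notLeader();
}

final class EdgeTriggeredLatch {
    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
    private final List<SketchLatchListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(SketchLatchListener listener) {
        listeners.add(listener);
    }

    // Same transition logic as LeaderLatch#setLeadership: fire only when the value flips.
    synchronized void setLeadership(boolean newValue) {
        boolean oldValue = hasLeadership.getAndSet(newValue);
        if (oldValue && !newValue) {
            listeners.forEach(SketchLatchListener::notLeader);
        } else if (!oldValue && newValue) {
            listeners.forEach(SketchLatchListener::isLeader);
        }
        notifyAll(); // wake any threads blocked in wait() on this latch
    }
}
```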
The callbacks are implemented in ZooKeeperLeaderElectionDriver:
```java
// org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver#isLeader
@Override
public void isLeader() {
    // In DefaultLeaderElectionService#start, the DefaultLeaderElectionService passed itself in
    // as the leaderElectionEventHandler, so when the election succeeds this callback notifies
    // the DefaultLeaderElectionService of the new leadership, with a fresh random session ID.
    leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
}

@Override
public void notLeader() {
    leaderElectionEventHandler.onRevokeLeadership();
}
```
DefaultLeaderElectionService reacts to the successful election by calling back leaderContender.grantLeadership:
```java
@Override
@GuardedBy("lock")
public void onGrantLeadership(UUID newLeaderSessionId) {
    synchronized (lock) {
        if (running) {
            issuedLeaderSessionID = newLeaderSessionId;
            clearConfirmedLeaderInformation();

            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Grant leadership to contender {} with session ID {}.",
                        leaderContender.getDescription(),
                        issuedLeaderSessionID);
            }

            // main logic: notify the contender
            leaderContender.grantLeadership(issuedLeaderSessionID);
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Ignoring the grant leadership notification since the {} has "
                                + "already been closed.",
                        leaderElectionDriver);
            }
        }
    }
}
```
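Taking WebMonitorEndpoint as an example, the contender confirms its leadership by passing the session ID back together with its REST address: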
```java
@Override
public void grantLeadership(final UUID leaderSessionID) {
    log.info(
            "{} was granted leadership with leaderSessionID={}",
            getRestBaseUrl(),
            leaderSessionID);
    leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
}
```
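The session ID issued in onGrantLeadership acts as a fencing token: a confirmation only takes effect for the current session, so a late confirmation from an earlier leadership term is ignored. The sketch below models that bookkeeping. SessionFencedService is a hypothetical name, and the check follows the idea of LeaderElectionService#hasLeadership rather than reproducing Flink's code.

```java
import java.util.UUID;

// Hypothetical, simplified model of the service-side session bookkeeping.
final class SessionFencedService {
    private final Object lock = new Object();
    private boolean running = true;
    private UUID issuedLeaderSessionId; // set when leadership is granted
    private String confirmedLeaderAddress; // set when the contender confirms

    void onGrantLeadership(UUID newLeaderSessionId) {
        synchronized (lock) {
            if (!running) {
                return;
            }
            issuedLeaderSessionId = newLeaderSessionId;
            confirmedLeaderAddress = null; // forget what was confirmed in an earlier term
        }
    }

    void onRevokeLeadership() {
        synchronized (lock) {
            issuedLeaderSessionId = null;
            confirmedLeaderAddress = null;
        }
    }

    boolean hasLeadership(UUID sessionId) {
        synchronized (lock) {
            return running && sessionId != null && sessionId.equals(issuedLeaderSessionId);
        }
    }

    // Returns true if the confirmation was accepted.
    boolean confirmLeadership(UUID sessionId, String address) {
        synchronized (lock) {
            if (!hasLeadership(sessionId)) {
                return false; // stale or unknown session ID: ignore
            }
            // In Flink, the confirmed information is what writeLeaderInformation publishes.
            confirmedLeaderAddress = address;
            return true;
        }
    }

    String confirmedAddress() {
        synchronized (lock) {
            return confirmedLeaderAddress;
        }
    }
}
```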
This completes the full election flow.