Click here for a related chapter: Flink 1.13 Source Code Analysis — Overview of the JobManager Startup Process
Click here for a related chapter: Flink 1.13 Source Code Analysis — JobManager Startup: Starting the ResourceManager
Click here for a related chapter: Flink 1.13 Source Code Analysis — JobManager Startup: Starting the Dispatcher
In the previous chapter we walked through the overall startup flow of the master node (the logical JobManager). In this chapter we look at how the WebMonitorEndpoint service is built and started in the source code.
Let's first review a few important JobManager concepts from the previous chapter:
Flink's master node, the JobManager, is only a logical master node; its concrete implementation class differs depending on the deployment mode.
The (logical) JobManager has three core components: the ResourceManager, the Dispatcher, and the WebMonitorEndpoint:
ResourceManager:
The resource manager of the Flink cluster. There is exactly one, and it is responsible for all Slot management and allocation.
Dispatcher:
1. Receives the JobGraph submitted by the user and launches a JobMaster for it, analogous to the AppMaster in YARN or the Driver in Spark.
2. Hosts a persistent service, the JobGraphStore, which stores JobGraphs. If the master node fails while the ExecutionGraph (or physical execution graph) is being built and is later recovered, the JobGraph can be re-fetched from here.
WebMonitorEndpoint:
The REST service. It embeds a Netty server, and all client requests are received and processed by this component.
An example to illustrate how these three components work together:
When a Client submits a job to the cluster (the Client builds the job into a JobGraph), the master node receives the submission as a REST request. The WebMonitorEndpoint resolves it through its Router to find the matching Handler, which processes the request and hands the result to the Dispatcher. The Dispatcher then launches a JobMaster, which is responsible for deploying and executing the Tasks inside that job; the resources needed to run those Tasks are requested by the JobMaster from the ResourceManager.
Let's also recap the main responsibility of the WebMonitorEndpoint:
The WebMonitorEndpoint maintains a very large number of Handlers. When a client submits a job to the Flink cluster via flink run, the request is ultimately received by the WebMonitorEndpoint, which decides which Handler should process it.
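To make the dispatch idea concrete, here is a minimal, hypothetical sketch of path-to-handler resolution. This is not the actual Router class Flink uses (which also matches path parameters such as /jobs/:jobid); it only illustrates the "resolve a request to a registered handler" pattern:
- import java.util.HashMap;
- import java.util.Map;
-
- // A minimal, hypothetical router: maps "METHOD path" keys to handlers.
- public class MiniRouter {
-     private final Map<String, Runnable> routes = new HashMap<>();
-
-     public void addRoute(String method, String path, Runnable handler) {
-         routes.put(method + " " + path, handler);
-     }
-
-     public void route(String method, String path) {
-         Runnable handler = routes.get(method + " " + path);
-         if (handler != null) {
-             handler.run(); // hand the request to the matched handler
-         } else {
-             System.out.println("404 Not Found: " + method + " " + path);
-         }
-     }
-
-     public static void main(String[] args) {
-         MiniRouter router = new MiniRouter();
-         router.addRoute("GET", "/jobs/overview",
-                 () -> System.out.println("JobsOverviewHandler invoked"));
-         router.route("GET", "/jobs/overview"); // prints: JobsOverviewHandler invoked
-     }
- }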
Now that the role of the WebMonitorEndpoint is clear, let's look at the source code that builds and starts it.
Continuing from the previous chapter: once the factory class has been created, the first instance to be created is the WebMonitorEndpoint. Let's see how it is started.
Click here for the previous chapter: Flink 1.13 Source Code Analysis — Overview of the JobManager Startup Process
First, look at the construction and startup of the WebMonitorEndpoint in the create method of DefaultDispatcherResourceManagerComponentFactory:
- /*
- TODO Create the WebMonitorEndpoint instance; in Standalone mode this is a DispatcherRestEndpoint.
- Internally, this instance starts a Netty server and binds a set of Handlers to it.
- */
- webMonitorEndpoint =
- restEndpointFactory.createRestEndpoint(
- configuration,
- dispatcherGatewayRetriever,
- resourceManagerGatewayRetriever,
- blobServer,
- executor,
- metricFetcher,
- highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
- fatalErrorHandler);
- // TODO Start the WebMonitorEndpoint
- log.debug("Starting Dispatcher REST endpoint.");
- webMonitorEndpoint.start();
Now let's step into webMonitorEndpoint.start(). There is quite a lot going on in this method, so we will analyze it piece by piece.
- // TODO The Router, which resolves a request to its matching Handler
- final Router router = new Router();
- final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
- // TODO Initialize the Handlers
- handlers = initializeHandlers(restAddressFuture);
- /* sort the handlers such that they are ordered the following:
- * /jobs
- * /jobs/overview
- * /jobs/:jobid
- * /jobs/:jobid/config
- * /:*
- *
- * TODO Sorted so that each URL maps to exactly one Handler; a one-to-many mapping must not occur
- */
- Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
- // TODO Method that verifies uniqueness
- checkAllEndpointsAndHandlersAreUnique(handlers);
- // TODO Register the Handlers
- handlers.forEach(handler -> registerHandler(router, handler, log));
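Why does the ordering matter? If a pattern containing a path parameter were registered ahead of a more specific literal path, a request for /jobs/overview could be swallowed by /jobs/:jobid. Below is a minimal, hypothetical sketch of such an ordering rule (not Flink's actual RestHandlerUrlComparator): literal segments sort before ":param" segments, and shorter paths come first.
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Comparator;
- import java.util.List;
-
- // Hypothetical illustration of the ordering rule used when sorting handlers.
- public class UrlOrderDemo {
-     static final Comparator<String> URL_ORDER = (a, b) -> {
-         String[] as = a.split("/");
-         String[] bs = b.split("/");
-         for (int i = 0; i < Math.min(as.length, bs.length); i++) {
-             boolean aParam = as[i].startsWith(":");
-             boolean bParam = bs[i].startsWith(":");
-             if (aParam != bParam) {
-                 return aParam ? 1 : -1; // literal segments win over ":param"
-             }
-             int cmp = as[i].compareTo(bs[i]);
-             if (cmp != 0) {
-                 return cmp;
-             }
-         }
-         return Integer.compare(as.length, bs.length); // shorter paths first
-     };
-
-     public static void main(String[] args) {
-         List<String> urls = new ArrayList<>(Arrays.asList(
-                 "/jobs/:jobid", "/jobs", "/jobs/:jobid/config", "/jobs/overview"));
-         urls.sort(URL_ORDER);
-         // Prints: /jobs, /jobs/overview, /jobs/:jobid, /jobs/:jobid/config
-         urls.forEach(System.out::println);
-     }
- }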
Next, the Netty server-side bootstrap is started:
- // TODO Start the Netty server bootstrap
- NioEventLoopGroup bossGroup =
- new NioEventLoopGroup(
- 1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
- NioEventLoopGroup workerGroup =
- new NioEventLoopGroup(
- 0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));
- bootstrap = new ServerBootstrap();
- bootstrap
- .group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(initializer);
- Iterator<Integer> portsIterator;
- try {
- portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
- } catch (IllegalConfigurationException e) {
- throw e;
- } catch (Exception e) {
- throw new IllegalArgumentException(
- "Invalid port range definition: " + restBindPortRange);
- }
Here comes an interesting piece of code: to avoid port conflicts, Flink does the following.
- // To avoid port conflicts, the ports are tried one by one until one can be bound
- int chosenPort = 0;
- while (portsIterator.hasNext()) {
- try {
- chosenPort = portsIterator.next();
- final ChannelFuture channel;
- if (restBindAddress == null) {
- channel = bootstrap.bind(chosenPort);
- } else {
- channel = bootstrap.bind(restBindAddress, chosenPort);
- }
- serverChannel = channel.syncUninterruptibly().channel();
- break;
- } catch (final Exception e) {
- // continue if the exception is due to the port being in use, fail early
- // otherwise
- if (!(e instanceof org.jboss.netty.channel.ChannelException
- || e instanceof java.net.BindException)) {
- throw e;
- }
- }
- }
- if (serverChannel == null) {
- throw new BindException(
- "Could not start rest endpoint on any port in port range "
- + restBindPortRange);
- }
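As an aside, the port range that feeds this loop is produced by NetUtils.getPortRangeFromString, which (as far as I can tell from the 1.13 code) accepts a single port, a range such as "50100-50200", or a comma-separated combination of the two; treat the exact syntax as an assumption. A small usage sketch, assuming flink-core is on the classpath:
- import java.util.Iterator;
- import org.apache.flink.util.NetUtils;
-
- // Usage sketch for the port-range helper used in the binding loop above.
- public class PortRangeDemo {
-     public static void main(String[] args) {
-         // e.g. "8081", "50100-50200", or "8081,50100-50200"
-         Iterator<Integer> ports = NetUtils.getPortRangeFromString("50100-50103");
-         while (ports.hasNext()) {
-             System.out.println("candidate port: " + ports.next()); // 50100 .. 50103
-         }
-     }
- }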
At the end of the method, the endpoint state is set to RUNNING. At this point the WebMonitorEndpoint's Netty server is fully started, and startInternal() is then called to start the remaining base services.
- // Update the state
- state = State.RUNNING;
- // TODO At this point, the WebMonitorEndpoint's Netty server is up
- // TODO Start the other base services
- startInternal();
Next, let's see what startInternal does.
Inside the startInternal method we can see that the WebMonitorEndpoint is about to start campaigning for leadership:
- @Override
- public void startInternal() throws Exception {
- // TODO Start the election; look for the callback methods
- leaderElectionService.start(this);
- startExecutionGraphCacheCleanupTask();
-
- if (hasWebUI) {
- log.info("Web frontend listening at {}.", getRestBaseUrl());
- }
- }
Before going further, let me introduce the leader election mechanism of the components inside the master node.
Flink's leader election is built on the Curator framework. For every contender, an election driver (leaderElectionDriver) is created. When the election completes, one of two callback methods fires: isLeader if the contender won, notLeader if it lost. With this in mind, let's look at the WebMonitorEndpoint's election by stepping into the start method. Here we pick the DefaultLeaderElectionService implementation.
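If you would like to see the Curator primitive in isolation, here is a minimal standalone sketch of the isLeader/notLeader callback pattern that Flink's driver builds on (plain Curator, outside of Flink; the ZooKeeper address "localhost:2181" and the latch path "/demo/latch" are made-up values for illustration):
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.leader.LeaderLatch;
- import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
- import org.apache.curator.retry.ExponentialBackoffRetry;
-
- // Standalone Curator example of the election callback pattern described above.
- public class LeaderLatchDemo {
-     public static void main(String[] args) throws Exception {
-         CuratorFramework client = CuratorFrameworkFactory.newClient(
-                 "localhost:2181", new ExponentialBackoffRetry(1000, 3));
-         client.start();
-
-         LeaderLatch latch = new LeaderLatch(client, "/demo/latch");
-         latch.addListener(new LeaderLatchListener() {
-             @Override
-             public void isLeader() {
-                 // called back when this contender wins the election
-                 System.out.println("Elected as leader");
-             }
-
-             @Override
-             public void notLeader() {
-                 // called back when this contender loses leadership
-                 System.out.println("Lost leadership");
-             }
-         });
-         latch.start(); // join the election; callbacks fire asynchronously
-
-         Thread.sleep(10_000); // keep the process alive long enough to observe
-         latch.close();
-         client.close();
-     }
- }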
- @Override
- public final void start(LeaderContender contender) throws Exception {
- checkNotNull(contender, "Contender must not be null.");
- Preconditions.checkState(leaderContender == null, "Contender was already set.");
-
- synchronized (lock) {
- /*
- TODO When called from the WebMonitorEndpoint, the contender is the DispatcherRestEndpoint;
- when called from the ResourceManager, the contender is the ResourceManager;
- when called from the DispatcherRunner, the contender is the DispatcherRunner.
- */
- leaderContender = contender;
-
- // TODO The election driver (leaderElectionDriver) is created here
- leaderElectionDriver =
- leaderElectionDriverFactory.createLeaderElectionDriver(
- this,
- new LeaderElectionFatalErrorHandler(),
- leaderContender.getDescription());
- LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
-
- running = true;
- }
- }
In Standalone mode, the WebMonitorEndpoint, ResourceManager, and DispatcherRunner all run their elections this way. Since we entered this method from the WebMonitorEndpoint, the contender object here is actually the DispatcherRestEndpoint. Let's continue with the construction of the leaderElectionDriver.
Next, step into the createLeaderElectionDriver method. Since we are running in Standalone mode, we pick the ZooKeeperLeaderElectionDriverFactory implementation:
- @Override
- public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
- LeaderElectionEventHandler leaderEventHandler,
- FatalErrorHandler fatalErrorHandler,
- String leaderContenderDescription)
- throws Exception {
- return new ZooKeeperLeaderElectionDriver(
- client,
- latchPath,
- leaderPath,
- leaderEventHandler,
- fatalErrorHandler,
- leaderContenderDescription);
- }
Here a ZooKeeper-based leaderElectionDriver is constructed. Let's step in and keep reading:
- /**
- * Creates a ZooKeeperLeaderElectionDriver object.
- *
- * @param client Client which is connected to the ZooKeeper quorum
- * @param latchPath ZooKeeper node path for the leader election latch
- * @param leaderPath ZooKeeper node path for the node which stores the current leader
- * information
- * @param leaderElectionEventHandler Event handler for processing leader change events
- * @param fatalErrorHandler Fatal error handler
- * @param leaderContenderDescription Leader contender description
- */
- public ZooKeeperLeaderElectionDriver(
- CuratorFramework client,
- String latchPath,
- String leaderPath,
- LeaderElectionEventHandler leaderElectionEventHandler,
- FatalErrorHandler fatalErrorHandler,
- String leaderContenderDescription)
- throws Exception {
- this.client = checkNotNull(client);
- this.leaderPath = checkNotNull(leaderPath);
- this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
- this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
- this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
-
- leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
- cache = new NodeCache(client, leaderPath);
-
- client.getUnhandledErrorListenable().addListener(this);
-
- running = true;
-
- // TODO Start the election
- leaderLatch.addListener(this);
- leaderLatch.start();
-
- /*
- TODO Shortly after the election starts, a response arrives:
- 1. If this contender wins, its isLeader method is called back.
- 2. If it loses, its notLeader method is called back.
- Each contender has its own election driver.
- */
-
- cache.getListenable().addListener(this);
- cache.start();
-
- client.getConnectionStateListenable().addListener(listener);
- }
Here the election is started via the start method. As mentioned above, a callback is invoked once the election completes, so let's look at this class's isLeader method:
- /*
- Election won
- */
- @Override
- public void isLeader() {
- leaderElectionEventHandler.onGrantLeadership();
- }
Now step into the onGrantLeadership method:
- @Override
- @GuardedBy("lock")
- public void onGrantLeadership() {
- synchronized (lock) {
- if (running) {
- issuedLeaderSessionID = UUID.randomUUID();
- clearConfirmedLeaderInformation();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Grant leadership to contender {} with session ID {}.",
- leaderContender.getDescription(),
- issuedLeaderSessionID);
- }
-
- /*
- TODO There are four contender types; LeaderContender has four cases:
- 1. Dispatcher = DefaultDispatcherRunner
- 2. JobMaster = JobManagerRunnerImpl
- 3. ResourceManager = ResourceManager
- 4. WebMonitorEndpoint = WebMonitorEndpoint
- */
- leaderContender.grantLeadership(issuedLeaderSessionID);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Ignoring the grant leadership notification since the {} has "
- + "already been closed.",
- leaderElectionDriver);
- }
- }
- }
- }
Next, step into leaderContender.grantLeadership. Since this is the WebMonitorEndpoint's election, we go to the WebMonitorEndpoint implementation:
- @Override
- public void grantLeadership(final UUID leaderSessionID) {
- log.info(
- "{} was granted leadership with leaderSessionID={}",
- getRestBaseUrl(),
- leaderSessionID);
- leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
- }
Nothing much to say here; next, step into leaderElectionService.confirmLeadership and pick the DefaultLeaderElectionService implementation:
- @Override
- public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Confirm leader session ID {} for leader {}.", leaderSessionID, leaderAddress);
- }
- checkNotNull(leaderSessionID);
- synchronized (lock) {
- if (hasLeadership(leaderSessionID)) {
- if (running) {
- // TODO Confirm the leader information and write this node's info to ZooKeeper
- confirmLeaderInformation(leaderSessionID, leaderAddress);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Ignoring the leader session Id {} confirmation, since the "
- + "LeaderElectionService has already been stopped.",
- leaderSessionID);
- }
- }
- } else {
- // Received an old confirmation call
- if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Receive an old confirmation call of leader session ID {}, "
- + "current issued session ID is {}",
- leaderSessionID,
- issuedLeaderSessionID);
- }
- } else {
- LOG.warn(
- "The leader session ID {} was confirmed even though the "
- + "corresponding JobManager was not elected as the leader.",
- leaderSessionID);
- }
- }
- }
- }
Not much here either; let's go straight into the confirmLeaderInformation method:
- @GuardedBy("lock")
- private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
- confirmedLeaderSessionID = leaderSessionID;
- confirmedLeaderAddress = leaderAddress;
- leaderElectionDriver.writeLeaderInformation(
- LeaderInformation.known(confirmedLeaderSessionID, confirmedLeaderAddress));
- }
As we can see, after winning the election the WebMonitorEndpoint does not do much more: it simply writes its own information to ZooKeeper.
Once that information is written, the startup of the WebMonitorEndpoint is complete.
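For reference, the payload written to ZooKeeper is essentially the serialized leader address plus the session ID. The following is a rough, simplified sketch of that write following the shape of ZooKeeperLeaderElectionDriver.writeLeaderInformation; the leadership re-check and the node-already-exists/no-node race handling of the real driver are omitted, so treat the details as an approximation:
- import java.io.ByteArrayOutputStream;
- import java.io.ObjectOutputStream;
- import java.util.UUID;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.zookeeper.CreateMode;
-
- // Simplified sketch of writing the leader information to the leader path.
- public class LeaderInfoWriter {
-     static void writeLeaderInformation(
-             CuratorFramework client, String leaderPath,
-             String leaderAddress, UUID leaderSessionId) throws Exception {
-         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-         try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
-             oos.writeUTF(leaderAddress);      // e.g. the REST base URL
-             oos.writeObject(leaderSessionId); // the issued leader session ID
-         }
-         if (client.checkExists().forPath(leaderPath) == null) {
-             // ephemeral: the node disappears automatically if the leader dies
-             client.create()
-                     .creatingParentsIfNeeded()
-                     .withMode(CreateMode.EPHEMERAL)
-                     .forPath(leaderPath, baos.toByteArray());
-         } else {
-             client.setData().forPath(leaderPath, baos.toByteArray());
-         }
-     }
- }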
The startup flow of the WebMonitorEndpoint is not complicated. To summarize, it does the following:
1. Creates a Router, then initializes, sorts, deduplicates, and registers all the Handlers;
2. Starts the Netty server, trying the ports from the configured range one by one until one can be bound;
3. Sets the endpoint state to RUNNING;
4. In startInternal, joins the leader election and, upon winning, writes its leader information (REST address and session ID) to ZooKeeper.
In the next chapter, we will continue with the startup flow of the ResourceManager.