Click to view the related chapter: Flink 1.13 Source Code Analysis — Overview of the JobManager Startup Flow
Click to view the related chapter: Flink 1.13 Source Code Analysis — JobManager Startup Flow: Starting the WebMonitorEndpoint
Click to view the related chapter: Flink 1.13 Source Code Analysis — JobManager Startup Flow: Starting the ResourceManager
In the previous chapters we analyzed the startup process of the Flink master node (the logical JobManager), covering the creation of the eight basic services, the creation of the core instance factory, and how that factory builds and starts the WebMonitorEndpoint and the ResourceManager. In this chapter we look at the final part, the startup flow of the Dispatcher. Before that, let's briefly review the important concepts around the JobManager and what the Dispatcher component actually does.
The Flink master node, the JobManager, is only a logical master; its concrete implementation class differs depending on the deployment mode.
The (logical) JobManager consists of three core components: the ResourceManager, the Dispatcher and the WebMonitorEndpoint:
ResourceManager:
The resource manager of the Flink cluster. There is exactly one, and it is responsible for all Slot management and Slot requests.
Dispatcher:
1. Receives the JobGraph submitted by the user and then starts a JobMaster for it, similar to the AppMaster in YARN or the Driver in Spark.
2. Contains a persistent service, the JobGraphStore, which stores JobGraphs. If the master node crashes while the execution graph (or the physical execution graph) is being built and is later recovered, the JobGraph can be pulled again from this store.
WebMonitorEndpoint:
The REST service. It runs a Netty server internally; all client requests are received and handled by this component.
Let's use an example to describe what these three components do:
When a Client submits a Job to the cluster (the Client builds the Job into a JobGraph), the master node receives the REST request for the submission. The WebMonitorEndpoint resolves the request through its Router, finds the corresponding Handler and executes it; the result is then handed over to the Dispatcher, which launches a JobMaster responsible for deploying and executing the Tasks of this Job. The resources needed to run those Tasks are requested by the JobMaster from the ResourceManager.
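To make the hand-off at the center of this flow concrete, here is a minimal sketch (not Flink's actual handler code): once the REST layer has decoded a JobGraph, it passes it to the Dispatcher through the DispatcherGateway.submitJob RPC call. The helper class and method names below are made up for illustration; only the gateway call itself is the real Flink API.

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;

// Hypothetical helper, only to illustrate the hand-off from the REST layer to the Dispatcher.
public class JobSubmissionSketch {

    static CompletableFuture<Acknowledge> handOff(DispatcherGateway dispatcherGateway, JobGraph jobGraph) {
        // The Dispatcher receives the JobGraph over RPC and will spawn a JobMaster for it;
        // the JobMaster then asks the ResourceManager for the slots it needs.
        return dispatcherGateway.submitJob(jobGraph, Time.seconds(10));
    }
}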
The initialization of the Dispatcher differs slightly from that of the WebMonitorEndpoint and the ResourceManager. After the core factory class is built, the Dispatcher is not instantiated directly the way the WebMonitorEndpoint and the ResourceManager are; instead a DispatcherRunner is created, and the Dispatcher instance is built and started inside it. Let's see how this works, starting once more from the dispatcherResourceManagerComponentFactory.create() method:
/*
 TODO Inside this call the Dispatcher component is created and its start() method is invoked
*/
dispatcherRunner =
        dispatcherRunnerFactory.createDispatcherRunner(
                highAvailabilityServices.getDispatcherLeaderElectionService(),
                fatalErrorHandler,
                new HaServicesJobGraphStoreFactory(highAvailabilityServices),
                ioExecutor,
                rpcService,
                partialDispatcherServices);
As we can see, the Dispatcher is neither built nor started here. Let's step into the createDispatcherRunner method:
@Override
public DispatcherRunner createDispatcherRunner(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        JobGraphStoreFactory jobGraphStoreFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices)
        throws Exception {

    final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
            dispatcherLeaderProcessFactoryFactory.createFactory(
                    jobGraphStoreFactory,
                    ioExecutor,
                    rpcService,
                    partialDispatcherServices,
                    fatalErrorHandler);

    // TODO
    return DefaultDispatcherRunner.create(
            leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
}
Judging by the variable names, a DispatcherLeaderProcessFactory, i.e. the factory for the Dispatcher's leader process, is built here and passed as an argument into the DispatcherRunner's creation method. Let's step into DefaultDispatcherRunner.create:
public static DispatcherRunner create(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory)
        throws Exception {
    final DefaultDispatcherRunner dispatcherRunner =
            new DefaultDispatcherRunner(
                    leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
    // TODO Step into this method
    return DispatcherRunnerLeaderElectionLifecycleManager.createFor(
            dispatcherRunner, leaderElectionService);
}
After the DefaultDispatcherRunner is constructed, the instance is handed to the lifecycle manager for the DispatcherRunner's leader election. Let's continue into DispatcherRunnerLeaderElectionLifecycleManager.createFor:
public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(
        T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
    // TODO See the constructor
    return new DispatcherRunnerLeaderElectionLifecycleManager<>(
            dispatcherRunner, leaderElectionService);
}
Continue into the constructor of DispatcherRunnerLeaderElectionLifecycleManager:
private DispatcherRunnerLeaderElectionLifecycleManager(
        T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
    this.dispatcherRunner = dispatcherRunner;
    this.leaderElectionService = leaderElectionService;

    // TODO Start the leader election; the contender is the dispatcherRunner
    leaderElectionService.start(dispatcherRunner);
}
Here is a method we already know well: the leader election begins!
Let's step into the start method and pick the DefaultLeaderElectionService implementation:
@Override
public final void start(LeaderContender contender) throws Exception {
    checkNotNull(contender, "Contender must not be null.");
    Preconditions.checkState(leaderContender == null, "Contender was already set.");

    synchronized (lock) {
        /*
         TODO When called from the WebMonitorEndpoint, the contender is the DispatcherRestEndpoint
              When called from the ResourceManager, the contender is the ResourceManager
              When called from the DispatcherRunner, the contender is the DispatcherRunner
        */
        leaderContender = contender;

        // TODO Create the election driver leaderElectionDriver here
        leaderElectionDriver =
                leaderElectionDriverFactory.createLeaderElectionDriver(
                        this,
                        new LeaderElectionFatalErrorHandler(),
                        leaderContender.getDescription());
        LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);

        running = true;
    }
}
This is the same method we saw before: in the previous two chapters, the leader elections of the ResourceManager and the WebMonitorEndpoint both went through it. This time it is the DispatcherRunner's election, so the contender here is the DispatcherRunner. Let's follow the election flow into leaderElectionDriverFactory.createLeaderElectionDriver. Since we are analyzing the source code in standalone mode, leader election relies on ZooKeeper, so we pick the ZooKeeperLeaderElectionDriverFactory implementation:
@Override
public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
        LeaderElectionEventHandler leaderEventHandler,
        FatalErrorHandler fatalErrorHandler,
        String leaderContenderDescription)
        throws Exception {
    return new ZooKeeperLeaderElectionDriver(
            client,
            latchPath,
            leaderPath,
            leaderEventHandler,
            fatalErrorHandler,
            leaderContenderDescription);
}
Then into the constructor of ZooKeeperLeaderElectionDriver:
public ZooKeeperLeaderElectionDriver(
        CuratorFramework client,
        String latchPath,
        String leaderPath,
        LeaderElectionEventHandler leaderElectionEventHandler,
        FatalErrorHandler fatalErrorHandler,
        String leaderContenderDescription)
        throws Exception {
    this.client = checkNotNull(client);
    this.leaderPath = checkNotNull(leaderPath);
    this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

    leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
    cache = new NodeCache(client, leaderPath);

    client.getUnhandledErrorListenable().addListener(this);

    running = true;

    // TODO Start the election
    leaderLatch.addListener(this);
    leaderLatch.start();

    /*
     TODO Shortly after the election starts, a response arrives:
          1. If the election is won, the isLeader method of this class is called back
          2. If the election is lost, the notLeader method of this class is called back
          Each contender has its own election driver
    */

    cache.getListenable().addListener(this);
    cache.start();

    client.getConnectionStateListenable().addListener(listener);
}
Familiar ground again: as we analyzed in the previous two chapters, once the election completes, either isLeader or notLeader is called back depending on the result. Here we go straight to the isLeader method:
/*
 Election won
*/
@Override
public void isLeader() {
    leaderElectionEventHandler.onGrantLeadership();
}
Stepping in once more:
@Override
@GuardedBy("lock")
public void onGrantLeadership() {
    synchronized (lock) {
        if (running) {
            issuedLeaderSessionID = UUID.randomUUID();
            clearConfirmedLeaderInformation();

            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Grant leadership to contender {} with session ID {}.",
                        leaderContender.getDescription(),
                        issuedLeaderSessionID);
            }

            /*
             TODO There are 4 kinds of contenders, i.e. 4 cases for LeaderContender:
                  1. Dispatcher = DefaultDispatcherRunner
                  2. JobMaster = JobManagerRunnerImpl
                  3. ResourceManager = ResourceManager
                  4. WebMonitorEndpoint = WebMonitorEndpoint
            */
            leaderContender.grantLeadership(issuedLeaderSessionID);
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Ignoring the grant leadership notification since the {} has "
                                + "already been closed.",
                        leaderElectionDriver);
            }
        }
    }
}
Next, the leaderContender.grantLeadership method. Since this is the DispatcherRunner's election, we pick the DefaultDispatcherRunner implementation:
// ---------------------------------------------------------------
// Leader election
// ---------------------------------------------------------------

@Override
public void grantLeadership(UUID leaderSessionID) {
    runActionIfRunning(
            () -> {
                LOG.info(
                        "{} was granted leadership with leader id {}. Creating new {}.",
                        getClass().getSimpleName(),
                        leaderSessionID,
                        DispatcherLeaderProcess.class.getSimpleName());
                // TODO
                startNewDispatcherLeaderProcess(leaderSessionID);
            });
}
As the method name suggests, the next step is to start a new dispatcher leader process. Let's step into startNewDispatcherLeaderProcess:
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
    // TODO If a dispatcher leader process is currently running, stop it first
    stopDispatcherLeaderProcess();

    // TODO Then create a new one
    dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

    final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
    FutureUtils.assertNoException(
            previousDispatcherLeaderProcessTerminationFuture.thenRun(
                    // TODO Start it
                    newDispatcherLeaderProcess::start));
}
This method does three things:
1. It first checks whether a dispatcher leader process is already running and stops it if so, ensuring that only one, up-to-date dispatcher leader process exists in the current environment.
2. It then creates a new dispatcher leader process.
3. Finally it starts the new process, but only after the previous process has fully terminated (see the sketch below).
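The sequencing in step 3 relies on nothing Flink-specific: previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start) is ordinary JDK CompletableFuture chaining, which guarantees the new process only starts after the previous one has fully terminated. A small self-contained sketch of the same pattern (plain Java, not Flink code):

import java.util.concurrent.CompletableFuture;

public class ThenRunDemo {
    public static void main(String[] args) {
        // Completes once the "previous leader process" has finished shutting down.
        CompletableFuture<Void> terminationFuture =
                CompletableFuture.runAsync(() -> System.out.println("previous process terminated"));

        // thenRun defers the new process's start until the termination future completes,
        // mirroring previousDispatcherLeaderProcessTerminationFuture.thenRun(newProcess::start).
        terminationFuture.thenRun(() -> System.out.println("new process started")).join();
    }
}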
Now let's look at the start method of newDispatcherLeaderProcess:
@Override
public final void start() {
    // TODO
    runIfStateIs(State.CREATED, this::startInternal);
}

private void startInternal() {
    log.info("Start {}.", getClass().getSimpleName());
    state = State.RUNNING;
    // TODO
    onStart();
}
Next, the onStart method invoked from startInternal; we pick the SessionDispatcherLeaderProcess implementation:
@Override
protected void onStart() {
    // TODO Start the services the Dispatcher needs, including the JobGraphStore
    startServices();

    // TODO Asynchronous: if the JobGraphStore contains jobs that have not finished,
    // TODO first recover their JobGraphs via recoverJobsAsync,
    // TODO then start the Dispatcher via createDispatcherIfRunning
    onGoingRecoveryOperation =
            recoverJobsAsync()
                    // TODO Build the Dispatcher and start it
                    .thenAccept(this::createDispatcherIfRunning)
                    .handle(this::onErrorIfRunning);
}
This method does three things:
1. Start the basic services the Dispatcher needs, i.e. start the JobGraphStore.
2. Recover the jobs that did not finish because of an abnormal shutdown.
3. Build and start the Dispatcher.
Let's go through these parts in detail.
First, the startup of the JobGraphStore. Step into the startServices method:
private void startServices() {
    try {
        // TODO Start the JobGraphStore
        jobGraphStore.start(this);
    } catch (Exception e) {
        throw new FlinkRuntimeException(
                String.format(
                        "Could not start %s when trying to start the %s.",
                        jobGraphStore.getClass().getSimpleName(), getClass().getSimpleName()),
                e);
    }
}
Step into the start method and pick the DefaultJobGraphStore implementation:
@Override
public void start(JobGraphListener jobGraphListener) throws Exception {
    synchronized (lock) {
        if (!running) {
            // TODO Start listening
            // TODO With this listener, onAddedJobGraph is called back when a JobGraph is added
            // TODO and onRemovedJobGraph is called back when a JobGraph is removed
            this.jobGraphListener = checkNotNull(jobGraphListener);
            jobGraphStoreWatcher.start(this);
            running = true;
        }
    }
}
As we can see, a JobGraph listener is started here: when a JobGraph is submitted, the onAddedJobGraph method is triggered, and when a JobGraph is removed, the onRemovedJobGraph method is called back. We will cover the details in the later analysis of the job submission flow.
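For reference, the callback contract is the nested JobGraphStore.JobGraphListener interface (in the real flow, SessionDispatcherLeaderProcess itself plays this role). A minimal, hypothetical implementation, purely for illustration and not part of Flink's code, would look roughly like this:

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.JobGraphStore;

// Hypothetical listener that just logs the two callbacks described above.
class LoggingJobGraphListener implements JobGraphStore.JobGraphListener {

    @Override
    public void onAddedJobGraph(JobID jobId) {
        System.out.println("JobGraph added to the store: " + jobId);
    }

    @Override
    public void onRemovedJobGraph(JobID jobId) {
        System.out.println("JobGraph removed from the store: " + jobId);
    }
}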
Back in the earlier onStart method: if the JobGraphStore contains jobs that have not finished yet, recoverJobsAsync() (which eventually calls the recoverJobs method shown below) iterates over those jobs and adds their JobGraphs to a collection:
private Collection<JobGraph> recoverJobs() {
    log.info("Recover all persisted job graphs.");
    final Collection<JobID> jobIds = getJobIds();
    final Collection<JobGraph> recoveredJobGraphs = new ArrayList<>();

    for (JobID jobId : jobIds) {
        recoveredJobGraphs.add(recoverJob(jobId));
    }

    log.info("Successfully recovered {} persisted job graphs.", recoveredJobGraphs.size());

    return recoveredJobGraphs;
}
After the interrupted jobs have been recovered, the actual construction and startup of the Dispatcher begins. Let's look at the createDispatcherIfRunning method:
private void createDispatcherIfRunning(Collection<JobGraph> jobGraphs) {
    runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs));
}
Then into createDispatcher:
private void createDispatcher(Collection<JobGraph> jobGraphs) {

    final DispatcherGatewayService dispatcherService =
            // TODO Build the Dispatcher and start it
            dispatcherGatewayServiceFactory.create(
                    DispatcherId.fromUuid(getLeaderSessionId()), jobGraphs, jobGraphStore);

    completeDispatcherSetup(dispatcherService);
}
Here the construction of the Dispatcher really begins. Step into the create method and pick the DefaultDispatcherGatewayServiceFactory implementation:
@Override
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        JobGraphWriter jobGraphWriter) {

    final Dispatcher dispatcher;
    try {
        // TODO Build the Dispatcher
        dispatcher =
                dispatcherFactory.createDispatcher(
                        rpcService,
                        fencingToken,
                        recoveredJobs,
                        (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                new NoOpDispatcherBootstrap(),
                        PartialDispatcherServicesWithJobGraphStore.from(
                                partialDispatcherServices, jobGraphWriter));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }

    // TODO Start the Dispatcher
    dispatcher.start();

    return DefaultDispatcherGatewayService.from(dispatcher);
}
Here the Dispatcher instance is actually constructed, and its start method is called to start it. Let's first look at createDispatcher and pick the SessionDispatcherFactory implementation:
@Override
public StandaloneDispatcher createDispatcher(
        RpcService rpcService,
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        DispatcherBootstrapFactory dispatcherBootstrapFactory,
        PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore)
        throws Exception {
    // create the default dispatcher
    // TODO Extends RpcEndpoint, so onStart is called back after creation
    return new StandaloneDispatcher(
            rpcService,
            fencingToken,
            recoveredJobs,
            dispatcherBootstrapFactory,
            DispatcherServices.from(
                    partialDispatcherServicesWithJobGraphStore,
                    JobMasterServiceLeadershipRunnerFactory.INSTANCE));
}
Now the constructor of StandaloneDispatcher:
public class StandaloneDispatcher extends Dispatcher {
    public StandaloneDispatcher(
            RpcService rpcService,
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            DispatcherBootstrapFactory dispatcherBootstrapFactory,
            DispatcherServices dispatcherServices)
            throws Exception {
        super(
                rpcService,
                fencingToken,
                recoveredJobs,
                dispatcherBootstrapFactory,
                dispatcherServices);
    }
}
Following super we arrive inside the Dispatcher class. Because the Dispatcher extends RpcEndpoint, we know from the chapter on Flink RPC that once the Dispatcher is created and started, its onStart method will be invoked, so let's go straight to onStart:
// ------------------------------------------------------
// Lifecycle methods
// ------------------------------------------------------

@Override
public void onStart() throws Exception {
    try {
        // TODO Start the Dispatcher's basic services
        startDispatcherServices();
    } catch (Throwable t) {
        final DispatcherException exception =
                new DispatcherException(
                        String.format("Could not start the Dispatcher %s", getAddress()), t);
        onFatalError(exception);
        throw exception;
    }

    // TODO Start the jobs waiting to be recovered
    startRecoveredJobs();
    this.dispatcherBootstrap =
            this.dispatcherBootstrapFactory.create(
                    getSelfGateway(DispatcherGateway.class),
                    this.getRpcService().getScheduledExecutor(),
                    this::onFatalError);
}
This does three things:
1. Start the Dispatcher's basic services.
2. Run the interrupted jobs that were collected earlier in recovery mode.
3. Create the dispatcher bootstrap via dispatcherBootstrapFactory.create.
The Dispatcher's basic services only start a metrics service, so there is not much to see there. Let's look at the recovery of the interrupted jobs:
private void startRecoveredJobs() {
    for (JobGraph recoveredJob : recoveredJobs) {
        runRecoveredJob(recoveredJob);
    }
    recoveredJobs.clear();
}

private void runRecoveredJob(final JobGraph recoveredJob) {
    checkNotNull(recoveredJob);
    try {
        // TODO Run the job in RECOVERY mode
        // TODO We will analyze the concrete implementation in the job submission chapter
        runJob(recoveredJob, ExecutionType.RECOVERY);
    } catch (Throwable throwable) {
        onFatalError(
                new DispatcherException(
                        String.format(
                                "Could not start recovered job %s.", recoveredJob.getJobID()),
                        throwable));
    }
}
As we can see, this iterates over the collection of interrupted jobs gathered earlier and re-runs each of them in RECOVERY mode; we will analyze the concrete implementation later. Now back to the construction and startup of the Dispatcher: the dispatcherBootstrapFactory used above is just the lambda supplied by DefaultDispatcherGatewayServiceFactory, so let's return to that factory's create method, where we started:
@Override
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        JobGraphWriter jobGraphWriter) {

    final Dispatcher dispatcher;
    try {
        // TODO Build the Dispatcher
        dispatcher =
                dispatcherFactory.createDispatcher(
                        rpcService,
                        fencingToken,
                        recoveredJobs,
                        (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                new NoOpDispatcherBootstrap(),
                        PartialDispatcherServicesWithJobGraphStore.from(
                                partialDispatcherServices, jobGraphWriter));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }

    // TODO Start the Dispatcher
    dispatcher.start();

    return DefaultDispatcherGatewayService.from(dispatcher);
}
At this point the Dispatcher instance has been fully constructed, and the next step is to start it. In the start method, the Dispatcher sends a START control message to its own actor, switching the RPC endpoint to the started state (which is what triggers the onStart callback we just analyzed):
@Override
public void start() {
    // Send the START message to itself so the endpoint begins running
    rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
}
With that, the Dispatcher service has been fully built and started. Let's summarize.
The construction of the Dispatcher really does only two things:
1. Start the JobGraphStore service.
2. Recover the jobs from the JobGraphStore, then start the Dispatcher.
The only difference is that before the Dispatcher is constructed, Flink first creates a DispatcherRunner and runs a leader election; only after the election does the elected DispatcherRunner, through its dispatcher leader process, build and start the Dispatcher. Two points are worth noting here:
1. When the DispatcherRunner wins the election, the isLeader method is called back.
2. The Dispatcher extends RpcEndpoint, so once it is created and started, its onStart method is invoked (see the sketch below).
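As a minimal illustration of point 2, any subclass of RpcEndpoint gets the same lifecycle: calling start() sends the START control message shown above, and the framework then invokes onStart(). The toy endpoint below is an assumed example, not part of Flink's code base.

import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;

// Hypothetical endpoint showing the lifecycle the Dispatcher relies on:
// start() triggers the START control message, and the framework then calls onStart().
class MyEndpoint extends RpcEndpoint {

    MyEndpoint(RpcService rpcService) {
        super(rpcService);
    }

    @Override
    protected void onStart() throws Exception {
        // Initialize internal services here, just like Dispatcher.onStart() does.
        log.info("{} started.", getAddress());
    }
}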
In the first three chapters we covered the startup flow of the master node (the logical JobManager) and the creation and startup of the eight basic services, and in the previous two chapters we covered the startup of the WebMonitorEndpoint and ResourceManager components. Now the Dispatcher has started as well, and with that the master node has completed all of its startup work. In the next chapter we will look at the startup flow of the worker node, the TaskManager!