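Related chapter: Flink 1.13 Source Code Analysis: Overview of the JobManager Startup Process
Related chapter: Flink 1.13 Source Code Analysis: JobManager Startup, Starting the WebMonitorEndpoint
Related chapter: Flink 1.13 Source Code Analysis: JobManager Startup, Starting the Dispatcher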
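Before we dig into the ResourceManager, let's review a few important concepts of the Flink master node:
The Flink master node, the JobManager, is only a logical master. Its implementation class differs depending on the deployment mode.
The logical JobManager has three core components: the ResourceManager, the Dispatcher and the WebMonitorEndpoint.
ResourceManager:
The resource manager of the Flink cluster. There is exactly one, and it is responsible for managing and allocating Slots.
Dispatcher:
1. Receives the JobGraph submitted by the user and starts a JobMaster for it, similar to the AppMaster in YARN or the Driver in Spark.
2. Holds a persistent service, the JobGraphStore, which stores JobGraphs. Suppose the master crashes while the ExecutionGraph or the physical execution graph is being built. When it recovers, it can fetch the job's JobGraph again from this store.
WebMonitorEndpoint:
A REST service backed by an internal Netty server. Every request from the client is received and handled by this component.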
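An example shows how these three components work together:
A Client submits a Job to the cluster, first turning the Job into a JobGraph. The master then receives the REST request for the submission. The WebMonitorEndpoint uses its Router to resolve the request to the matching Handler, which processes it and then hands the job over to the Dispatcher. The Dispatcher launches a JobMaster, which is responsible for deploying and executing the Tasks of this Job. The JobMaster requests the resources those Tasks need from the ResourceManager.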
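In Flink, the ResourceManager plays the role of resource manager: it is responsible for managing and allocating Slots.
Now let's look at the ResourceManager's startup code.
First, return to the dispatcherResourceManagerComponentFactory.create() method. Once the WebMonitorEndpoint has been created and started, the ResourceManager is started next. Here is the code: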
```java
resourceManager =
        resourceManagerFactory.createResourceManager(
                configuration,
                ResourceID.generate(),
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                fatalErrorHandler,
                new ClusterInformation(hostname, blobServer.getPort()),
                webMonitorEndpoint.getRestBaseUrl(),
                metricRegistry,
                hostname,
                ioExecutor);

resourceManager.start();
```
Here the ResourceManager instance is created first, and then its start method is called to start it. Let's look at the start method:
```java
@Override
public void start() {
    // Send a message to itself announcing that it has started
    rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
}
```
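The ResourceManager is an RpcEndpoint (an Actor), and its start method merely sends a message to itself to signal that it has started. Strictly speaking, RpcEndpoint.start() delegates to its RpcServer, and the code above is the Akka-based RpcServer implementation, AkkaInvocationHandler.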
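The key point is that start() only enqueues a message, while onStart() runs later on the endpoint's own thread. A small mailbox model shows this. It is a hypothetical sketch that uses only java.util.concurrent:

- MailboxEndpoint and MailboxResourceManager are invented names.
- A BlockingQueue plus one dedicated thread stand in for the Akka actor and its mailbox.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: start() only posts START to the endpoint's own mailbox;
// a single mailbox thread later processes it and calls onStart().
// This mimics the idea of Flink's Akka-based RpcEndpoint, not its code.
abstract class MailboxEndpoint {
    enum Control { START, STOP }

    private final BlockingQueue<Control> mailbox = new LinkedBlockingQueue<>();
    private final Thread mainThread = new Thread(this::processMailbox, "endpoint-main-thread");
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    MailboxEndpoint() {
        mainThread.start();
    }

    // Returns immediately; onStart() runs later on the mailbox thread.
    void start() { mailbox.add(Control.START); }

    void stop() { mailbox.add(Control.STOP); }

    boolean awaitTermination(long millis) {
        try {
            mainThread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !mainThread.isAlive();
    }

    protected abstract void onStart() throws Exception;

    private void processMailbox() {
        try {
            while (true) {
                Control message = mailbox.take();
                if (message == Control.START) {
                    try {
                        onStart();
                    } catch (Exception e) {
                        events.add("fatal: " + e.getMessage());
                        return;
                    }
                } else {
                    events.add("stopped");
                    return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class MailboxResourceManager extends MailboxEndpoint {
    @Override
    protected void onStart() {
        events.add("onStart on " + Thread.currentThread().getName());
    }
}
```

In this model, onStart() runs on endpoint-main-thread rather than on the caller's thread. That is why the startup work triggered by start() happens asynchronously, after start() has already returned.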
So let's go back to how the ResourceManager is built, starting with the resourceManagerFactory.createResourceManager method:
```java
// TODO build the ResourceManagerRuntimeServices and load the configuration
final ResourceManagerRuntimeServices resourceManagerRuntimeServices =
        createResourceManagerRuntimeServices(
                effectiveResourceManagerAndRuntimeServicesConfig,
                rpcService,
                highAvailabilityServices,
                slotManagerMetricGroup);

// TODO build the ResourceManager
return createResourceManager(
        getEffectiveConfigurationForResourceManager(
                effectiveResourceManagerAndRuntimeServicesConfig),
        resourceId,
        rpcService,
        highAvailabilityServices,
        heartbeatServices,
        fatalErrorHandler,
        clusterInformation,
        webInterfaceUrl,
        resourceManagerMetricGroup,
        resourceManagerRuntimeServices,
        ioExecutor);
```
This method does two things:
1. Builds the ResourceManagerRuntimeServices and loads the configuration.
2. Actually builds the ResourceManager.
Step into the createResourceManager method and pick the StandaloneResourceManagerFactory implementation:
```java
@Override
protected ResourceManager<ResourceID> createResourceManager(
        Configuration configuration,
        ResourceID resourceId,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        FatalErrorHandler fatalErrorHandler,
        ClusterInformation clusterInformation,
        @Nullable String webInterfaceUrl,
        ResourceManagerMetricGroup resourceManagerMetricGroup,
        ResourceManagerRuntimeServices resourceManagerRuntimeServices,
        Executor ioExecutor) {

    // TODO ResourceManager start-up period: the window after startup during which TaskManagers are expected to register.
    // TODO Configurable via resourcemanager.standalone.start-up-time; if unset, it defaults to the slot request timeout.
    final Time standaloneClusterStartupPeriodTime =
            ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);

    return new StandaloneResourceManager(
            rpcService,
            resourceId,
            highAvailabilityServices,
            heartbeatServices,
            resourceManagerRuntimeServices.getSlotManager(),
            ResourceManagerPartitionTrackerImpl::new,
            resourceManagerRuntimeServices.getJobLeaderIdService(),
            clusterInformation,
            fatalErrorHandler,
            resourceManagerMetricGroup,
            standaloneClusterStartupPeriodTime,
            AkkaUtils.getTimeoutAsTime(configuration),
            ioExecutor);
}
```
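This code does two things:
1. Configures the ResourceManager's start-up period. This is the window after the ResourceManager starts during which TaskManagers are expected to register with it. While the window is open, slot requests that no registered slot can satisfy yet are not failed; once it has elapsed, such requests fail immediately. The parameter is set via resourcemanager.standalone.start-up-time. If it is not set, it defaults to the slot request timeout.
2. Builds a StandaloneResourceManager instance.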
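Flink's configuration documentation describes resourcemanager.standalone.start-up-time as a grace period. During this period, slot requests that no registered slot can satisfy are not failed yet. After it, they are failed right away, and if the option is unset, slot.request.timeout is used. The sketch below models that fallback rule and the grace-period check. It is simplified and hypothetical:

- A plain Map replaces Flink's Configuration.
- -1 is assumed to mean "not set".
- The 5-minute default for slot.request.timeout is an assumption taken from Flink's documented default.

```java
import java.util.Map;

// Hypothetical sketch of the start-up period rule; not Flink's ConfigurationUtils.
final class StartupPeriodSketch {
    static final String STARTUP_TIME_KEY = "resourcemanager.standalone.start-up-time";
    static final String SLOT_REQUEST_TIMEOUT_KEY = "slot.request.timeout";
    // Assumed default of slot.request.timeout: 5 minutes.
    static final long DEFAULT_SLOT_REQUEST_TIMEOUT_MS = 5 * 60 * 1000L;

    private StartupPeriodSketch() {}

    // A negative (unset) start-up time falls back to the slot request timeout.
    static long startupPeriodMillis(Map<String, Long> conf) {
        long startupTime = conf.getOrDefault(STARTUP_TIME_KEY, -1L);
        if (startupTime >= 0) {
            return startupTime;
        }
        return conf.getOrDefault(SLOT_REQUEST_TIMEOUT_KEY, DEFAULT_SLOT_REQUEST_TIMEOUT_MS);
    }

    // Inside the start-up period, unfulfillable requests keep waiting;
    // once it has elapsed, they are failed right away.
    static boolean failUnfulfillableRequests(long rmStartMillis, long nowMillis, long periodMillis) {
        return nowMillis - rmStartMillis >= periodMillis;
    }
}
```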
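Let's continue with how the StandaloneResourceManager is constructed. Following its constructor up to the ResourceManager constructor, we can see that ResourceManager is an RpcEndpoint: it extends FencedRpcEndpoint, which extends RpcEndpoint. So it must have an onStart method. That method is called back once the endpoint has been started, that is, when the START message sent above is processed. Let's go straight to ResourceManager's onStart method: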
```java
// ------------------------------------------------------------------------
//  RPC lifecycle methods
// ------------------------------------------------------------------------

@Override
public final void onStart() throws Exception {
    try {
        log.info("Starting the resource manager.");
        // TODO start the ResourceManager's basic services
        startResourceManagerServices();
    } catch (Throwable t) {
        final ResourceManagerException exception =
                new ResourceManagerException(
                        String.format("Could not start the ResourceManager %s", getAddress()),
                        t);
        onFatalError(exception);
        throw exception;
    }
}
```
This method calls startResourceManagerServices() to start the ResourceManager's basic services. Let's step into startResourceManagerServices():
```java
private void startResourceManagerServices() throws Exception {
    try {
        // TODO obtain the leader election service
        leaderElectionService =
                highAvailabilityServices.getResourceManagerLeaderElectionService();
        // Does nothing in Standalone mode
        initialize();

        // TODO start competing for leadership
        leaderElectionService.start(this);
        jobLeaderIdService.start(new JobLeaderIdActionsImpl());

        registerMetrics();
    } catch (Exception e) {
        handleStartResourceManagerServicesException(e);
    }
}
```
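Two things happen above: startResourceManagerServices() runs its steps in a fixed order, and onStart() turns any failure into a fatal error before rethrowing it. Both can be mimicked in a few lines. This is a hypothetical sketch:

- The steps are only recorded as strings.
- StartupException stands in for ResourceManagerException.
- The failingStep knob exists only so the failure path can be exercised.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of onStart() + startResourceManagerServices(); no Flink code is called.
class StartupOrderSketch {
    static class StartupException extends Exception {
        StartupException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    final List<String> completedSteps = new ArrayList<>();
    Throwable reportedFatalError; // what onFatalError(...) would receive
    private final String failingStep; // demo knob: the step that should throw

    StartupOrderSketch(String failingStep) {
        this.failingStep = failingStep;
    }

    private void step(String name) {
        if (name.equals(failingStep)) {
            throw new IllegalStateException(name + " failed");
        }
        completedSteps.add(name);
    }

    void onStart() throws StartupException {
        try {
            step("getResourceManagerLeaderElectionService");
            step("initialize"); // a no-op in standalone mode
            step("leaderElectionService.start");
            step("jobLeaderIdService.start");
            step("registerMetrics");
        } catch (Throwable t) {
            StartupException exception = new StartupException("Could not start the ResourceManager", t);
            reportedFatalError = exception; // onFatalError(exception)
            throw exception; // rethrown, as in the original onStart()
        }
    }
}
```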
Here we meet a familiar operation again: leader election.
Let's go straight to the leaderElectionService.start method:
```java
@Override
public final void start(LeaderContender contender) throws Exception {
    checkNotNull(contender, "Contender must not be null.");
    Preconditions.checkState(leaderContender == null, "Contender was already set.");

    synchronized (lock) {
        /*
         TODO When called from the WebMonitorEndpoint, the contender is the DispatcherRestEndpoint;
         when called from the ResourceManager, the contender is the ResourceManager;
         when called from the DispatcherRunner, the contender is the DispatcherRunner.
        */
        leaderContender = contender;

        // TODO create the election driver object, leaderElectionDriver, here
        leaderElectionDriver =
                leaderElectionDriverFactory.createLeaderElectionDriver(
                        this,
                        new LeaderElectionFatalErrorHandler(),
                        leaderContender.getDescription());
        LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);

        running = true;
    }
}
```
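Aha, isn't this the very method we saw in the previous chapter when the WebMonitorEndpoint was started? The only difference is that this time it is the ResourceManager's election, so the contender is the ResourceManager. As usual, let's go straight to the createLeaderElectionDriver method and pick the ZooKeeperLeaderElectionDriverFactory implementation. This path applies when the standalone cluster runs with ZooKeeper high availability. Without HA, StandaloneHaServices returns a StandaloneLeaderElectionService, which grants leadership directly instead of going through ZooKeeper.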
```java
@Override
public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
        LeaderElectionEventHandler leaderEventHandler,
        FatalErrorHandler fatalErrorHandler,
        String leaderContenderDescription)
        throws Exception {
    return new ZooKeeperLeaderElectionDriver(
            client,
            latchPath,
            leaderPath,
            leaderEventHandler,
            fatalErrorHandler,
            leaderContenderDescription);
}
```
This returns a ZooKeeper election driver. Let's click into the ZooKeeperLeaderElectionDriver class:
```java
public ZooKeeperLeaderElectionDriver(
        CuratorFramework client,
        String latchPath,
        String leaderPath,
        LeaderElectionEventHandler leaderElectionEventHandler,
        FatalErrorHandler fatalErrorHandler,
        String leaderContenderDescription)
        throws Exception {
    this.client = checkNotNull(client);
    this.leaderPath = checkNotNull(leaderPath);
    this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

    leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
    cache = new NodeCache(client, leaderPath);

    client.getUnhandledErrorListenable().addListener(this);

    running = true;

    // TODO start the election
    leaderLatch.addListener(this);
    leaderLatch.start();

    /*
     TODO Shortly after the election starts, a response arrives:
     1. if this contender wins, the isLeader method of this class is called back;
     2. if it later loses leadership, the notLeader method of this class is called back.
     Each contender has its own election driver.
    */

    cache.getListenable().addListener(this);
    cache.start();

    client.getConnectionStateListenable().addListener(listener);
}
```
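Here we are at the election method again. As discussed in the previous section, once the election is decided, the driver is called back through isLeader when leadership is granted, or notLeader when it is lost. Let's go straight to the isLeader method: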
```java
/*
 Election won
*/
@Override
public void isLeader() {
    leaderElectionEventHandler.onGrantLeadership();
}
```
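A latch turns "who is the leader" into isLeader()/notLeader() callbacks, and this can be modelled in memory. The sketch below is a hypothetical stand-in for Curator's LeaderLatch, not its implementation:

- The earliest participant still present is the leader, roughly like the lowest sequential znode winning.
- The callbacks fire only on transitions. This matches the documented semantics of Curator's LeaderLatchListener.
- InMemoryLatchGroup and RecordingDriver are invented names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a ZooKeeper/Curator LeaderLatch group.
// The earliest participant still present is the leader.
class InMemoryLatchGroup {
    interface Listener {
        void isLeader();  // fired on the transition to leader
        void notLeader(); // fired on the transition away from leader
    }

    private final List<Listener> participants = new ArrayList<>();

    synchronized void join(Listener listener) {
        participants.add(listener);
        if (participants.size() == 1) {
            listener.isLeader();
        }
    }

    synchronized void leave(Listener listener) {
        boolean wasLeader = !participants.isEmpty() && participants.get(0) == listener;
        participants.remove(listener);
        if (wasLeader) {
            listener.notLeader();
            if (!participants.isEmpty()) {
                participants.get(0).isLeader(); // next in line takes over
            }
        }
    }
}

// Plays the role of ZooKeeperLeaderElectionDriver: it just records the callbacks.
class RecordingDriver implements InMemoryLatchGroup.Listener {
    private final String name;
    private final List<String> log;

    RecordingDriver(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void isLeader() { log.add(name + ":isLeader"); }

    @Override
    public void notLeader() { log.add(name + ":notLeader"); }
}
```

A contender that never wins receives no callback at all. It only hears isLeader() once the participant ahead of it leaves.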
Then step into leaderElectionEventHandler.onGrantLeadership():
```java
@Override
@GuardedBy("lock")
public void onGrantLeadership() {
    synchronized (lock) {
        if (running) {
            issuedLeaderSessionID = UUID.randomUUID();
            clearConfirmedLeaderInformation();

            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Grant leadership to contender {} with session ID {}.",
                        leaderContender.getDescription(),
                        issuedLeaderSessionID);
            }

            /*
             TODO There are four kinds of contenders, i.e. four LeaderContender implementations:
             1. Dispatcher = DefaultDispatcherRunner
             2. JobMaster = JobManagerRunnerImpl
             3. ResourceManager = ResourceManager
             4. WebMonitorEndpoint = WebMonitorEndpoint
            */
            leaderContender.grantLeadership(issuedLeaderSessionID);
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Ignoring the grant leadership notification since the {} has "
                                + "already been closed.",
                        leaderElectionDriver);
            }
        }
    }
}
```
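The bookkeeping in onGrantLeadership() has three parts: a fresh session ID per grant, grants ignored after shutdown, and a later confirmation by the contender. It can be reduced to a small class. ElectionServiceSketch is a hypothetical, simplified model of DefaultLeaderElectionService, not its real code. The driver side is simulated by calling onGrantLeadership() directly.

```java
import java.util.UUID;

// Hypothetical, simplified model of the DefaultLeaderElectionService bookkeeping;
// not Flink's class. The election driver is simulated by calling onGrantLeadership().
class ElectionServiceSketch {
    interface Contender {
        void grantLeadership(UUID sessionId);
    }

    private final Object lock = new Object();
    private Contender contender;
    private boolean running;
    private UUID issuedLeaderSessionId;
    private UUID confirmedLeaderSessionId;

    void start(Contender newContender) {
        synchronized (lock) {
            if (contender != null) {
                throw new IllegalStateException("Contender was already set.");
            }
            contender = newContender;
            running = true;
        }
    }

    void stop() {
        synchronized (lock) {
            running = false;
        }
    }

    // Called when the driver reports isLeader(): issue a fresh session ID and hand it on.
    void onGrantLeadership() {
        synchronized (lock) {
            if (!running) {
                return; // service already closed: ignore the grant
            }
            issuedLeaderSessionId = UUID.randomUUID();
            confirmedLeaderSessionId = null; // like clearConfirmedLeaderInformation()
            contender.grantLeadership(issuedLeaderSessionId);
        }
    }

    boolean hasLeadership(UUID sessionId) {
        synchronized (lock) {
            return running && sessionId.equals(issuedLeaderSessionId);
        }
    }

    // The contender confirms the session ID it was granted.
    void confirmLeadership(UUID sessionId) {
        synchronized (lock) {
            if (hasLeadership(sessionId)) {
                confirmedLeaderSessionId = sessionId;
            }
        }
    }

    UUID confirmedSessionId() {
        synchronized (lock) {
            return confirmedLeaderSessionId;
        }
    }
}
```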
Next, step into the leaderContender.grantLeadership method and pick the ResourceManager implementation:
```java
// ------------------------------------------------------------------------
//  Leader Contender
// ------------------------------------------------------------------------

/**
 * Callback method when current resourceManager is granted leadership.
 *
 * @param newLeaderSessionID unique leadershipID
 */
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
    final CompletableFuture<Boolean> acceptLeadershipFuture =
            clearStateFuture.thenComposeAsync(
                    // TODO callback executed after winning the election
                    (ignored) -> tryAcceptLeadership(newLeaderSessionID),
                    getUnfencedMainThreadExecutor());

    final CompletableFuture<Void> confirmationFuture =
            acceptLeadershipFuture.thenAcceptAsync(
                    (acceptLeadership) -> {
                        if (acceptLeadership) {
                            // confirming the leader session ID might be blocking,
                            leaderElectionService.confirmLeadership(
                                    newLeaderSessionID, getAddress());
                        }
                    },
                    ioExecutor);

    confirmationFuture.whenComplete(
            (Void ignored, Throwable throwable) -> {
                if (throwable != null) {
                    onFatalError(ExceptionUtils.stripCompletionException(throwable));
                }
            });
}
```
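The chain above has three stages:

1. Accept leadership on the main thread.
2. Confirm it on the I/O executor, because confirmation may block.
3. Escalate any failure as fatal.

The same shape can be reproduced with plain CompletableFuture. This is a hypothetical sketch:

- Two single-thread executors stand in for getUnfencedMainThreadExecutor() and ioExecutor.
- The lambdas stand in for tryAcceptLeadership() and confirmLeadership().
- The unwrapping of CompletionException mimics ExceptionUtils.stripCompletionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the future chain in grantLeadership(); not Flink code.
class GrantLeadershipChainSketch {
    static String run(boolean acceptLeadership, boolean confirmationFails) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        AtomicReference<String> outcome = new AtomicReference<>("not confirmed");
        try {
            CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);

            // Stand-in for tryAcceptLeadership(newLeaderSessionID), run on the "main thread".
            CompletableFuture<Boolean> acceptLeadershipFuture =
                    clearStateFuture.thenComposeAsync(
                            ignored -> CompletableFuture.completedFuture(acceptLeadership),
                            mainThread);

            // Stand-in for confirmLeadership(...), which may block, so it runs on the I/O executor.
            CompletableFuture<Void> confirmationFuture =
                    acceptLeadershipFuture.thenAcceptAsync(
                            accepted -> {
                                if (accepted) {
                                    if (confirmationFails) {
                                        throw new IllegalStateException("confirmation failed");
                                    }
                                    outcome.set("confirmed");
                                }
                            },
                            ioExecutor);

            // Stand-in for onFatalError(stripCompletionException(throwable)).
            CompletableFuture<Void> reported =
                    confirmationFuture.whenComplete(
                            (result, throwable) -> {
                                if (throwable != null) {
                                    Throwable cause =
                                            throwable instanceof CompletionException && throwable.getCause() != null
                                                    ? throwable.getCause()
                                                    : throwable;
                                    outcome.set("fatal: " + cause.getMessage());
                                }
                            });

            // Wait for the whole chain; the failure has already been reported above.
            reported.handle((v, t) -> null).join();
            return outcome.get();
        } finally {
            mainThread.shutdown();
            ioExecutor.shutdown();
        }
    }
}
```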
Inside this method, the ResourceManager calls tryAcceptLeadership(). Let's step into it:
```java
private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
    if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
        final ResourceManagerId newResourceManagerId =
                ResourceManagerId.fromUuid(newLeaderSessionID);

        log.info(
                "ResourceManager {} was granted leadership with fencing token {}",
                getAddress(),
                newResourceManagerId);

        // clear the state if we've been the leader before
        if (getFencingToken() != null) {
            clearStateInternal();
        }

        setFencingToken(newResourceManagerId);

        /*
         TODO start services:
         1. start the two heartbeat services
         2. start the slotManager service, which starts two periodic tasks
        */
        startServicesOnLeadership();

        return prepareLeadershipAsync().thenApply(ignored -> hasLeadership = true);
    } else {
        return CompletableFuture.completedFuture(false);
    }
}
```
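setFencingToken() is what makes a new leadership session safe. Messages stamped with an old token are rejected, so a request addressed to a previous leader cannot act on the new one. The class below is a hypothetical, minimal model of that idea; Flink's ResourceManager gets the real mechanism from FencedRpcEndpoint. The message strings and the state list are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch of fencing; not Flink's FencedRpcEndpoint.
class FencedEndpointSketch {
    private UUID fencingToken;
    final List<String> state = new ArrayList<>();

    // Mirrors tryAcceptLeadership(): clear old state if we led before, then adopt the new token.
    void acceptLeadership(UUID newSessionId) {
        if (fencingToken != null) {
            state.clear();
        }
        fencingToken = newSessionId;
    }

    // A message is only processed if it carries the current token.
    boolean handle(UUID token, String message) {
        if (fencingToken == null || !fencingToken.equals(token)) {
            return false; // stale token or no leader yet: reject
        }
        state.add(message);
        return true;
    }
}
```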
Here the call to startServicesOnLeadership starts two heartbeat services and two periodic tasks. Let's step into this method to see how:
```java
private void startServicesOnLeadership() {
    // TODO start the two heartbeat services
    startHeartbeatServices();

    // TODO start the two periodic services
    // TODO the SlotManager lives inside the ResourceManager and manages all Slots that TaskManagers register and report
    slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());

    onLeadership();
}
```
Let's first look at the two heartbeat services by stepping into startHeartbeatServices():
```java
private void startHeartbeatServices() {
    // TODO heartbeat between the ResourceManager (master) and the worker nodes,
    // TODO i.e. between the ResourceManager (logical JobManager) and the TaskExecutors (TaskManagers)
    taskManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId,
                    new TaskManagerHeartbeatListener(),
                    getMainThreadExecutor(),
                    log);

    // TODO heartbeat between the ResourceManager and the JobMasters (each job's master process)
    jobManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId,
                    new JobManagerHeartbeatListener(),
                    getMainThreadExecutor(),
                    log);
}
```
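So the two heartbeats are:
1. The heartbeat between the ResourceManager and the TaskManagers
2. The heartbeat between the ResourceManager and each job's master process (the JobMaster)
Next, let's see how a heartbeat is implemented. Step into createHeartbeatManagerSender, then into HeartbeatManagerSenderImpl, and then into the constructor it delegates to via this(...):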
```java
HeartbeatManagerSenderImpl(
        long heartbeatPeriod,
        long heartbeatTimeout,
        ResourceID ownResourceID,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor mainThreadExecutor,
        Logger log,
        HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
    super(
            heartbeatTimeout,
            ownResourceID,
            heartbeatListener,
            mainThreadExecutor,
            log,
            heartbeatMonitorFactory);

    this.heartbeatPeriod = heartbeatPeriod;
    // TODO schedule this object's run method on the main-thread executor; since the delay is 0L, it runs right away
    mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}
```
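Here a task is scheduled on the main-thread executor, with a delay of 0. As soon as execution reaches this point, this object's run method is invoked right away (asynchronously, on the main thread). Let's look at the run method: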
```java
@Override
public void run() {
    if (!stopped) {
        log.debug("Trigger heartbeat request.");
        // Explained in detail once we have analyzed the worker nodes
        for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
            // TODO send a heartbeat request
            requestHeartbeat(heartbeatMonitor);
        }

        // After heartbeatPeriod (10 s by default), run this run method again, so the for loop above
        // executes every 10 s; this is what makes the heartbeat loop endless
        getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
    }
}
```
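So when execution first reaches this point, a heartbeat request is sent to each heartbeat target in turn. We will analyze this for loop in detail in a later chapter, so we skip it here. After the heartbeat requests are sent, another delayed task is scheduled. After heartbeatPeriod (10 seconds by default), this object's run method, that is, the current method, is triggered again. This is how the endless heartbeat loop is built.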
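The self-rescheduling pattern can be sketched independently of Flink: schedule once with delay 0, then have run() re-arm itself after heartbeatPeriod. This is a hypothetical sketch:

- DelayScheduler is an invented interface, so that a manual scheduler can drive the loop deterministically.
- ManualScheduler.of(...) shows how a real ScheduledExecutorService would plug in.
- The per-target counter stands in for requestHeartbeat(heartbeatMonitor).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical scheduling abstraction so the loop can be driven by hand in tests.
interface DelayScheduler {
    void schedule(Runnable task, long delayMillis);
}

// Hypothetical sketch of the HeartbeatManagerSenderImpl loop; not Flink's class.
class HeartbeatSenderSketch implements Runnable {
    private final DelayScheduler scheduler;
    private final long heartbeatPeriodMillis;
    private final Map<String, Integer> requestsSent = new LinkedHashMap<>();
    private volatile boolean stopped;

    HeartbeatSenderSketch(DelayScheduler scheduler, long heartbeatPeriodMillis) {
        this.scheduler = scheduler;
        this.heartbeatPeriodMillis = heartbeatPeriodMillis;
        scheduler.schedule(this, 0L); // first round as soon as possible
    }

    synchronized void monitorTarget(String resourceId) {
        requestsSent.putIfAbsent(resourceId, 0);
    }

    synchronized int requestsSentTo(String resourceId) {
        return requestsSent.getOrDefault(resourceId, 0);
    }

    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        if (stopped) {
            return; // no re-arm: the loop ends here
        }
        synchronized (this) {
            requestsSent.replaceAll((id, count) -> count + 1); // stand-in for requestHeartbeat(...)
        }
        scheduler.schedule(this, heartbeatPeriodMillis); // re-arm for the next round
    }
}

// Runs queued tasks only when asked, ignoring the delay, so tests are deterministic.
class ManualScheduler implements DelayScheduler {
    final Deque<Runnable> queue = new ArrayDeque<>();

    @Override
    public void schedule(Runnable task, long delayMillis) {
        queue.add(task);
    }

    boolean runNext() {
        Runnable task = queue.poll();
        if (task == null) {
            return false;
        }
        task.run();
        return true;
    }

    // Adapter for real use with a ScheduledExecutorService.
    static DelayScheduler of(ScheduledExecutorService executor) {
        return (task, delayMillis) -> executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

The ResourceManager creates two such senders: one whose targets are TaskManagers and one whose targets are JobMasters. In the sketch, that would simply be two HeartbeatSenderSketch instances sharing one scheduler.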
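Having analyzed how the heartbeat works, let's go back and see what the two periodic services are.
First, step into the slotManager.start method and pick the SlotManagerImpl implementation: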
```java
@Override
public void start(
        ResourceManagerId newResourceManagerId,
        Executor newMainThreadExecutor,
        ResourceActions newResourceActions) {
    LOG.info("Starting the SlotManager.");

    this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
    mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
    resourceActions = Preconditions.checkNotNull(newResourceActions);

    started = true;

    /*
     TODO periodic task checkTaskManagerTimeoutsAndRedundancy:
     checks for idle TaskManagers every 30 seconds (by default)
    */
    taskManagerTimeoutsAndRedundancyCheck =
            scheduledExecutor.scheduleWithFixedDelay(
                    () ->
                            mainThreadExecutor.execute(
                                    () -> checkTaskManagerTimeoutsAndRedundancy()),
                    0L,
                    taskManagerTimeout.toMilliseconds(),
                    TimeUnit.MILLISECONDS);

    /*
     TODO periodic task checkSlotRequestTimeouts:
     a Slot request in progress is kept as a pending request; this task finds pending
     requests older than 5 minutes (by default), i.e. Slot requests that have timed out
    */
    slotRequestTimeoutCheck =
            scheduledExecutor.scheduleWithFixedDelay(
                    () -> mainThreadExecutor.execute(() -> checkSlotRequestTimeouts()),
                    0L,
                    slotRequestTimeout.toMilliseconds(),
                    TimeUnit.MILLISECONDS);

    registerSlotManagerMetrics();
}
```
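The two periodic tasks are:
1. A periodic check for idle TaskManagers. In YARN session mode, for example, idle TaskManagers are checked periodically (every 30 seconds by default). A TaskManager that has been idle for longer than 30 seconds is reclaimed: the worker node is released and its resources are freed.
2. A periodic check for timed-out Slot requests. While a Slot request is in progress, it is kept as a PendingSlotRequest. This task finds pending requests that have been waiting for more than 5 minutes (by default), i.e. Slot requests that have timed out.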
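Both checks use the same scheduling shape: a fixed-delay task on a timer executor whose only job is to hand the real check over to the main-thread executor. The sketch below is hypothetical:

- A single-thread executor stands in for the ResourceManager's main thread.
- Counting down latches stand in for checkTaskManagerTimeoutsAndRedundancy() and checkSlotRequestTimeouts().
- The short delays are chosen only so the demo finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the scheduling in SlotManagerImpl.start(); not Flink code.
class PeriodicChecksSketch {
    static boolean bothChecksRanTwice(long taskManagerCheckMillis, long slotRequestCheckMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        CountDownLatch taskManagerChecks = new CountDownLatch(2);
        CountDownLatch slotRequestChecks = new CountDownLatch(2);
        try {
            // Initial delay 0: the first check runs right away, then again after each fixed delay.
            ScheduledFuture<?> taskManagerTimeoutCheck =
                    timer.scheduleWithFixedDelay(
                            () -> mainThread.execute(taskManagerChecks::countDown),
                            0L,
                            taskManagerCheckMillis,
                            TimeUnit.MILLISECONDS);
            ScheduledFuture<?> slotRequestTimeoutCheck =
                    timer.scheduleWithFixedDelay(
                            () -> mainThread.execute(slotRequestChecks::countDown),
                            0L,
                            slotRequestCheckMillis,
                            TimeUnit.MILLISECONDS);

            boolean ok =
                    taskManagerChecks.await(5, TimeUnit.SECONDS)
                            && slotRequestChecks.await(5, TimeUnit.SECONDS);

            // Stop the periodic checks again.
            taskManagerTimeoutCheck.cancel(false);
            slotRequestTimeoutCheck.cancel(false);
            return ok;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow();
            mainThread.shutdownNow();
        }
    }
}
```

In the real SlotManager, the delays are taskManagerTimeout (30 s by default) and slotRequestTimeout (5 minutes by default). Hopping onto the main-thread executor keeps the checks on the same single thread as the ResourceManager's other state changes.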
Let's step into the check for timed-out slot requests, checkSlotRequestTimeouts:
```java
private void checkSlotRequestTimeouts() {
    if (!pendingSlotRequests.isEmpty()) {
        long currentTime = System.currentTimeMillis();

        Iterator<Map.Entry<SlotRequestId, PendingSlotRequest>> slotRequestIterator =
                pendingSlotRequests.entrySet().iterator();

        // TODO iterate over the pending SlotRequests
        while (slotRequestIterator.hasNext()) {
            PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();

            // Check whether this slotRequest has timed out
            if (currentTime - slotRequest.getCreationTimestamp()
                    >= slotRequestTimeout.toMilliseconds()) {
                // Remove the timed-out slotRequest
                slotRequestIterator.remove();

                // TODO the ResourceManager has already assigned a Slot to this request, but the request is still pending
                if (slotRequest.isAssigned()) {
                    // Cancel it
                    cancelPendingSlotRequest(slotRequest);
                }

                // TODO notify the failure
                resourceActions.notifyAllocationFailure(
                        slotRequest.getJobId(),
                        slotRequest.getAllocationId(),
                        new TimeoutException("The allocation could not be fulfilled in time."));
            }
        }
    }
}
```
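The check works as follows:
1. Iterate over all pending slot requests.
2. Find the slotRequests that have timed out.
3. Remove each timed-out slotRequest.
4. If the ResourceManager had already assigned a Slot to that request while it was still pending, cancel the assignment first.
5. In every case, notify that the slot allocation failed.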
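The steps above amount to an iterate-and-remove pass over a map, which is easy to test once the current time is passed in. This is a hypothetical sketch:

- PendingRequest is an invented stand-in for PendingSlotRequest.
- The two lists record what cancelPendingSlotRequest(...) and notifyAllocationFailure(...) would be called with.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of checkSlotRequestTimeouts(); "now" is a parameter so the check is testable.
class SlotRequestTimeoutSketch {
    static final class PendingRequest {
        final String jobId;
        final long creationTimestamp;
        final boolean assigned;

        PendingRequest(String jobId, long creationTimestamp, boolean assigned) {
            this.jobId = jobId;
            this.creationTimestamp = creationTimestamp;
            this.assigned = assigned;
        }
    }

    final Map<String, PendingRequest> pendingSlotRequests = new LinkedHashMap<>();
    final List<String> cancelledAssignments = new ArrayList<>();
    final List<String> failureNotifications = new ArrayList<>();
    private final long slotRequestTimeoutMillis;

    SlotRequestTimeoutSketch(long slotRequestTimeoutMillis) {
        this.slotRequestTimeoutMillis = slotRequestTimeoutMillis;
    }

    void checkSlotRequestTimeouts(long nowMillis) {
        Iterator<Map.Entry<String, PendingRequest>> iterator =
                pendingSlotRequests.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, PendingRequest> entry = iterator.next();
            String requestId = entry.getKey();
            PendingRequest request = entry.getValue();
            if (nowMillis - request.creationTimestamp >= slotRequestTimeoutMillis) {
                iterator.remove(); // removing through the iterator avoids ConcurrentModificationException
                if (request.assigned) {
                    cancelledAssignments.add(requestId); // cancelPendingSlotRequest(...)
                }
                // notifyAllocationFailure(...) is sent for every timed-out request
                failureNotifications.add(request.jobId + "/" + requestId);
            }
        }
    }
}
```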
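At this point the ResourceManager has finished starting up. Finally, let's summarize the ResourceManager's startup work.
The key points of ResourceManager startup are:
1. The ResourceManager is an RpcEndpoint (an Actor). Once the object has been built and started, its onStart method is triggered (comparable to an actor's preStart lifecycle hook).
2. The ResourceManager is also a LeaderContender, so it takes part in leader election and runs the callback for the election result.
3. The ResourceManager has two heartbeat services and two periodic services:
a. Two heartbeat services:
i. The heartbeat between the worker nodes and the master
ii. The heartbeat between each job's master process (the JobMaster) and the master
b. Two periodic services:
i. The TaskManager timeout check
ii. The Slot request timeout check
In the next chapter, we will continue with how the Dispatcher is built and started!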