书接上回,【Flink源码】再谈 Flink 程序提交流程(上) 一文中我们已经将程序从客户端提交给了 ResourceManager
接下来我们就去 ResourceManager 中一探究竟
YarnJobClusterEntrypoint 类是 Yarn per-job 集群的入口,包含了我们想看的 main 方法
YarnJobClusterEntrypoint.java
/**
 * JVM entry point of the YARN per-job cluster.
 *
 * <p>Logs a deprecation warning and the environment, installs the signal handler and the
 * shutdown safeguard, resolves the working directory from the process environment, loads the
 * Flink configuration and finally hands a new {@code YarnJobClusterEntrypoint} to
 * {@code ClusterEntrypoint.runClusterEntrypoint}.
 *
 * @param args command line arguments; parsed as dynamic configuration parameters
 */
public static void main(String[] args) {
    LOG.warn(
            "Job Clusters are deprecated since Flink 1.15. Please use an Application Cluster/Application Mode instead.");

    // startup checks and logging
    EnvironmentInformation.logEnvironmentInfo(
            LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    final Map<String, String> environment = System.getenv();
    final String pwdKey = ApplicationConstants.Environment.PWD.key();
    final String workingDirectory = environment.get(pwdKey);
    Preconditions.checkArgument(
            workingDirectory != null, "Working directory variable (%s) not set", pwdKey);

    // failing to log the YARN environment is not fatal, so it is only reported as a warning
    try {
        YarnEntrypointUtils.logYarnEnvironmentInformation(environment, LOG);
    } catch (IOException e) {
        LOG.warn("Could not log YARN environment information.", e);
    }

    final Configuration dynamicParameters =
            ClusterEntrypointUtils.parseParametersOrExit(
                    args,
                    new DynamicParametersConfigurationParserFactory(),
                    YarnJobClusterEntrypoint.class);
    final Configuration configuration =
            YarnEntrypointUtils.loadConfiguration(
                    workingDirectory, dynamicParameters, environment);

    ClusterEntrypoint.runClusterEntrypoint(new YarnJobClusterEntrypoint(configuration));
}
main 方法中通过 ClusterEntrypoint.runClusterEntrypoint 方法以 YarnJobClusterEntrypoint 对象为参数加载运行入口
ClusterEntrypoint.java
/**
 * Starts the given cluster entrypoint, waits for its termination future and exits the JVM with
 * the resulting code.
 *
 * <p>A {@code ClusterEntrypointException} thrown during startup is logged and followed by
 * {@code System.exit(STARTUP_FAILURE_RETURN_CODE)}. If waiting on the termination future
 * fails, {@code RUNTIME_FAILURE_RETURN_CODE} is used and the stripped cause is passed to the
 * final log statement.
 *
 * @param clusterEntrypoint the entrypoint to start and supervise
 */
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
    final String entrypointName = clusterEntrypoint.getClass().getSimpleName();

    try {
        clusterEntrypoint.startCluster();
    } catch (ClusterEntrypointException startupFailure) {
        LOG.error(
                String.format("Could not start cluster entrypoint %s.", entrypointName),
                startupFailure);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }

    int exitCode;
    Throwable terminationFailure = null;
    try {
        exitCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
    } catch (Throwable t) {
        terminationFailure = ExceptionUtils.stripExecutionException(t);
        exitCode = RUNTIME_FAILURE_RETURN_CODE;
    }

    LOG.info(
            "Terminating cluster entrypoint process {} with exit code {}.",
            entrypointName,
            exitCode,
            terminationFailure);
    System.exit(exitCode);
}
/**
 * Initializes security, plugins and file systems, then runs the cluster inside the installed
 * security context.
 *
 * <p>On any failure the method triggers an asynchronous shutdown with status
 * {@code FAILED}, waits for it up to {@code INITIALIZATION_SHUTDOWN_TIMEOUT} and rethrows
 * the original problem wrapped in a {@code ClusterEntrypointException}. A failure of the
 * shutdown itself is attached to the original problem as a suppressed exception.
 *
 * @throws ClusterEntrypointException if the cluster could not be initialized
 */
public void startCluster() throws ClusterEntrypointException {
    LOG.info("Starting {}.", getClass().getSimpleName());

    try {
        FlinkSecurityManager.setFromConfiguration(configuration);

        final PluginManager plugins =
                PluginUtils.createPluginManagerFromRootFolder(configuration);
        configureFileSystems(configuration, plugins);

        final SecurityContext secured = installSecurityContext(configuration);
        ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);

        secured.runSecured(
                (Callable<Void>)
                        () -> {
                            runCluster(configuration, plugins);
                            return null;
                        });
    } catch (Throwable t) {
        final Throwable cause =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);

        try {
            // clean up any partial state
            shutDownAsync(
                            ApplicationStatus.FAILED,
                            ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                            ExceptionUtils.stringifyException(cause),
                            false)
                    .get(
                            INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),
                            TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException shutdownFailure) {
            cause.addSuppressed(shutdownFailure);
        }

        throw new ClusterEntrypointException(
                String.format(
                        "Failed to initialize the cluster entrypoint %s.",
                        getClass().getSimpleName()),
                cause);
    }
}
/**
 * Initializes the cluster services, creates the Dispatcher/ResourceManager component and wires
 * its shutdown future to {@code shutDownAsync}. Everything runs while holding {@code lock}.
 *
 * @param configuration cluster configuration; the RPC address and port are written into it
 * @param pluginManager plugin manager handed to the service initialization
 * @throws Exception if service initialization or component creation fails
 */
private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        initializeServices(configuration, pluginManager);

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        // Factory that builds the Dispatcher and ResourceManager objects.
        // (Author's note: creating it includes rebuilding the JobGraph locally.)
        final DispatcherResourceManagerComponentFactory componentFactory =
                createDispatcherResourceManagerComponentFactory(configuration);

        // Use the factory to create the Dispatcher and ResourceManager objects.
        // (Author's note: entry point that starts RpcService, HAService, BlobServer,
        // HeartbeatService, MetricRegistry, ExecutionGraphStore, etc.)
        clusterComponent =
                componentFactory.create(
                        configuration,
                        resourceId.unwrap(),
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        delegationTokenManager,
                        metricRegistry,
                        executionGraphInfoStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        this);

        clusterComponent
                .getShutDownFuture()
                .whenComplete(
                        (ApplicationStatus applicationStatus, Throwable throwable) -> {
                            if (throwable == null) {
                                // This is the general shutdown path. If a separate more
                                // specific shutdown was already triggered, this will do
                                // nothing
                                shutDownAsync(
                                        applicationStatus,
                                        ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                        null,
                                        true);
                            } else {
                                shutDownAsync(
                                        ApplicationStatus.UNKNOWN,
                                        ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                        ExceptionUtils.stringifyException(throwable),
                                        false);
                            }
                        });
    }
}
我们继续看创建过程
找到 DispatcherResourceManagerComponentFactory 接口的实现类 DefaultDispatcherResourceManagerComponentFactory
DefaultDispatcherResourceManagerComponentFactory.java
/**
 * Creates and starts the components that make up a DispatcherResourceManagerComponent.
 *
 * <p>In order, this method: obtains the leader retrieval services for the dispatcher and the
 * resource manager, builds the RPC gateway retrievers, creates and starts the REST endpoint,
 * creates the ResourceManagerService, creates the dispatcher runner, starts the
 * ResourceManagerService and finally starts both leader retrieval services.
 *
 * <p>If any step throws, every component created so far is stopped or closed, failures during
 * that cleanup are merged into the original exception, and a FlinkException is thrown.
 *
 * @return the component bundling the dispatcher runner, resource manager service, leader
 *     retrieval services and REST endpoint
 * @throws Exception a FlinkException wrapping the failure if any component could not be
 *     created or started
 */
public DispatcherResourceManagerComponent create(
Configuration configuration,
ResourceID resourceId,
Executor ioExecutor,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
DelegationTokenManager delegationTokenManager,
MetricRegistry metricRegistry,
ExecutionGraphInfoStore executionGraphInfoStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
FatalErrorHandler fatalErrorHandler)
throws Exception {
// declared outside the try block so the catch block can clean up whatever was created
LeaderRetrievalService dispatcherLeaderRetrievalService = null;
LeaderRetrievalService resourceManagerRetrievalService = null;
WebMonitorEndpoint<?> webMonitorEndpoint = null;
ResourceManagerService resourceManagerService = null;
DispatcherRunner dispatcherRunner = null;
try {
dispatcherLeaderRetrievalService =
highAvailabilityServices.getDispatcherLeaderRetriever();
resourceManagerRetrievalService =
highAvailabilityServices.getResourceManagerLeaderRetriever();
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
new RpcGatewayRetriever<>(
rpcService,
DispatcherGateway.class,
DispatcherId::fromUuid,
new ExponentialBackoffRetryStrategy(
12, Duration.ofMillis(10), Duration.ofMillis(50)));
final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
new RpcGatewayRetriever<>(
rpcService,
ResourceManagerGateway.class,
ResourceManagerId::fromUuid,
new ExponentialBackoffRetryStrategy(
12, Duration.ofMillis(10), Duration.ofMillis(50)));
// executor shared by the REST endpoint and the metric fetcher
final ScheduledExecutorService executor =
WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint");
final long updateInterval =
configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
// an update interval of 0 selects the no-op VoidMetricFetcher
final MetricFetcher metricFetcher =
updateInterval == 0
? VoidMetricFetcher.INSTANCE
: MetricFetcherImpl.fromConfiguration(
configuration,
metricQueryServiceRetriever,
dispatcherGatewayRetriever,
executor);
// create the endpoint that receives REST requests from the web frontend
webMonitorEndpoint =
restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
metricFetcher,
highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
fatalErrorHandler);
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();
final String hostname = RpcUtils.getHostname(rpcService);
// create the ResourceManager object; what is returned is a new YarnResourceManager
// call path: AbstractDispatcherResourceManagerComponentFactory
// -> ActiveResourceManagerFactory
// -> YarnResourceManagerFactory
resourceManagerService =
ResourceManagerServiceImpl.create(
resourceManagerFactory,
configuration,
resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
delegationTokenManager,
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
metricRegistry,
hostname,
ioExecutor);
final HistoryServerArchivist historyServerArchivist =
HistoryServerArchivist.createHistoryServerArchivist(
configuration, webMonitorEndpoint, ioExecutor);
final DispatcherOperationCaches dispatcherOperationCaches =
new DispatcherOperationCaches(
configuration.get(RestOptions.ASYNC_OPERATION_STORE_DURATION));
final PartialDispatcherServices partialDispatcherServices =
new PartialDispatcherServices(
configuration,
highAvailabilityServices,
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
() ->
JobManagerMetricGroup.createJobManagerMetricGroup(
metricRegistry, hostname),
executionGraphInfoStore,
fatalErrorHandler,
historyServerArchivist,
metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
ioExecutor,
dispatcherOperationCaches);
// create the dispatcherRunner object and start it
log.debug("Starting Dispatcher.");
dispatcherRunner =
dispatcherRunnerFactory.createDispatcherRunner(
highAvailabilityServices.getDispatcherLeaderElectionService(),
fatalErrorHandler,
new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
ioExecutor,
rpcService,
partialDispatcherServices);
// start the ResourceManager
log.debug("Starting ResourceManagerService.");
resourceManagerService.start();
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
return new DispatcherResourceManagerComponent(
dispatcherRunner,
resourceManagerService,
dispatcherLeaderRetrievalService,
resourceManagerRetrievalService,
webMonitorEndpoint,
fatalErrorHandler,
dispatcherOperationCaches);
} catch (Exception exception) {
// clean up all started components
if (dispatcherLeaderRetrievalService != null) {
try {
dispatcherLeaderRetrievalService.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
}
if (resourceManagerRetrievalService != null) {
try {
resourceManagerRetrievalService.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
}
// the remaining components close asynchronously; collect their futures and wait for all
final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
if (webMonitorEndpoint != null) {
terminationFutures.add(webMonitorEndpoint.closeAsync());
}
if (resourceManagerService != null) {
terminationFutures.add(resourceManagerService.closeAsync());
}
if (dispatcherRunner != null) {
terminationFutures.add(dispatcherRunner.closeAsync());
}
final FutureUtils.ConjunctFuture<Void> terminationFuture =
FutureUtils.completeAll(terminationFutures);
try {
terminationFuture.get();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
throw new FlinkException(
"Could not create the DispatcherResourceManagerComponent.", exception);
}
}
至此,我们找到了 Dispatcher、ResourceManager 的创建和启动方法,接下来我们有必要深入看看具体的过程
首先我们看 YarnResourceManager 创建过程
ResourceManagerFactory.java
/**
 * Builds a ResourceManager from the given process context.
 *
 * <p>First creates the runtime services (which carry the SlotManager), then delegates to the
 * overloaded {@code createResourceManager} with the individual values taken from the context.
 *
 * @param context source of the configuration, services and handlers for the new instance
 * @param leaderSessionId leader session id passed through to the created ResourceManager
 * @return the created ResourceManager
 * @throws Exception if the delegate creation fails
 */
public ResourceManager<T> createResourceManager(
        ResourceManagerProcessContext context, UUID leaderSessionId) throws Exception {

    final SlotManagerMetricGroup slotManagerMetrics =
            SlotManagerMetricGroup.create(context.getMetricRegistry(), context.getHostname());
    final ResourceManagerRuntimeServices runtimeServices =
            createResourceManagerRuntimeServices(
                    context.getRmRuntimeServicesConfig(),
                    context.getRpcService(),
                    context.getHighAvailabilityServices(),
                    slotManagerMetrics);

    final ResourceManagerMetricGroup resourceManagerMetrics =
            ResourceManagerMetricGroup.create(
                    context.getMetricRegistry(), context.getHostname());

    return createResourceManager(
            context.getRmConfig(),
            context.getResourceId(),
            context.getRpcService(),
            leaderSessionId,
            context.getHeartbeatServices(),
            context.getDelegationTokenManager(),
            context.getFatalErrorHandler(),
            context.getClusterInformation(),
            context.getWebInterfaceUrl(),
            resourceManagerMetrics,
            runtimeServices,
            context.getIoExecutor());
}
这里的 createResourceManager 是一个抽象方法,我们找到 ResourceManagerFactory 的 Yarn 实现类 YarnResourceManagerFactory
YarnResourceManagerFactory.java
/**
 * YARN implementation of the factory method: returns a new {@code YarnResourceManager}.
 *
 * <p>The SlotManager and the JobLeaderIdService are taken from the supplied runtime services,
 * and the current process environment ({@code System.getenv()}) is passed along.
 *
 * @param webInterfaceUrl URL of the web interface; may be {@code null}
 * @return the newly constructed YarnResourceManager
 */
public ResourceManager<YarnWorkerNode> createResourceManager(
        Configuration configuration,
        ResourceID resourceId,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        FatalErrorHandler fatalErrorHandler,
        ClusterInformation clusterInformation,
        @Nullable String webInterfaceUrl,
        ResourceManagerMetricGroup resourceManagerMetricGroup,
        ResourceManagerRuntimeServices resourceManagerRuntimeServices) {

    return new YarnResourceManager(
            rpcService,
            resourceId,
            configuration,
            System.getenv(),
            highAvailabilityServices,
            heartbeatServices,
            resourceManagerRuntimeServices.getSlotManager(),
            ResourceManagerPartitionTrackerImpl::new,
            resourceManagerRuntimeServices.getJobLeaderIdService(),
            clusterInformation,
            fatalErrorHandler,
            webInterfaceUrl,
            resourceManagerMetricGroup);
}
创建 YarnResourceManager 时,创建了 SlotManager
我们再继续看一下 SlotManager 是如何创建的
ResourceManagerFactory.java
/**
 * Creates the ResourceManager runtime services by delegating to
 * {@code ResourceManagerRuntimeServices.fromConfiguration}, using the RPC service's scheduled
 * executor.
 *
 * @return the runtime services built from the given configuration
 */
private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
        ResourceManagerRuntimeServicesConfiguration rmRuntimeServicesConfig,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        SlotManagerMetricGroup slotManagerMetricGroup) {

    return ResourceManagerRuntimeServices.fromConfiguration(
            rmRuntimeServicesConfig,
            highAvailabilityServices,
            rpcService.getScheduledExecutor(),
            slotManagerMetricGroup);
}
ResourceManagerRuntimeServices.java
/**
 * Assembles the runtime services: a SlotManager obtained from {@code createSlotManager} and a
 * {@code DefaultJobLeaderIdService} configured with the job timeout.
 *
 * @return runtime services holding the SlotManager and the JobLeaderIdService
 */
public static ResourceManagerRuntimeServices fromConfiguration(
        ResourceManagerRuntimeServicesConfiguration configuration,
        HighAvailabilityServices highAvailabilityServices,
        ScheduledExecutor scheduledExecutor,
        SlotManagerMetricGroup slotManagerMetricGroup) {

    // the SlotManager is created first, then the job leader id service
    return new ResourceManagerRuntimeServices(
            createSlotManager(configuration, scheduledExecutor, slotManagerMetricGroup),
            new DefaultJobLeaderIdService(
                    highAvailabilityServices,
                    scheduledExecutor,
                    configuration.getJobTimeout()));
}
到这里,我们找到了创建 SlotManager 的方法 createSlotManager
接下来我们看 Dispatcher 的创建和启动过程
找到接口 DispatcherRunnerFactory 的实现类 DefaultDispatcherRunnerFactory
DefaultDispatcherRunnerFactory.java
/**
 * Creates a dispatcher runner: first builds the DispatcherLeaderProcessFactory from the given
 * services, then hands it to {@code DefaultDispatcherRunner.create}.
 *
 * @return the created dispatcher runner
 * @throws Exception if {@code DefaultDispatcherRunner.create} fails
 */
public DispatcherRunner createDispatcherRunner(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        JobPersistenceComponentFactory jobPersistenceComponentFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices)
        throws Exception {

    final DispatcherLeaderProcessFactory leaderProcessFactory =
            dispatcherLeaderProcessFactoryFactory.createFactory(
                    jobPersistenceComponentFactory,
                    ioExecutor,
                    rpcService,
                    partialDispatcherServices,
                    fatalErrorHandler);

    return DefaultDispatcherRunner.create(
            leaderElectionService, fatalErrorHandler, leaderProcessFactory);
}
再看 DefaultDispatcherRunner 类
DefaultDispatcherRunner.java
/**
 * Instantiates a {@code DefaultDispatcherRunner} and wraps it via
 * {@code DispatcherRunnerLeaderElectionLifecycleManager.createFor}.
 *
 * @return the wrapped dispatcher runner
 * @throws Exception if the wrapper could not be created
 */
public static DispatcherRunner create(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory)
        throws Exception {

    final DefaultDispatcherRunner runner =
            new DefaultDispatcherRunner(
                    leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);

    return DispatcherRunnerLeaderElectionLifecycleManager.createFor(
            runner, leaderElectionService);
}
DispatcherRunnerLeaderElectionLifecycleManager.java
/**
 * Static factory for the lifecycle manager; simply forwards to the private constructor.
 *
 * @param dispatcherRunner runner that is also a leader contender
 * @param leaderElectionService election service the runner is registered with
 * @return the lifecycle manager, typed as a DispatcherRunner
 * @throws Exception if the constructor fails
 */
public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(
        T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {

    return new DispatcherRunnerLeaderElectionLifecycleManager<>(
            dispatcherRunner, leaderElectionService);
}
/**
 * Stores the runner and the election service, then immediately starts the election service
 * with the runner as contender.
 *
 * @throws Exception if starting the leader election service fails
 */
private DispatcherRunnerLeaderElectionLifecycleManager(
        T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {

    this.dispatcherRunner = dispatcherRunner;
    this.leaderElectionService = leaderElectionService;

    // kick off the leader election for the dispatcher
    leaderElectionService.start(dispatcherRunner);
}
找到 LeaderElectionService 接口的实现类 StandaloneLeaderElectionService
StandaloneLeaderElectionService.java
/**
 * Registers the contender and grants it leadership right away with
 * {@code HighAvailabilityServices.DEFAULT_LEADER_ID}.
 *
 * @param newContender the contender to register; must not be {@code null}
 * @throws IllegalArgumentException if a contender has already been registered
 */
public void start(LeaderContender newContender) throws Exception {
    // a contender that is already set means the service was started before
    if (contender != null) {
        throw new IllegalArgumentException(
                "Leader election service cannot be started multiple times.");
    }

    contender = Preconditions.checkNotNull(newContender);

    // leadership is handed to the given contender directly
    contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
DefaultDispatcherRunner.java
/**
 * Leadership callback: passes an action to {@code runActionIfRunning} that logs the grant and
 * starts a new dispatcher leader process for the given session id.
 *
 * @param leaderSessionID id of the granted leader session
 */
public void grantLeadership(UUID leaderSessionID) {
    runActionIfRunning(
            () -> {
                LOG.info(
                        "{} was granted leadership with leader id {}. Creating new {}.",
                        getClass().getSimpleName(),
                        leaderSessionID,
                        DispatcherLeaderProcess.class.getSimpleName());

                startNewDispatcherLeaderProcess(leaderSessionID);
            });
}
/**
 * Stops the current dispatcher leader process, creates a new one for the given session and
 * starts it once the previous process' termination future has completed.
 *
 * @param leaderSessionID leader session id for the new process
 */
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
    stopDispatcherLeaderProcess();

    final DispatcherLeaderProcess freshProcess =
            createNewDispatcherLeaderProcess(leaderSessionID);
    dispatcherLeaderProcess = freshProcess;

    // the start is chained behind the termination of the previous process
    FutureUtils.assertNoException(
            previousDispatcherLeaderProcessTerminationFuture.thenRun(freshProcess::start));
}
AbstractDispatcherLeaderProcess.java
/** Runs {@code startInternal} via {@code runIfStateIs}, guarded by {@code State.CREATED}. */
public final void start() {
    runIfStateIs(State.CREATED, this::startInternal);
}
/** Logs the start, switches the state to {@code RUNNING} and invokes the {@code onStart} hook. */
private void startInternal() {
    log.info("Start {}.", getClass().getSimpleName());

    state = State.RUNNING;
    onStart();
}
再往下找实现了 onStart 方法的实现类
JobDispatcherLeaderProcess.java
/**
 * Start hook: creates the dispatcher gateway service for this leader session and completes
 * the dispatcher setup with it.
 */
protected void onStart() {
    completeDispatcherSetup(
            dispatcherGatewayServiceFactory.create(
                    DispatcherId.fromUuid(getLeaderSessionId()),
                    CollectionUtil.ofNullable(jobGraph),
                    CollectionUtil.ofNullable(recoveredDirtyJobResult),
                    ThrowingJobGraphWriter.INSTANCE,
                    jobResultStore));
}
DefaultDispatcherGatewayServiceFactory.java
/**
 * Creates a Dispatcher through the dispatcher factory, starts it and wraps it in a
 * {@code DefaultDispatcherGatewayService}.
 *
 * @return gateway service backed by the started dispatcher
 * @throws FlinkRuntimeException if the dispatcher could not be created
 */
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        Collection<JobResult> recoveredDirtyJobResults,
        JobGraphWriter jobGraphWriter,
        JobResultStore jobResultStore) {

    final Dispatcher dispatcher;
    try {
        dispatcher =
                dispatcherFactory.createDispatcher(
                        rpcService,
                        fencingToken,
                        recoveredJobs,
                        recoveredDirtyJobResults,
                        (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                new NoOpDispatcherBootstrap(),
                        PartialDispatcherServicesWithJobPersistenceComponents.from(
                                partialDispatcherServices, jobGraphWriter, jobResultStore));
    } catch (Exception creationFailure) {
        throw new FlinkRuntimeException(
                "Could not create the Dispatcher rpc endpoint.", creationFailure);
    }

    // start the dispatcher
    dispatcher.start();

    return DefaultDispatcherGatewayService.from(dispatcher);
}
至此,dispatcher 启动完毕
下面我们来看 ResourceManager 启动过程
ResourceManager.java
/**
 * Start hook of the ResourceManager: starts its services and completes {@code startedFuture}.
 *
 * @throws Exception a ResourceManagerException wrapping any failure; the same exception is
 *     reported through {@code onFatalError} before being thrown
 */
public final void onStart() throws Exception {
    try {
        log.info("Starting the resource manager.");
        startResourceManagerServices();
        startedFuture.complete(null);
    } catch (Throwable cause) {
        final ResourceManagerException startupFailure =
                new ResourceManagerException(
                        String.format("Could not start the ResourceManager %s", getAddress()),
                        cause);
        onFatalError(startupFailure);
        throw startupFailure;
    }
}
/**
 * Starts, in order: the job leader id service, metric registration, the heartbeat services,
 * the SlotManager and the delegation token manager, then calls {@code initialize()}.
 *
 * @throws Exception declared by the method; an {@code Exception} raised by any step is passed
 *     to {@code handleStartResourceManagerServicesException}
 */
private void startResourceManagerServices() throws Exception {
    try {
        jobLeaderIdService.start(new JobLeaderIdActionsImpl());

        registerMetrics();
        startHeartbeatServices();

        slotManager.start(
                getFencingToken(),
                getMainThreadExecutor(),
                new ResourceActionsImpl(),
                blocklistHandler::isBlockedTaskManager);

        delegationTokenManager.start();

        initialize();
    } catch (Exception startFailure) {
        handleStartResourceManagerServicesException(startFailure);
    }
}
在 startResourceManagerServices 方法中,包含了初始化、心跳开启、slotManager 开启等操作
到这里,我们总算探究完成了 dispatcher 和 ResourceManager 的创建和启动过程
现在我们回到最开始,继续看 Flink 程序提交流程的下一个步骤
在启动了 dispatcher 和 ResourceManager 后,Dispatcher 启动了 JobManager
要一探究竟首先我们先进入 dispatcher 的实现类 Dispatcher
Dispatcher.java
/**
 * Start hook of the Dispatcher: starts the dispatcher services, then the cleanup retries and
 * the recovered jobs, and finally creates the dispatcher bootstrap.
 *
 * @throws Exception a DispatcherException if the dispatcher services could not be started;
 *     the same exception is reported through {@code onFatalError} before being thrown
 */
public void onStart() throws Exception {
    try {
        // start the Dispatcher services
        startDispatcherServices();
    } catch (Throwable cause) {
        final DispatcherException startupFailure =
                new DispatcherException(
                        String.format("Could not start the Dispatcher %s", getAddress()),
                        cause);
        onFatalError(startupFailure);
        throw startupFailure;
    }

    startCleanupRetries();

    // start the jobs
    startRecoveredJobs();

    dispatcherBootstrap =
            dispatcherBootstrapFactory.create(
                    getSelfGateway(DispatcherGateway.class),
                    getRpcService().getScheduledExecutor(),
                    this::onFatalError);
}
/** Runs every job in {@code recoveredJobs} and then empties the collection. */
private void startRecoveredJobs() {
    recoveredJobs.forEach(this::runRecoveredJob);
    recoveredJobs.clear();
}
/**
 * Runs a single recovered job with execution type {@code RECOVERY}. Any throwable is wrapped
 * in a DispatcherException and reported through {@code onFatalError}.
 *
 * @param recoveredJob the job graph to run; must not be {@code null}
 */
private void runRecoveredJob(final JobGraph recoveredJob) {
    checkNotNull(recoveredJob);

    try {
        runJob(createJobMasterRunner(recoveredJob), ExecutionType.RECOVERY);
    } catch (Throwable cause) {
        final String message =
                String.format("Could not start recovered job %s.", recoveredJob.getJobID());
        onFatalError(new DispatcherException(message, cause));
    }
}
// Runs the given job. This listing is abridged: the "... ..." lines stand for statements
// left out by the article; only the JobManagerRunner creation is shown.
private void runJob(JobGraph jobGraph, ExecutionType executionType) {
... ...
// create the JobManagerRunner for this job; the result is delivered as a future
CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph, initializationTimestamp);
... ...
}
// Creates and starts a JobManagerRunner for the given job inside CompletableFuture.supplyAsync.
// This listing is abridged: the "... ..." line stands for the part left out by the article
// (the catch clause and the rest of the supplyAsync call are not shown).
CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long
initializationTimestamp) {
final RpcService rpcService = getRpcService(); return CompletableFuture.supplyAsync(
() -> {
try {
JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner( jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), fatalErrorHandler,
initializationTimestamp);
// start the JobManagerRunner
runner.start();
return runner;
}
... ...
}
JobManagerRunnerImpl.java
/**
 * Starts the runner by registering it as contender with the leader election service.
 *
 * @throws Exception if the leader election service could not be started; the original
 *     failure is logged and kept as the cause
 */
public void start() throws Exception {
    try {
        leaderElectionService.start(this);
    } catch (Exception electionFailure) {
        log.error(
                "Could not start the JobManager because the leader election service did not start.",
                electionFailure);
        throw new Exception("Could not start the leader election service.", electionFailure);
    }
}
StandaloneLeaderElectionService.java
// Abridged listing of StandaloneLeaderElectionService.start: the "... ..." line stands for
// the statements left out by the article. Leadership is granted to the contender with
// HighAvailabilityServices.DEFAULT_LEADER_ID (class name spelling corrected from
// "HighAvaliabilityServices").
public void start(LeaderContender newContender) throws Exception {
... ...
contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
JobManagerRunnerImpl.java
public void grantLeadership(final UUID leaderSessionID) {
synchronized (lock) {
if (shutdown) {
log.debug("JobManagerRunner cannot be granted leadership because it is already shut
down.");
return;
}
leadershipOperation = leadershipOperation.thenCompose( (ignored) -> {
synchronized (lock) {
// 校验作业的调度状态然后启动作业管理器
return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
}
});
handleException(leadershipOperation, "Could not start the job manager.");
}
}
/**
 * Looks up the job's scheduling status and, depending on it, either finishes via
 * {@code jobAlreadyDone()} (status {@code DONE}) or starts the JobMaster.
 *
 * @param leaderSessionId leader session id handed to {@code startJobMaster}
 * @return future completing when the chosen follow-up step has completed
 */
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
    return getJobSchedulingStatus()
            .thenCompose(
                    jobSchedulingStatus -> {
                        if (jobSchedulingStatus != JobSchedulingStatus.DONE) {
                            return startJobMaster(leaderSessionId);
                        }
                        return jobAlreadyDone();
                    });
}
// Starts the JobMaster for the given leader session. This listing is abridged: the "... ..."
// lines stand for statements left out by the article.
private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
... ...
// start the JobMaster service with a JobMasterId built from the leader session id
startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
... ...
}
JobMaster.java
/**
 * Starts the JobMaster endpoint and then submits the job execution start through
 * {@code callAsyncWithoutFencing}.
 *
 * @param newJobMasterId id under which the job execution is started
 * @return future of the acknowledgement produced by the asynchronous call
 * @throws Exception if starting the endpoint fails
 */
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
    // make sure we receive RPC and async calls
    start();

    return callAsyncWithoutFencing(
            () -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
/**
 * Registers the job with the shuffle master, starts the JobMaster services and then starts
 * scheduling. Must run in the main thread (checked by {@code validateRunsInMainThread}).
 *
 * <p>NOTE(review): the {@code start(JobMasterId)} listing shown before this method calls
 * {@code startJobExecution(newJobMasterId)}, while this variant takes no argument; the two
 * listings presumably come from different Flink versions. TODO confirm.
 *
 * @throws Exception if registering the job or starting the services fails
 */
private void startJobExecution() throws Exception {
    validateRunsInMainThread();

    shuffleMaster.registerJob(new JobShuffleContextImpl(jobGraph.getJobID(), this));

    // start the JobMaster services
    startJobMasterServices();

    log.info(
            "Starting execution of job '{}' ({}) under job master id {}.",
            jobGraph.getName(),
            jobGraph.getJobID(),
            getFencingToken());

    // begin scheduling
    startScheduling();
}
最终,由 Dispatcher 类经过层层调用找到 JobMaster 类调用了其启动方法。
在创建了 ResourceManager 和 Dispatcher 之后,Dispatcher 启动了 JobManager,而 ResourceManager 则启动了 SlotManager
下面我们就具体来看这一过程
故事还要从 ResourceManager 类的 onStart 方法说起
ResourceManager.java
/**
 * Start hook of the ResourceManager: starts its services and completes {@code startedFuture}.
 *
 * @throws Exception a ResourceManagerException wrapping any failure; the same exception is
 *     reported through {@code onFatalError} before being thrown
 */
public final void onStart() throws Exception {
    try {
        log.info("Starting the resource manager.");
        startResourceManagerServices();
        startedFuture.complete(null);
    } catch (Throwable cause) {
        final ResourceManagerException startupFailure =
                new ResourceManagerException(
                        String.format("Could not start the ResourceManager %s", getAddress()),
                        cause);
        onFatalError(startupFailure);
        throw startupFailure;
    }
}
/**
 * Starts the ResourceManager services, in order: the job leader id service, metric
 * registration, the heartbeat services, the SlotManager and the delegation token manager,
 * followed by {@code initialize()}.
 *
 * @throws Exception declared by the method; an {@code Exception} raised by any step is passed
 *     to {@code handleStartResourceManagerServicesException}
 */
private void startResourceManagerServices() throws Exception {
    try {
        jobLeaderIdService.start(new JobLeaderIdActionsImpl());

        registerMetrics();
        startHeartbeatServices();

        // start the SlotManager
        slotManager.start(
                getFencingToken(),
                getMainThreadExecutor(),
                new ResourceActionsImpl(),
                blocklistHandler::isBlockedTaskManager);

        delegationTokenManager.start();

        // initialize the ResourceManager
        initialize();
    } catch (Exception startFailure) {
        handleStartResourceManagerServicesException(startFailure);
    }
}
由源码可知,该过程分为了两个重要步骤:开启 SlotManager 和初始化 ResourceManager,即创建 Yarn 的 ResourceManager 和 NodeManager 客户端
start 为 SlotManager 接口的方法,找到该接口的实现类 FineGrainedSlotManager,该类中的 start 方法根据给定的 leader id 和 ResourceManager 行为来实现开启 SlotManager
FineGrainedSlotManager.java
/**
 * Starts the slot manager with the given leader id and ResourceManager actions.
 *
 * <p>Stores the (non-null) arguments, initializes the slot status syncer, marks the manager
 * as started, schedules the periodic TaskManager timeout check and registers the metrics.
 *
 * @param newResourceManagerId id of the ResourceManager; must not be {@code null}
 * @param newMainThreadExecutor executor the timeout check is handed to; must not be
 *     {@code null}
 * @param newResourceActions ResourceManager actions; must not be {@code null}
 * @param newBlockedTaskManagerChecker checker for blocked TaskManagers; must not be
 *     {@code null}
 */
public void start(
        ResourceManagerId newResourceManagerId,
        Executor newMainThreadExecutor,
        ResourceActions newResourceActions,
        BlockedTaskManagerChecker newBlockedTaskManagerChecker) {

    LOG.info("Starting the slot manager.");

    resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
    mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
    resourceActions = Preconditions.checkNotNull(newResourceActions);

    // slot status synchronisation
    slotStatusSyncer.initialize(
            taskManagerTracker, resourceTracker, resourceManagerId, mainThreadExecutor);

    blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker);

    started = true;

    // TaskManager timeout check: first run without delay, then repeated with the
    // TaskManager timeout as delay; the check itself is executed by the main thread executor
    final long timeoutCheckDelayMillis = taskManagerTimeout.toMilliseconds();
    taskManagerTimeoutsCheck =
            scheduledExecutor.scheduleWithFixedDelay(
                    () -> mainThreadExecutor.execute(this::checkTaskManagerTimeouts),
                    0L,
                    timeoutCheckDelayMillis,
                    TimeUnit.MILLISECONDS);

    registerSlotManagerMetrics();
}
步骤已经很明显了
在创建了 JobManager 和 SlotManager 之后,下一步 JobManager 申请了 slot
在 JobMaster 启动之时,同时启动了 SlotPool,向 ResourceManager 注册
JobMaster.java
/**
 * Starts the JobMaster's services: creates both heartbeat managers, starts the slot pool
 * service and starts the leader retrieval for the ResourceManager.
 *
 * @throws Exception declared by the method; an {@code Exception} raised by any step is passed
 *     to {@code handleStartJobMasterServicesError}
 */
private void startJobMasterServices() throws Exception {
    try {
        // heartbeat manager towards the TaskManagers
        taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);

        // heartbeat manager towards the ResourceManager
        resourceManagerHeartbeatManager =
                createResourceManagerHeartbeatManager(heartbeatServices);

        // start the slot pool make sure the slot pool now accepts messages for this leader
        slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());

        // job is ready to go, try to establish connection with resource manager
        //   - activate leader retrieval for the resource manager
        //   - on notification of the leader, the connection will be established and
        //     the slot pool will start requesting slots
        // (once started, the slot pool begins requesting slots from the slot manager)
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    } catch (Exception startFailure) {
        handleStartJobMasterServicesError(startFailure);
    }
}
经过下面层层调用:
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
-> notifyOfNewResourceManagerLeader()
-> reconnectToResourceManagerLeader()
-> tryConnectToResourceManager()
-> connectToResourceManager()
// Creates a new ResourceManagerConnection for this job and starts it. This listing is
// abridged: the "... ..." line stands for statements left out by the article.
private void connectToResourceManager() {
... ...
// the connection is built from this JobMaster's job id, resource id, address and fencing
// token, plus the address and id taken from resourceManagerAddress
resourceManagerConnection = new ResourceManagerConnection(
log,
jobGraph.getJobID(),
resourceId,
getAddress(),
getFencingToken(),
resourceManagerAddress.getAddress(),
resourceManagerAddress.getResourceManagerId(),
scheduledExecutorService
);
// start the connection
resourceManagerConnection.start();
}
RegisteredRpcConnection.java
/**
 * Starts the connection by creating a new registration and, if this call wins the
 * compare-and-set on the pending registration, starting it. A registration that loses the
 * race is cancelled.
 *
 * <p>Both preconditions are enforced with {@code checkState}: the connection must not be
 * closed and must not have been started already.
 */
public void start() {
    checkState(!closed, "The RPC connection is already closed");
    checkState(
            !isConnected() && pendingRegistration == null,
            "The RPC connection is already started");

    final RetryingRegistration<F, G, S, R> registration = createNewRegistration();

    if (!REGISTRATION_UPDATER.compareAndSet(this, null, registration)) {
        // concurrent start operation
        registration.cancel();
        return;
    }

    registration.startRegistration();
}
/**
 * Generates a new registration and attaches the completion handling to its future.
 *
 * <p>The handler runs on {@code executor}. A successful result stores the gateway and calls
 * {@code onRegistrationSuccess}; a rejection calls {@code onRegistrationRejection}; any other
 * result is treated as an error. A {@code CancellationException} is only logged, every other
 * failure is passed to {@code onRegistrationFailure}.
 *
 * @return the newly generated registration (not yet started)
 */
private RetryingRegistration<F, G, S, R> createNewRegistration() {
    final RetryingRegistration<F, G, S, R> registration =
            checkNotNull(generateRegistration());

    final CompletableFuture<RetryingRegistration.RetryingRegistrationResult<G, S, R>>
            registrationFuture = registration.getFuture();

    registrationFuture.whenCompleteAsync(
            (RetryingRegistration.RetryingRegistrationResult<G, S, R> result,
                    Throwable failure) -> {
                if (failure == null) {
                    if (result.isSuccess()) {
                        targetGateway = result.getGateway();
                        onRegistrationSuccess(result.getSuccess());
                    } else if (result.isRejection()) {
                        onRegistrationRejection(result.getRejection());
                    } else {
                        throw new IllegalArgumentException(
                                String.format(
                                        "Unknown retrying registration response: %s.",
                                        result));
                    }
                } else if (failure instanceof CancellationException) {
                    // we ignore cancellation exceptions because they originate from
                    // cancelling the RetryingRegistration
                    log.debug(
                            "Retrying registration towards {} was cancelled.", targetAddress);
                } else {
                    // this future should only ever fail if there is a bug, not if the
                    // registration is declined
                    onRegistrationFailure(failure);
                }
            },
            executor);

    return registration;
}
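TaskExecutorToResourceManagerConnection.java(注:这里贴出的是 TaskExecutor 侧连接类的 generateRegistration 实现;上文 JobMaster 创建的是其内部类 ResourceManagerConnection)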
/**
 * Creates the registration object for this connection: a
 * {@code ResourceManagerRegistration} targeting the connection's address and leader id.
 *
 * @return a new, not yet started registration
 */
protected RetryingRegistration<
                ResourceManagerId,
                ResourceManagerGateway,
                TaskExecutorRegistrationSuccess,
                TaskExecutorRegistrationRejection>
        generateRegistration() {

    return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
            log,
            rpcService,
            getTargetAddress(),
            getTargetLeaderId(),
            retryingRegistrationConfiguration,
            taskExecutorRegistration);
}
/**
 * Creates a registration towards the ResourceManager. The target name
 * {@code "ResourceManager"} and the gateway type {@code ResourceManagerGateway} are fixed;
 * everything else is forwarded to the super constructor. The task executor registration is
 * kept in a field.
 */
ResourceManagerRegistration(
        Logger log,
        RpcService rpcService,
        String targetAddress,
        ResourceManagerId resourceManagerId,
        RetryingRegistrationConfiguration retryingRegistrationConfiguration,
        TaskExecutorRegistration taskExecutorRegistration) {

    super(
            log,
            rpcService,
            "ResourceManager",
            ResourceManagerGateway.class,
            targetAddress,
            resourceManagerId,
            retryingRegistrationConfiguration);

    this.taskExecutorRegistration = taskExecutorRegistration;
}
注册成功调用 onRegistrationSuccess(),向 ResourceManager 进行 slot 的申请
JobMaster.java 的内部类 ResourceManagerConnection
/**
 * Called when the registration succeeded. Schedules, via {@code runAsync}, the establishment
 * of the ResourceManager connection, but only if this connection object is still the
 * JobMaster's current {@code resourceManagerConnection}.
 *
 * @param success the successful registration response
 */
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    runAsync(
            () -> {
                // filter out outdated connections
                //noinspection ObjectEquality
                if (this == resourceManagerConnection) {
                    establishResourceManagerConnection(success);
                }
            });
}
/**
 * Establishes the connection to the ResourceManager after a successful registration.
 *
 * <p>The response is ignored (with a debug message) unless a connection exists and its target
 * leader id equals the id in the response. Otherwise the established connection is recorded,
 * the gateway is registered as blocklist listener, the slot pool service and the partition
 * tracker are connected to the ResourceManager, and heartbeat monitoring of the
 * ResourceManager begins.
 *
 * @param success the successful registration response
 */
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
    final ResourceManagerId resourceManagerId = success.getResourceManagerId();

    // verify the response with current connection
    if (resourceManagerConnection == null
            || !Objects.equals(
                    resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {
        log.debug(
                "Ignoring resource manager connection to {} because it's duplicated or outdated.",
                resourceManagerId);
        return;
    }

    log.info(
            "JobManager successfully registered at ResourceManager, leader id: {}.",
            resourceManagerId);

    final ResourceManagerGateway gateway = resourceManagerConnection.getTargetGateway();
    final ResourceID rmResourceId = success.getResourceManagerResourceId();

    establishedResourceManagerConnection =
            new EstablishedResourceManagerConnection(gateway, rmResourceId);

    blocklistHandler.registerBlocklistListener(gateway);

    // connect to the ResourceManager
    slotPoolService.connectToResourceManager(gateway);
    partitionTracker.connectToResourceManager(gateway);

    resourceManagerHeartbeatManager.monitorTarget(
            rmResourceId, new ResourceManagerHeartbeatReceiver(gateway));
}
DeclarativeSlotPoolService.java
/**
 * Stores the ResourceManager gateway and sends every request that was waiting for the
 * connection to the ResourceManager, then clears the waiting set.
 *
 * @param resourceManagerGateway gateway of the ResourceManager; must not be {@code null}
 */
public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
    this.resourceManagerGateway = checkNotNull(resourceManagerGateway);

    // work on all slots waiting for this connection
    for (PendingRequest waitingRequest : waitingForResourceManager.values()) {
        // request a slot from the ResourceManager
        requestSlotFromResourceManager(resourceManagerGateway, waitingRequest);
    }

    // all sent off
    waitingForResourceManager.clear();
}
// Sends the pending request to the ResourceManager as a slot request. This listing is
// abridged: the "... ..." lines stand for statements left out by the article.
private void requestSlotFromResourceManager(
final ResourceManagerGateway resourceManagerGateway, final PendingRequest pendingRequest) {
... ...
// the SlotRequest carries the job id, allocation id, requested resource profile and the
// JobManager address; the response arrives as a future
CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
jobMasterId,
new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
rpcTimeout);
... ...
}
ResourceManager.java:由 ResourceManager 里的 SlotManager 处理请求
// Handles a slot request sent by a JobMaster by registering it with the SlotManager. This
// listing is abridged: the "... ..." lines stand for statements left out by the article.
public CompletableFuture<Acknowledge> requestSlot( JobMasterId jobMasterId,
SlotRequest slotRequest, final Time timeout) {
... ...
try {
// the SlotManager handles the slot request
slotManager.registerSlotRequest(slotRequest);
}
... ...
}
// Registers a slot request: wraps it in a PendingSlotRequest, records it under its allocation
// id and then tries to serve it. This listing is abridged: the "... ..." lines stand for
// statements left out by the article.
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
checkInit();
... ...
PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
// keep track of the pending request, keyed by allocation id
pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
try {
internalRequestSlot(pendingSlotRequest);
}
... ...
}
/**
 * Tries to serve the pending request: if a slot matching the requested resource profile is
 * found it is allocated, otherwise the request is fulfilled with a pending TaskManager slot.
 *
 * @param pendingSlotRequest the request to serve
 * @throws ResourceManagerException declared by the method
 */
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
        throws ResourceManagerException {

    OptionalConsumer.of(findMatchingSlot(pendingSlotRequest.getResourceProfile()))
            .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
            .ifNotPresent(
                    () -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
}
// Serves a pending slot request with a pending TaskManager slot. This listing is abridged:
// the "... ..." lines stand for statements left out by the article.
private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
... ...
// no pending TaskManager slot is available: allocate a new resource for the profile
if(!pendingTaskManagerSlotOptional.isPresent()) {
pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
}
... ...
}