• Flink 1.13 Source Code Analysis: JobManager Receives the Flink Job Submitted by the RestClient


    Related articles:

    Flink 1.13 Source Code Analysis: Table of Contents

    Flink 1.13 Source Code Analysis Primer: The Akka Communication Model

    Flink 1.13 Source Code Analysis: JobManager Startup and WebMonitorEndpoint Startup

    Flink 1.13 Source Code Analysis: Flink Job Submission Process, Part 1

    Contents

    Preface

    1. JobSubmitHandler parses the JobGraph and hands it to the Dispatcher

    2. The Dispatcher receives the JobGraph, then initializes and starts the JobMaster

    2.1 Initializing the basic services required by the JobMaster

    2.2 The JobMaster leader election process

    2.3 Initialization and startup of the JobMaster

    Summary


    Preface

        In the previous chapter we saw that, inside env.execute, a StreamGraph is built from the Transformations collection we assembled, the StreamGraph is converted into a JobGraph, and the JobGraph is persisted. Finally, the JobGraphFile, the dependency jars, and some other configuration are packed into a RequestBody and sent by the Netty client inside the RestClient to the Netty server inside the JobManager's WebMonitorEndpoint, where the server parses the URL and hands the request to the JobSubmitHandler.

        In this chapter, we analyze how the JobManager handles the HttpRequest sent by the RestClient.

    1. JobSubmitHandler parses the JobGraph and hands it to the Dispatcher

        The JobGraph built on the client side, together with the resources it needs, is sent to the WebMonitorEndpoint. Inside the WebMonitorEndpoint, a Router parses the URL, routes the request to the handler registered for that URL, and calls back that handler's handleRequest method. Let's go straight to JobSubmitHandler's handleRequest:

    /*
    TODO Deserialize the JobGraph from the file on disk and hand it to the Dispatcher
    */
    @Override
    protected CompletableFuture<JobSubmitResponseBody> handleRequest(
            @Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,
            @Nonnull DispatcherGateway gateway)
            throws RestHandlerException {
        // TODO Get the uploaded files from the request, including the serialized JobGraph file, into nameToFile
        final Collection<File> uploadedFiles = request.getUploadedFiles();
        final Map<String, Path> nameToFile =
                uploadedFiles.stream()
                        .collect(Collectors.toMap(File::getName, Path::fromLocalFile));
        if (uploadedFiles.size() != nameToFile.size()) {
            throw new RestHandlerException(
                    String.format(
                            "The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
                            uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
                            nameToFile.size(),
                            uploadedFiles.size()),
                    HttpResponseStatus.BAD_REQUEST);
        }
        // TODO Get the request body
        final JobSubmitRequestBody requestBody = request.getRequestBody();
        if (requestBody.jobGraphFileName == null) {
            throw new RestHandlerException(
                    String.format(
                            "The %s field must not be omitted or be null.",
                            JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
                    HttpResponseStatus.BAD_REQUEST);
        }
        // TODO Deserialize the JobGraph
        // TODO Evidently, what the server receives from the client is simply a JobGraph
        CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
        // TODO Get the job's own jar
        Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
        // TODO Get the job's dependency jars
        Collection<Tuple2<String, Path>> artifacts =
                getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
        // TODO Upload the JobGraph + job jar + dependency jars to the BlobServer
        CompletableFuture<JobGraph> finalizedJobGraphFuture =
                uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);
        // TODO Hand it over to the Dispatcher
        CompletableFuture<Acknowledge> jobSubmissionFuture =
                finalizedJobGraphFuture.thenCompose(
                        // TODO The JobSubmitHandler hands the job to the Dispatcher for processing
                        // TODO The gateway here is the Dispatcher's proxy object
                        jobGraph -> gateway.submitJob(jobGraph, timeout));
        return jobSubmissionFuture.thenCombine(
                jobGraphFuture,
                (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    }

    This method does the following:

    1. Gets the uploaded files from the request, including nameToFile, which holds the serialized JobGraph file.

    2. Extracts the request body.

    3. Deserializes the JobGraph from the request body.

    4. Gets the job's own jar from the request body.

    5. Gets the job's dependency jars from the request body.

    6. Uploads the JobGraph, the job jar, and the dependency jars to the BlobServer.

    7. Hands the JobGraph to the Dispatcher component.
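    The steps above are not executed one by one but chained asynchronously: deserialization, upload, and submission are composed with CompletableFuture. The shape of that composition can be sketched with plain strings standing in for the JobGraph and the gateway (all names below are hypothetical stand-ins, not Flink APIs):

```java
import java.util.concurrent.CompletableFuture;

public class SubmitPipelineSketch {
    // Hypothetical stand-ins for the real handler steps
    static CompletableFuture<String> loadJobGraph() {
        return CompletableFuture.supplyAsync(() -> "jobGraph-42");
    }

    static CompletableFuture<String> uploadFiles(CompletableFuture<String> graphFuture) {
        // "Upload" just decorates the graph here
        return graphFuture.thenApply(g -> g + "+blobs");
    }

    static CompletableFuture<String> submit(String finalizedGraph) {
        // Stands in for gateway.submitJob(...)
        return CompletableFuture.completedFuture("ack:" + finalizedGraph);
    }

    public static String run() {
        CompletableFuture<String> jobGraphFuture = loadJobGraph();
        CompletableFuture<String> finalized = uploadFiles(jobGraphFuture);
        // thenCompose chains the submission; thenCombine joins the ack with the graph
        return finalized
                .thenCompose(SubmitPipelineSketch::submit)
                .thenCombine(jobGraphFuture, (ack, g) -> ack + "/jobs/" + g)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    thenCompose chains a step that itself returns a future (the Dispatcher submission), while thenCombine joins the acknowledgement with the original jobGraphFuture to build the response URL, exactly the two operators handleRequest relies on.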

    Let's first look at how the JobGraph is parsed. Open the loadJobGraph method:

    private CompletableFuture<JobGraph> loadJobGraph(
            JobSubmitRequestBody requestBody, Map<String, Path> nameToFile)
            throws MissingFileException {
        final Path jobGraphFile =
                getPathAndAssertUpload(
                        requestBody.jobGraphFileName, FILE_TYPE_JOB_GRAPH, nameToFile);
        // TODO Deserialize the JobGraph from the file
        return CompletableFuture.supplyAsync(
                () -> {
                    JobGraph jobGraph;
                    try (ObjectInputStream objectIn =
                            new ObjectInputStream(
                                    jobGraphFile.getFileSystem().open(jobGraphFile))) {
                        jobGraph = (JobGraph) objectIn.readObject();
                    } catch (Exception e) {
                        throw new CompletionException(
                                new RestHandlerException(
                                        "Failed to deserialize JobGraph.",
                                        HttpResponseStatus.BAD_REQUEST,
                                        e));
                    }
                    return jobGraph;
                },
                executor);
    }

    As you can see, the JobGraphFile is fetched from the file system and deserialized into a JobGraph.
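    What loadJobGraph does is plain Java serialization in reverse: the client wrote the JobGraph with an ObjectOutputStream, and the server reads it back with an ObjectInputStream. A minimal round-trip sketch, using a hypothetical MiniJobGraph class in place of Flink's JobGraph:

```java
import java.io.*;

public class JobGraphFileSketch {
    // Tiny Serializable stand-in for Flink's JobGraph (illustrative only)
    static class MiniJobGraph implements Serializable {
        private static final long serialVersionUID = 1L;
        final String jobId;
        MiniJobGraph(String jobId) { this.jobId = jobId; }
    }

    // Write the graph to a file (client side), then read it back (server side),
    // mirroring what loadJobGraph does with ObjectInputStream
    public static String roundTrip(String jobId) {
        try {
            File file = File.createTempFile("jobgraph", ".bin");
            file.deleteOnExit();
            try (ObjectOutputStream out =
                    new ObjectOutputStream(new FileOutputStream(file))) {
                out.writeObject(new MiniJobGraph(jobId));
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new FileInputStream(file))) {
                return ((MiniJobGraph) in.readObject()).jobId;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("demo-job"));
    }
}
```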

    Next, let's look at how the JobGraph, the job jar, and the dependency jars are uploaded to the BlobServer. Open the uploadJobGraphFiles method:

    private CompletableFuture<JobGraph> uploadJobGraphFiles(
            DispatcherGateway gateway,
            CompletableFuture<JobGraph> jobGraphFuture,
            Collection<Path> jarFiles,
            Collection<Tuple2<String, Path>> artifacts,
            Configuration configuration) {
        CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
        return jobGraphFuture.thenCombine(
                blobServerPortFuture,
                (JobGraph jobGraph, Integer blobServerPort) -> {
                    final InetSocketAddress address =
                            new InetSocketAddress(gateway.getHostname(), blobServerPort);
                    try {
                        // TODO Blocking IO: BlobClient => BlobServer
                        ClientUtils.uploadJobGraphFiles(
                                jobGraph,
                                jarFiles,
                                artifacts,
                                () -> new BlobClient(address, configuration));
                    } catch (FlinkException e) {
                        throw new CompletionException(
                                new RestHandlerException(
                                        "Could not upload job files.",
                                        HttpResponseStatus.INTERNAL_SERVER_ERROR,
                                        e));
                    }
                    return jobGraph;
                });
    }

    The resource files associated with the JobGraph are uploaded here through a BlobClient to the BlobServer. As covered in the JobManager startup chapter, the BlobServer also runs an hourly scheduled task that cleans up resource files that are no longer needed.

    2. The Dispatcher receives the JobGraph, then initializes and starts the JobMaster

        The JobGraph is handed to the Dispatcher by calling a method on the Dispatcher's proxy object. Step into gateway.submitJob and choose the Dispatcher implementation:

    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
        log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
        try {
            // TODO Deduplicate by jobID
            if (isDuplicateJob(jobGraph.getJobID())) {
                return FutureUtils.completedExceptionally(
                        new DuplicateJobSubmissionException(jobGraph.getJobID()));
            } else if (isPartialResourceConfigured(jobGraph)) {
                return FutureUtils.completedExceptionally(
                        new JobSubmissionException(
                                jobGraph.getJobID(),
                                "Currently jobs is not supported if parts of the vertices have "
                                        + "resources configured. The limitation will be removed in future versions."));
            } else {
                // TODO Submit the job; by now the jars and files the JobGraph needs have been uploaded
                // TODO This JobGraph will be used to build the ExecutionGraph when the JobMaster starts
                return internalSubmitJob(jobGraph);
            }
        } catch (FlinkException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    By the time this code runs, the jars and other resource files needed by the JobGraph have already been uploaded to the BlobServer. Let's step into internalSubmitJob(jobGraph):

    private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
        log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
        final CompletableFuture<Acknowledge> persistAndRunFuture =
                // TODO Persist first, then run (bring up the JobMaster): this::persistAndRunJob
                waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
                        .thenApply(ignored -> Acknowledge.get());
        return persistAndRunFuture.handleAsync(
                (acknowledge, throwable) -> {
                    if (throwable != null) {
                        cleanUpJobData(jobGraph.getJobID(), true);
                        ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
                        final Throwable strippedThrowable =
                                ExceptionUtils.stripCompletionException(throwable);
                        log.error(
                                "Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
                        throw new CompletionException(
                                new JobSubmissionException(
                                        jobGraph.getJobID(),
                                        "Failed to submit job.",
                                        strippedThrowable));
                    } else {
                        return acknowledge;
                    }
                },
                ioExecutor);
    }

    Next, step into this::persistAndRunJob:

    private void persistAndRunJob(JobGraph jobGraph) throws Exception {
        // TODO The server persists the JobGraph to a FileSystem (e.g. HDFS), gets back a
        //  stateHandle, and stores that handle in ZooKeeper
        // TODO As covered in the master startup chapter, the Dispatcher starts a JobGraphStore
        //  service and first recovers any unfinished JobGraphs it still contains
        // TODO JobGraphWriter = DefaultJobGraphStore
        jobGraphWriter.putJobGraph(jobGraph);
        // TODO
        runJob(jobGraph, ExecutionType.SUBMISSION);
    }

    As mentioned when we covered master startup, the Dispatcher starts a JobGraphStore service, and if the store still contains unfinished JobGraphs, they are recovered first. The jobGraphWriter here is the JobGraphStore. Step into jobGraphWriter.putJobGraph(jobGraph) and choose the DefaultJobGraphStore implementation:

    @Override
    public void putJobGraph(JobGraph jobGraph) throws Exception {
        checkNotNull(jobGraph, "Job graph");
        final JobID jobID = jobGraph.getJobID();
        final String name = jobGraphStoreUtil.jobIDToName(jobID);
        LOG.debug("Adding job graph {} to {}.", jobID, jobGraphStateHandleStore);
        boolean success = false;
        while (!success) {
            synchronized (lock) {
                verifyIsRunning();
                final R currentVersion = jobGraphStateHandleStore.exists(name);
                if (!currentVersion.isExisting()) {
                    try {
                        // TODO
                        jobGraphStateHandleStore.addAndLock(name, jobGraph);
                        addedJobGraphs.add(jobID);
                        success = true;
                    } catch (StateHandleStore.AlreadyExistException ignored) {
                        LOG.warn("{} already exists in {}.", jobGraph, jobGraphStateHandleStore);
                    }
                } else if (addedJobGraphs.contains(jobID)) {
                    try {
                        jobGraphStateHandleStore.replace(name, currentVersion, jobGraph);
                        LOG.info("Updated {} in {}.", jobGraph, getClass().getSimpleName());
                        success = true;
                    } catch (StateHandleStore.NotExistException ignored) {
                        LOG.warn("{} does not exists in {}.", jobGraph, jobGraphStateHandleStore);
                    }
                } else {
                    throw new IllegalStateException(
                            "Trying to update a graph you didn't "
                                    + "#getAllSubmittedJobGraphs() or #putJobGraph() yourself before.");
                }
            }
        }
        LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
    }
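    The while (!success) loop in putJobGraph is a classic compare-and-set retry: add the node if it is absent, replace it if we created it earlier, and loop again when a concurrent writer races us. A condensed sketch of the same pattern over an AtomicReference (illustrative only, not Flink code):

```java
import java.util.concurrent.atomic.AtomicReference;

public class PutIfAbsentRetrySketch {
    // Versioned store stand-in: null means "absent"
    static final AtomicReference<String> node = new AtomicReference<>(null);

    // Mirrors putJobGraph's loop: add if absent, replace if we own it, retry on races
    static String put(String value, boolean weAddedItBefore) {
        while (true) {
            String current = node.get();
            if (current == null) {
                if (node.compareAndSet(null, value)) return "added";
                // someone beat us to it -> loop and re-check
            } else if (weAddedItBefore) {
                if (node.compareAndSet(current, value)) return "replaced";
            } else {
                // someone else owns this node; updating it would be an error
                throw new IllegalStateException("not our node");
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(put("g1", false));
        System.out.println(put("g2", true));
    }
}
```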

    This code gathers some job metadata and checks the job's status. Step into jobGraphStateHandleStore.addAndLock and choose the ZooKeeper implementation:

    @Override
    public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state)
            throws PossibleInconsistentStateException, Exception {
        checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
        checkNotNull(state, "State");
        final String path = normalizePath(pathInZooKeeper);
        if (exists(path).isExisting()) {
            throw new AlreadyExistException(
                    String.format("ZooKeeper node %s already exists.", path));
        }
        // TODO Store the state on the fileSystem and get back a state handle
        final RetrievableStateHandle<T> storeHandle = storage.store(state);
        // TODO Serialize the state handle into bytes
        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
        try {
            // TODO Store the handle in ZooKeeper
            writeStoreHandleTransactionally(path, serializedStoreHandle);
            return storeHandle;
        } catch (KeeperException.NodeExistsException e) {
            // Transactions are not idempotent in the curator version we're currently using, so it
            // is actually possible that we've re-tried a transaction that has already succeeded.
            // We've ensured that the node hasn't been present prior executing the transaction, so
            // we can assume that this is a result of the retry mechanism.
            return storeHandle;
        } catch (Exception e) {
            if (indicatesPossiblyInconsistentState(e)) {
                throw new PossibleInconsistentStateException(e);
            }
            // In case of any other failure, discard the state and rethrow the exception.
            storeHandle.discardState();
            throw e;
        }
    }

    As you can see, the JobGraph is first persisted to an external storage system such as HDFS, a handle to it is obtained, and only that handle is stored in ZooKeeper. Keeping just the small handle in ZooKeeper is done for performance: ZooKeeper is not suited to storing large payloads.
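    This is the state-handle pattern Flink's HA services use throughout: the bulky payload goes to a distributed file system, while ZooKeeper stores only a small pointer to it. A toy sketch with two maps standing in for HDFS and ZooKeeper (the path scheme and names are made up):

```java
import java.util.HashMap;
import java.util.Map;

public class StateHandleSketch {
    // "fileSystem" holds the large payload; "zooKeeper" holds only a small handle
    static final Map<String, byte[]> fileSystem = new HashMap<>();
    static final Map<String, String> zooKeeper = new HashMap<>();

    static String store(String name, byte[] payload) {
        String handle = "hdfs://tmp/" + name;   // hypothetical handle/path
        fileSystem.put(handle, payload);        // big bytes go to the file system
        zooKeeper.put(name, handle);            // zk keeps the cheap pointer
        return handle;
    }

    static byte[] retrieve(String name) {
        // Resolve the pointer in zk, then fetch the payload from the file system
        return fileSystem.get(zooKeeper.get(name));
    }

    public static void main(String[] args) {
        store("job-1", new byte[]{1, 2, 3});
        System.out.println(retrieve("job-1").length);
    }
}
```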

    With the JobGraph persisted, the job can now be executed. Back to this code:

    private void persistAndRunJob(JobGraph jobGraph) throws Exception {
        // TODO The server persists the JobGraph to a FileSystem (e.g. HDFS), gets back a
        //  stateHandle, and stores that handle in ZooKeeper
        // TODO As covered in the master startup chapter, the Dispatcher starts a JobGraphStore
        //  service and first recovers any unfinished JobGraphs it still contains
        // TODO JobGraphWriter = DefaultJobGraphStore
        jobGraphWriter.putJobGraph(jobGraph);
        // TODO
        runJob(jobGraph, ExecutionType.SUBMISSION);
    }

    Step into runJob:

    private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Exception {
        Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
        long initializationTimestamp = System.currentTimeMillis();
        /*
        TODO Create the JobManagerRunner, a launcher that internally builds a
         DefaultJobMasterServiceProcessFactory. Once the JobMaster wins leader election,
         that factory does two important things:
         1. Creates the JobMaster instance
         2. While creating the JobMaster, turns the JobGraph into an ExecutionGraph
        TODO A Flink cluster contains two master/worker architectures:
         1. Resource management: ResourceManager + TaskExecutor
         2. Job execution: JobMaster + StreamTask
        */
        JobManagerRunner jobManagerRunner =
                createJobManagerRunner(jobGraph, initializationTimestamp);
        // TODO Add it to the runningJobs map
        runningJobs.put(jobGraph.getJobID(), jobManagerRunner);
        final JobID jobId = jobGraph.getJobID();
        final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
                jobManagerRunner
                        .getResultFuture()
                        .handleAsync(
                                (jobManagerRunnerResult, throwable) -> {
                                    Preconditions.checkState(
                                            runningJobs.get(jobId) == jobManagerRunner,
                                            "The job entry in runningJobs must be bound to the lifetime of the JobManagerRunner.");
                                    if (jobManagerRunnerResult != null) {
                                        return handleJobManagerRunnerResult(
                                                jobManagerRunnerResult, executionType);
                                    } else {
                                        return jobManagerRunnerFailed(jobId, throwable);
                                    }
                                },
                                getMainThreadExecutor());
        final CompletableFuture<Void> jobTerminationFuture =
                cleanupJobStateFuture
                        .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
                        .thenCompose(Function.identity());
        FutureUtils.assertNoException(jobTerminationFuture);
        registerJobManagerRunnerTerminationFuture(jobId, jobTerminationFuture);
    }

    This code first builds a JobManagerRunner, a launcher. Note that despite the name, this is not the JobManager master node itself. Step into createJobManagerRunner:

    JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp)
            throws Exception {
        final RpcService rpcService = getRpcService();
        // TODO Build the JobManagerRunner, which wraps a DefaultJobMasterServiceProcessFactory;
        //  that factory builds and starts the JobMaster after leader election completes
        JobManagerRunner runner =
                jobManagerRunnerFactory.createJobManagerRunner(
                        jobGraph,
                        configuration,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        jobManagerSharedServices,
                        new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                        fatalErrorHandler,
                        initializationTimestamp);
        // TODO Start the JobMaster leader election; on success, the JobMaster is created and
        //  started in ZooKeeperLeaderElectionDriver's isLeader method
        runner.start();
        return runner;
    }

    As you can see, a JobManagerRunner is built through a factory method and then started.

    2.1 Initializing the basic services required by the JobMaster

    Step into jobManagerRunnerFactory.createJobManagerRunner:

    @Override
    public JobManagerRunner createJobManagerRunner(
            JobGraph jobGraph,
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            JobManagerSharedServices jobManagerServices,
            JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
            FatalErrorHandler fatalErrorHandler,
            long initializationTimestamp)
            throws Exception {
        checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
        final JobMasterConfiguration jobMasterConfiguration =
                JobMasterConfiguration.fromConfiguration(configuration);
        final RunningJobsRegistry runningJobsRegistry =
                highAvailabilityServices.getRunningJobsRegistry();
        // TODO Get the election service and prepare the JobMaster leader election
        final LeaderElectionService jobManagerLeaderElectionService =
                highAvailabilityServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
        final SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory =
                DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
                        configuration, jobGraph.getJobType());
        if (jobMasterConfiguration.getConfiguration().get(JobManagerOptions.SCHEDULER_MODE)
                == SchedulerExecutionMode.REACTIVE) {
            Preconditions.checkState(
                    slotPoolServiceSchedulerFactory.getSchedulerType()
                            == JobManagerOptions.SchedulerType.Adaptive,
                    "Adaptive Scheduler is required for reactive mode");
        }
        final ShuffleMaster<?> shuffleMaster =
                ShuffleServiceLoader.loadShuffleServiceFactory(configuration)
                        .createShuffleMaster(configuration);
        final LibraryCacheManager.ClassLoaderLease classLoaderLease =
                jobManagerServices
                        .getLibraryCacheManager()
                        .registerClassLoaderLease(jobGraph.getJobID());
        final ClassLoader userCodeClassLoader =
                classLoaderLease
                        .getOrResolveClassLoader(
                                jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths())
                        .asClassLoader();
        // TODO Build the DefaultJobMasterServiceFactory, which wraps the basic services
        //  the JobMaster needs to start
        final DefaultJobMasterServiceFactory jobMasterServiceFactory =
                new DefaultJobMasterServiceFactory(
                        jobManagerServices.getScheduledExecutorService(),
                        rpcService,
                        jobMasterConfiguration,
                        jobGraph,
                        highAvailabilityServices,
                        slotPoolServiceSchedulerFactory,
                        jobManagerServices,
                        heartbeatServices,
                        jobManagerJobMetricGroupFactory,
                        fatalErrorHandler,
                        userCodeClassLoader,
                        shuffleMaster,
                        initializationTimestamp);
        final DefaultJobMasterServiceProcessFactory jobMasterServiceProcessFactory =
                new DefaultJobMasterServiceProcessFactory(
                        jobGraph.getJobID(),
                        jobGraph.getName(),
                        jobGraph.getCheckpointingSettings(),
                        initializationTimestamp,
                        jobMasterServiceFactory);
        return new JobMasterServiceLeadershipRunner(
                jobMasterServiceProcessFactory,
                jobManagerLeaderElectionService,
                runningJobsRegistry,
                classLoaderLease,
                fatalErrorHandler);
    }

    This is a fairly long method, but its structure is clear. It initializes the basic services the JobMaster needs, such as the leader election service jobManagerLeaderElectionService, and builds one very important component, the DefaultJobMasterServiceProcessFactory, inside which the JobMaster is later initialized and started.

    Next, let's go back to the previous code and follow the runner's startup process.

    2.2 The JobMaster leader election process

    Step into runner.start():

    @Override
    public void start() throws Exception {
        LOG.debug("Start leadership runner for job {}.", getJobID());
        // TODO
        leaderElectionService.start(this);
    }

    Then step into leaderElectionService.start and choose the DefaultLeaderElectionService implementation:

    @Override
    public final void start(LeaderContender contender) throws Exception {
        checkNotNull(contender, "Contender must not be null.");
        Preconditions.checkState(leaderContender == null, "Contender was already set.");
        synchronized (lock) {
            /*
            TODO When called from the WebMonitorEndpoint, the contender is the DispatcherRestEndpoint
             When called from the ResourceManager, the contender is the ResourceManager
             When called from the DispatcherRunner, the contender is the DispatcherRunner
             For the JobMaster election, the contender is the JobMasterServiceLeadershipRunner
            */
            leaderContender = contender;
            // TODO Create the election driver, leaderElectionDriver
            leaderElectionDriver =
                    leaderElectionDriverFactory.createLeaderElectionDriver(
                            this,
                            new LeaderElectionFatalErrorHandler(),
                            leaderContender.getDescription());
            LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
            running = true;
        }
    }

    We're back in familiar territory: when we analyzed the JobManager startup process, the elections of the three core JobManager components all went through this method. Since this is the JobMaster election, the contender here is the JobMasterServiceLeadershipRunner. Step into leaderElectionDriverFactory.createLeaderElectionDriver and choose the ZooKeeper implementation:

    @Override
    public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
            LeaderElectionEventHandler leaderEventHandler,
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        // TODO
        return new ZooKeeperLeaderElectionDriver(
                client,
                latchPath,
                leaderPath,
                leaderEventHandler,
                fatalErrorHandler,
                leaderContenderDescription);
    }

    Then enter the ZooKeeperLeaderElectionDriver constructor:

    public ZooKeeperLeaderElectionDriver(
            CuratorFramework client,
            String latchPath,
            String leaderPath,
            LeaderElectionEventHandler leaderElectionEventHandler,
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        this.client = checkNotNull(client);
        this.leaderPath = checkNotNull(leaderPath);
        this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
        this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
        leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
        cache = new NodeCache(client, leaderPath);
        client.getUnhandledErrorListenable().addListener(this);
        running = true;
        // TODO Start the election
        leaderLatch.addListener(this);
        leaderLatch.start();
        /*
        TODO Once the election starts, a response arrives shortly:
         1. If this contender wins, its isLeader method is called back
         2. If it loses, its notLeader method is called back
         Each contender has its own election driver
        */
        cache.getListenable().addListener(this);
        cache.start();
        client.getConnectionStateListenable().addListener(listener);
    }

    Here the leader election begins. As mentioned in the JobManager startup chapter, once the election completes, the winner's isLeader method is called back. Let's go straight to that method:

    /*
    Election won
    */
    @Override
    public void isLeader() {
        // TODO
        leaderElectionEventHandler.onGrantLeadership();
    }

    Then enter leaderElectionEventHandler.onGrantLeadership():

    @Override
    @GuardedBy("lock")
    public void onGrantLeadership() {
        synchronized (lock) {
            if (running) {
                issuedLeaderSessionID = UUID.randomUUID();
                clearConfirmedLeaderInformation();
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Grant leadership to contender {} with session ID {}.",
                            leaderContender.getDescription(),
                            issuedLeaderSessionID);
                }
                /*
                TODO There are 4 kinds of contenders; LeaderContender has 4 cases:
                 1. Dispatcher = DefaultDispatcherRunner
                 2. JobMaster = JobMasterServiceLeadershipRunner
                 3. ResourceManager = ResourceManager
                 4. WebMonitorEndpoint = WebMonitorEndpoint
                */
                leaderContender.grantLeadership(issuedLeaderSessionID);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Ignoring the grant leadership notification since the {} has "
                                    + "already been closed.",
                            leaderElectionDriver);
                }
            }
        }
    }

    Then enter leaderContender.grantLeadership and choose the JobMasterServiceLeadershipRunner implementation:

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        // TODO Check that the runner is in the running state
        runIfStateRunning(
                // TODO Create and start the JobMaster
                () -> startJobMasterServiceProcessAsync(leaderSessionID),
                "starting a new JobMasterServiceProcess");
    }
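    The path from winning the ZooKeeper latch to this grantLeadership call is a chain of callbacks: the driver's isLeader fires the service's onGrantLeadership, which fires the contender's grantLeadership. A stripped-down sketch of that chain (simplified interfaces, not the real Flink types):

```java
import java.util.UUID;

public class LeaderElectionSketch {
    // Simplified contender interface; in Flink this is LeaderContender
    interface Contender {
        String grantLeadership(UUID leaderSessionId);
    }

    // Driver side: winning the latch issues a fresh session id and calls the
    // contender back, mirroring isLeader() -> onGrantLeadership() -> grantLeadership()
    static String onLatchWon(Contender contender) {
        UUID issuedLeaderSessionID = UUID.randomUUID();
        return contender.grantLeadership(issuedLeaderSessionID);
    }

    public static String demo() {
        // The JobMaster's contender reacts by kicking off its service process
        Contender jobMasterRunner = sessionId -> "startJobMasterServiceProcessAsync";
        return onLatchWon(jobMasterRunner);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Each contender owns its own driver, so a lost election simply means the callback never fires for that contender.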

    Step into startJobMasterServiceProcessAsync:

    @GuardedBy("lock")
    private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
        sequentialOperation =
                sequentialOperation.thenRun(
                        // TODO Validate the leader state
                        () ->
                                runIfValidLeader(
                                        leaderSessionId,
                                        ThrowingRunnable.unchecked(
                                                // TODO Create and start the JobMaster
                                                () ->
                                                        verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
                                                                leaderSessionId)),
                                        "verify job scheduling status and create JobMasterServiceProcess"));
        handleAsyncOperationError(sequentialOperation, "Could not start the job manager.");
    }

    A leader-state check happens here. Next, step into verifyJobSchedulingStatusAndCreateJobMasterServiceProcess:

    @GuardedBy("lock")
    private void verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
            throws FlinkException {
        final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus =
                getJobSchedulingStatus();
        if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) {
            jobAlreadyDone();
        } else {
            // TODO Create and start the JobMaster
            createNewJobMasterServiceProcess(leaderSessionId);
        }
    }

    This checks the job's scheduling status to see whether it is already done. Then enter createNewJobMasterServiceProcess:

    @GuardedBy("lock")
    private void createNewJobMasterServiceProcess(UUID leaderSessionId) throws FlinkException {
        Preconditions.checkState(jobMasterServiceProcess.closeAsync().isDone());
        LOG.debug(
                "Create new JobMasterServiceProcess because we were granted leadership under {}.",
                leaderSessionId);
        try {
            // TODO Register the job's status, marking it as Running
            runningJobsRegistry.setJobRunning(getJobID());
        } catch (IOException e) {
            throw new FlinkException(
                    String.format(
                            "Failed to set the job %s to running in the running jobs registry.",
                            getJobID()),
                    e);
        }
        // TODO Create and start the JobMaster
        jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId);
        forwardIfValidLeader(
                leaderSessionId,
                jobMasterServiceProcess.getJobMasterGatewayFuture(),
                jobMasterGatewayFuture,
                "JobMasterGatewayFuture from JobMasterServiceProcess");
        forwardResultFuture(leaderSessionId, jobMasterServiceProcess.getResultFuture());
        confirmLeadership(leaderSessionId, jobMasterServiceProcess.getLeaderAddressFuture());
    }

    As you can see, the job is first registered as Running. Step into jobMasterServiceProcessFactory.create:

    @Override
    public JobMasterServiceProcess create(UUID leaderSessionId) {
        // TODO Builds and starts the JobMaster internally
        return new DefaultJobMasterServiceProcess(
                jobId,
                leaderSessionId,
                jobMasterServiceFactory,
                cause -> createArchivedExecutionGraph(JobStatus.FAILED, cause));
    }

    Then enter the DefaultJobMasterServiceProcess constructor:

    public DefaultJobMasterServiceProcess(
            JobID jobId,
            UUID leaderSessionId,
            JobMasterServiceFactory jobMasterServiceFactory,
            Function<Throwable, ArchivedExecutionGraph> failedArchivedExecutionGraphFactory) {
        this.jobId = jobId;
        this.leaderSessionId = leaderSessionId;
        // TODO Build and start the JobMaster
        this.jobMasterServiceFuture =
                jobMasterServiceFactory.createJobMasterService(leaderSessionId, this);
        jobMasterServiceFuture.whenComplete(
                (jobMasterService, throwable) -> {
                    if (throwable != null) {
                        final JobInitializationException jobInitializationException =
                                new JobInitializationException(
                                        jobId, "Could not start the JobMaster.", throwable);
                        LOG.debug(
                                "Initialization of the JobMasterService for job {} under leader id {} failed.",
                                jobId,
                                leaderSessionId,
                                jobInitializationException);
                        resultFuture.complete(
                                JobManagerRunnerResult.forInitializationFailure(
                                        new ExecutionGraphInfo(
                                                failedArchivedExecutionGraphFactory.apply(
                                                        jobInitializationException)),
                                        jobInitializationException));
                    } else {
                        registerJobMasterServiceFutures(jobMasterService);
                    }
                });
    }

    The JobMaster is built and started asynchronously here, and the result is checked for failures. Step into jobMasterServiceFactory.createJobMasterService:

    @Override
    public CompletableFuture<JobMasterService> createJobMasterService(
            UUID leaderSessionId, OnCompletionActions onCompletionActions) {
        return CompletableFuture.supplyAsync(
                FunctionUtils.uncheckedSupplier(
                        // TODO Builds and starts the JobMaster internally
                        () -> internalCreateJobMasterService(leaderSessionId, onCompletionActions)),
                executor);
    }

    2.3 Initialization and startup of the JobMaster

    Then step into internalCreateJobMasterService:

    private JobMasterService internalCreateJobMasterService(
            UUID leaderSessionId, OnCompletionActions onCompletionActions) throws Exception {
        final JobMaster jobMaster =
                new JobMaster(
                        rpcService,
                        JobMasterId.fromUuidOrNull(leaderSessionId),
                        jobMasterConfiguration,
                        ResourceID.generate(),
                        jobGraph,
                        haServices,
                        slotPoolServiceSchedulerFactory,
                        jobManagerSharedServices,
                        heartbeatServices,
                        jobManagerJobMetricGroupFactory,
                        onCompletionActions,
                        fatalErrorHandler,
                        userCodeClassloader,
                        shuffleMaster,
                        lookup ->
                                new JobMasterPartitionTrackerImpl(
                                        jobGraph.getJobID(), shuffleMaster, lookup),
                        new DefaultExecutionDeploymentTracker(),
                        DefaultExecutionDeploymentReconciler::new,
                        initializationTimestamp);
        // TODO JobMaster extends RpcEndpoint, so after initialization completes
        //  its onStart method is called back
        jobMaster.start();
        return jobMaster;
    }

    Here the JobMaster is initialized and started. As covered in the earlier Akka chapter, JobMaster extends RpcEndpoint, so after initialization completes, its onStart lifecycle method is called back. jobMaster.start() itself does no substantial work; it only sends the JobMaster a message telling it that it has started. Let's look at JobMaster's onStart method:

    @Override
    protected void onStart() throws JobMasterException {
        try {
            // TODO The JobMaster registers with the ResourceManager, requests slots,
            //  and schedules/deploys the StreamTasks
            startJobExecution();
        } catch (Exception e) {
            final JobMasterException jobMasterException =
                    new JobMasterException("Could not start the JobMaster.", e);
            handleJobMasterError(jobMasterException);
            throw jobMasterException;
        }
    }

    In this method, the JobMaster uses startJobExecution() to register itself and to begin requesting slots. Step into startJobExecution():

    private void startJobExecution() throws Exception {
        validateRunsInMainThread();
        // TODO Start supporting services
        startJobMasterServices();
        log.info(
                "Starting execution of job {} ({}) under job master id {}.",
                jobGraph.getName(),
                jobGraph.getJobID(),
                getFencingToken());
        // TODO Build the ExecutionGraph, request slots, deploy tasks to the TaskExecutors
        startScheduling();
    }
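    The start()/onStart() hand-off described above can be condensed into a small lifecycle sketch: start() only marks the endpoint as running, and the framework then invokes the onStart() hook, which is where the real work (startJobMasterServices, startScheduling) happens. This is a simplified model, not Flink's actual RpcEndpoint:

```java
public class RpcEndpointSketch {
    // Simplified endpoint lifecycle (in Flink, start() sends an Akka message
    // to the endpoint itself, which then triggers onStart())
    static abstract class Endpoint {
        final StringBuilder trace = new StringBuilder();

        public final void start() {
            trace.append("started;");   // flip to "running" first
            onStart();                  // then invoke the lifecycle hook
        }

        protected abstract void onStart();
    }

    static class MiniJobMaster extends Endpoint {
        @Override
        protected void onStart() {
            // The hook is where the real startup work happens
            trace.append("startJobMasterServices;startScheduling;");
        }
    }

    public static String demo() {
        MiniJobMaster jobMaster = new MiniJobMaster();
        jobMaster.start();
        return jobMaster.trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```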

    Let's first look at startJobMasterServices():

    private void startJobMasterServices() throws Exception {
        try {
            // TODO Start the two heartbeat services
            this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
            this.resourceManagerHeartbeatManager =
                    createResourceManagerHeartbeatManager(heartbeatServices);
            // start the slot pool make sure the slot pool now accepts messages for this leader
            // TODO Start the slot management service, which starts 3 scheduled tasks internally
            slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());
            // job is ready to go, try to establish connection with resource manager
            // - activate leader retrieval for the resource manager
            // - on notification of the leader, the connection will be established and
            //   the slot pool will start requesting slots
            // TODO Watch for changes of the ResourceManager's address
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
        } catch (Exception e) {
            handleStartJobMasterServicesError(e);
        }
    }

    Three things happen here:

    1. Two heartbeat services are started.

    2. The slot management service is started, which internally starts three scheduled tasks.

    3. A listener is registered for the ResourceManager's address.

    Since later chapters cover slot management and scheduling in detail, we'll skip slots for now. Back in the caller, follow startScheduling() down and choose the SchedulerBase implementation:

    @Override
    public final void startScheduling() {
        mainThreadExecutor.assertRunningInMainThread();
        registerJobMetrics();
        operatorCoordinatorHandler.startAllOperatorCoordinators();
        // TODO
        startSchedulingInternal();
    }

    Then step into startSchedulingInternal:

    @Override
    protected void startSchedulingInternal() {
        log.info(
                "Starting scheduling with scheduling strategy [{}]",
                schedulingStrategy.getClass().getName());
        transitionToRunning();
        // TODO Request slots and deploy the StreamTasks
        schedulingStrategy.startScheduling();
    }

    Then step into schedulingStrategy.startScheduling():

    @Override
    public void startScheduling() {
        final Set<SchedulingPipelinedRegion> sourceRegions =
                IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
                        .filter(this::isSourceRegion)
                        .collect(Collectors.toSet());
        // TODO Request slots and deploy the StreamTasks
        maybeScheduleRegions(sourceRegions);
    }

    This is where slot allocation begins. Step into maybeScheduleRegions:

    private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
        final List<SchedulingPipelinedRegion> regionsSorted =
                SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
                        schedulingTopology, regions);
        final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>();
        for (SchedulingPipelinedRegion region : regionsSorted) {
            // TODO Request slots and deploy the StreamTasks
            maybeScheduleRegion(region, consumableStatusCache);
        }
    }

    Following maybeScheduleRegion downward, slot allocation eventually lands in the scheduler's allocateSlotsAndDeploy method:

    @Override
    public void allocateSlotsAndDeploy(
            final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
        validateDeploymentOptions(executionVertexDeploymentOptions);
        final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex =
                groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);
        final List<ExecutionVertexID> verticesToDeploy =
                executionVertexDeploymentOptions.stream()
                        .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
                        .collect(Collectors.toList());
        final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
                executionVertexVersioner.recordVertexModifications(verticesToDeploy);
        transitionToScheduled(verticesToDeploy);
        // TODO Request the slots
        final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
                allocateSlots(executionVertexDeploymentOptions);
        final List<DeploymentHandle> deploymentHandles =
                createDeploymentHandles(
                        requiredVersionByVertex,
                        deploymentOptionsByVertex,
                        slotExecutionVertexAssignments);
        // TODO Deploy the tasks
        waitForAllSlotsAndDeploy(deploymentHandles);
    }

    This method does two main things:

    1. Allocates the slots.

    2. Deploys the tasks.

    We'll analyze the details in the later chapters on slot management.

    At this point, the JobMaster has started.

    Summary

        The JobGraph built on the client side, together with the resources it needs, is sent to the WebMonitorEndpoint. A Router inside the WebMonitorEndpoint parses the URL, routes the request to the matching handler, and calls back that handler's handleRequest method; here, that handler is the JobSubmitHandler.

        Inside handleRequest, the job information and resources in the request body are parsed out, including the JobGraph, the job's own jar, and its dependency jars. Once parsing is done, the JobSubmitHandler hands the JobGraph to the Dispatcher.

        After receiving the JobGraph, the Dispatcher prepares to initialize and start the JobMaster. It first initializes a set of basic services the JobMaster needs, then builds the important DefaultJobMasterServiceFactory object, and then begins the JobMaster leader election.

        Once the JobMaster wins the election, the isLeader method is called back and JobMaster initialization begins. Because JobMaster extends RpcEndpoint, its onStart lifecycle method is called back after initialization completes.

        In that onStart lifecycle method, the JobMaster requests slots and deploys the tasks.

• Original article: https://blog.csdn.net/EdwardWong_/article/details/126746960