[Flink Source Code] Revisiting the Flink Program Submission Process (Part 2)


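    Picking up where we left off: in [Flink Source Code] Revisiting the Flink Program Submission Process (Part 1), we followed the program from the client until it was submitted to the ResourceManager.
    Now let's take a closer look at what happens on the ResourceManager side.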


    Creating the Dispatcher and ResourceManager

    The YarnJobClusterEntrypoint class is the entry point of a YARN per-job cluster and contains the main method we want to look at.

    YarnJobClusterEntrypoint.java

    public static void main(String[] args) {
    
        LOG.warn(
                "Job Clusters are deprecated since Flink 1.15. Please use an Application Cluster/Application Mode instead.");
    
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(
                LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);
    
        Map<String, String> env = System.getenv();
    
        final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
        Preconditions.checkArgument(
                workingDirectory != null,
                "Working directory variable (%s) not set",
                ApplicationConstants.Environment.PWD.key());
    
        try {
            YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
        } catch (IOException e) {
            LOG.warn("Could not log YARN environment information.", e);
        }
    
        final Configuration dynamicParameters =
                ClusterEntrypointUtils.parseParametersOrExit(
                        args,
                        new DynamicParametersConfigurationParserFactory(),
                        YarnJobClusterEntrypoint.class);
        final Configuration configuration =
                YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);
    
        YarnJobClusterEntrypoint yarnJobClusterEntrypoint =
                new YarnJobClusterEntrypoint(configuration);
    
        ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
    }
    

    The main method passes the YarnJobClusterEntrypoint object to ClusterEntrypoint.runClusterEntrypoint, which starts the cluster and then blocks until it terminates.

    ClusterEntrypoint.java

    public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
    
        final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
        try {
            clusterEntrypoint.startCluster();
        } catch (ClusterEntrypointException e) {
            LOG.error(
                    String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
                    e);
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }
    
        int returnCode;
        Throwable throwable = null;
    
        try {
            returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
        } catch (Throwable e) {
            throwable = ExceptionUtils.stripExecutionException(e);
            returnCode = RUNTIME_FAILURE_RETURN_CODE;
        }
    
        LOG.info(
                "Terminating cluster entrypoint process {} with exit code {}.",
                clusterEntrypointName,
                returnCode,
                throwable);
        System.exit(returnCode);
    }
    
    public void startCluster() throws ClusterEntrypointException {
        LOG.info("Starting {}.", getClass().getSimpleName());
    
        try {
            FlinkSecurityManager.setFromConfiguration(configuration);
            PluginManager pluginManager =
                    PluginUtils.createPluginManagerFromRootFolder(configuration);
            configureFileSystems(configuration, pluginManager);
    
            SecurityContext securityContext = installSecurityContext(configuration);
    
            ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
            securityContext.runSecured(
                    (Callable<Void>)
                            () -> {
                                runCluster(configuration, pluginManager);
    
                                return null;
                            });
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
    
            try {
                // clean up any partial state
                shutDownAsync(
                                ApplicationStatus.FAILED,
                                ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                ExceptionUtils.stringifyException(strippedThrowable),
                                false)
                        .get(
                                INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),
                                TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                strippedThrowable.addSuppressed(e);
            }
    
            throw new ClusterEntrypointException(
                    String.format(
                            "Failed to initialize the cluster entrypoint %s.",
                            getClass().getSimpleName()),
                    strippedThrowable);
        }
    }
    
    private void runCluster(Configuration configuration, PluginManager pluginManager)
            throws Exception {
        synchronized (lock) {
            initializeServices(configuration, pluginManager);
    
            // write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
    
            // Factory that creates the Dispatcher and ResourceManager objects
            // (in per-job mode this includes rebuilding the JobGraph from a local file)
            final DispatcherResourceManagerComponentFactory
                    dispatcherResourceManagerComponentFactory =
                            createDispatcherResourceManagerComponentFactory(configuration);
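            // Use the factory to create the Dispatcher and ResourceManager objects; the entrypoint has
            // already started RpcService, HAServices, BlobServer, HeartbeatServices, MetricRegistry,
            // ExecutionGraphInfoStore, etc. (in initializeServices) and passes them in here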
            clusterComponent =
                    dispatcherResourceManagerComponentFactory.create(
                            configuration,
                            resourceId.unwrap(),
                            ioExecutor,
                            commonRpcService,
                            haServices,
                            blobServer,
                            heartbeatServices,
                            delegationTokenManager,
                            metricRegistry,
                            executionGraphInfoStore,
                            new RpcMetricQueryServiceRetriever(
                                    metricRegistry.getMetricQueryServiceRpcService()),
                            this);
    
            clusterComponent
                    .getShutDownFuture()
                    .whenComplete(
                            (ApplicationStatus applicationStatus, Throwable throwable) -> {
                                if (throwable != null) {
                                    shutDownAsync(
                                            ApplicationStatus.UNKNOWN,
                                            ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                            ExceptionUtils.stringifyException(throwable),
                                            false);
                                } else {
                                    // This is the general shutdown path. If a separate more
                                    // specific shutdown was
                                    // already triggered, this will do nothing
                                    shutDownAsync(
                                            applicationStatus,
                                            ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                            null,
                                            true);
                                }
                            });
        }
    }
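    Stripped of the Flink services, runClusterEntrypoint and the shutdown-future callback follow a simple lifecycle: start the cluster, block on a termination future, map its outcome to a process exit code, and let only the first shutdown request take effect ("If a separate more specific shutdown was already triggered, this will do nothing"). The following is a minimal, hypothetical Java sketch of that pattern, not Flink's code: EntrypointSketch, failAsync and awaitExitCode are invented names, and the exit code 2 for a runtime failure is an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the ClusterEntrypoint shutdown/exit-code flow (not Flink's API).
class EntrypointSketch {
    // Assumed value, standing in for ClusterEntrypoint.RUNTIME_FAILURE_RETURN_CODE.
    static final int RUNTIME_FAILURE_RETURN_CODE = 2;

    private final AtomicBoolean shutdownTriggered = new AtomicBoolean(false);
    private final CompletableFuture<Integer> terminationFuture = new CompletableFuture<>();

    /** Requests a normal shutdown; returns false if another shutdown already won. */
    boolean shutDownAsync(int exitCode) {
        if (shutdownTriggered.compareAndSet(false, true)) {
            terminationFuture.complete(exitCode);
            return true;
        }
        return false; // a shutdown was already triggered: do nothing
    }

    /** Requests a failed shutdown; also a no-op if a shutdown is already in progress. */
    boolean failAsync(Throwable cause) {
        if (shutdownTriggered.compareAndSet(false, true)) {
            terminationFuture.completeExceptionally(cause);
            return true;
        }
        return false;
    }

    /** Blocks like runClusterEntrypoint does and converts the outcome into an exit code. */
    int awaitExitCode() {
        try {
            return terminationFuture.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return RUNTIME_FAILURE_RETURN_CODE;
        } catch (ExecutionException e) {
            return RUNTIME_FAILURE_RETURN_CODE;
        }
    }
}
```

    The compare-and-set makes the "first shutdown wins" rule hold even when the normal path and an error path race from different threads.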
    

    Let's continue with the creation process.
    The implementation of the DispatcherResourceManagerComponentFactory interface is DefaultDispatcherResourceManagerComponentFactory.

    DefaultDispatcherResourceManagerComponentFactory.java

    public DispatcherResourceManagerComponent create(
            Configuration configuration,
            ResourceID resourceId,
            Executor ioExecutor,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            DelegationTokenManager delegationTokenManager,
            MetricRegistry metricRegistry,
            ExecutionGraphInfoStore executionGraphInfoStore,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {
    
        LeaderRetrievalService dispatcherLeaderRetrievalService = null;
        LeaderRetrievalService resourceManagerRetrievalService = null;
        WebMonitorEndpoint<?> webMonitorEndpoint = null;
        ResourceManagerService resourceManagerService = null;
        DispatcherRunner dispatcherRunner = null;
    
        try {
            dispatcherLeaderRetrievalService =
                    highAvailabilityServices.getDispatcherLeaderRetriever();
    
            resourceManagerRetrievalService =
                    highAvailabilityServices.getResourceManagerLeaderRetriever();
    
            final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
                    new RpcGatewayRetriever<>(
                            rpcService,
                            DispatcherGateway.class,
                            DispatcherId::fromUuid,
                            new ExponentialBackoffRetryStrategy(
                                    12, Duration.ofMillis(10), Duration.ofMillis(50)));
    
            final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
                    new RpcGatewayRetriever<>(
                            rpcService,
                            ResourceManagerGateway.class,
                            ResourceManagerId::fromUuid,
                            new ExponentialBackoffRetryStrategy(
                                    12, Duration.ofMillis(10), Duration.ofMillis(50)));
    
            final ScheduledExecutorService executor =
                    WebMonitorEndpoint.createExecutorService(
                            configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                            configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                            "DispatcherRestEndpoint");
    
            final long updateInterval =
                    configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
            final MetricFetcher metricFetcher =
                    updateInterval == 0
                            ? VoidMetricFetcher.INSTANCE
                            : MetricFetcherImpl.fromConfiguration(
                                    configuration,
                                    metricQueryServiceRetriever,
                                    dispatcherGatewayRetriever,
                                    executor);
            
            // Create the REST endpoint that receives requests from the web frontend
            webMonitorEndpoint =
                    restEndpointFactory.createRestEndpoint(
                            configuration,
                            dispatcherGatewayRetriever,
                            resourceManagerGatewayRetriever,
                            blobServer,
                            executor,
                            metricFetcher,
                            highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                            fatalErrorHandler);
    
            log.debug("Starting Dispatcher REST endpoint.");
            webMonitorEndpoint.start();
    
            final String hostname = RpcUtils.getHostname(rpcService);
            
            // Create the ResourceManager; this returns new YarnResourceManager
            // Call chain: AbstractDispatcherResourceManagerComponentFactory
            //                  -> ActiveResourceManagerFactory
            //                  -> YarnResourceManagerFactory
            resourceManagerService =
                    ResourceManagerServiceImpl.create(
                            resourceManagerFactory,
                            configuration,
                            resourceId,
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            delegationTokenManager,
                            fatalErrorHandler,
                            new ClusterInformation(hostname, blobServer.getPort()),
                            webMonitorEndpoint.getRestBaseUrl(),
                            metricRegistry,
                            hostname,
                            ioExecutor);
    
            final HistoryServerArchivist historyServerArchivist =
                    HistoryServerArchivist.createHistoryServerArchivist(
                            configuration, webMonitorEndpoint, ioExecutor);
    
            final DispatcherOperationCaches dispatcherOperationCaches =
                    new DispatcherOperationCaches(
                            configuration.get(RestOptions.ASYNC_OPERATION_STORE_DURATION));
    
            final PartialDispatcherServices partialDispatcherServices =
                    new PartialDispatcherServices(
                            configuration,
                            highAvailabilityServices,
                            resourceManagerGatewayRetriever,
                            blobServer,
                            heartbeatServices,
                            () ->
                                    JobManagerMetricGroup.createJobManagerMetricGroup(
                                            metricRegistry, hostname),
                            executionGraphInfoStore,
                            fatalErrorHandler,
                            historyServerArchivist,
                            metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
                            ioExecutor,
                            dispatcherOperationCaches);
            
            // Create the DispatcherRunner and start it
            log.debug("Starting Dispatcher.");
            dispatcherRunner =
                    dispatcherRunnerFactory.createDispatcherRunner(
                            highAvailabilityServices.getDispatcherLeaderElectionService(),
                            fatalErrorHandler,
                            new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                            ioExecutor,
                            rpcService,
                            partialDispatcherServices);
            
            // Start the ResourceManager
            log.debug("Starting ResourceManagerService.");
            resourceManagerService.start();
    
            resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
    
            return new DispatcherResourceManagerComponent(
                    dispatcherRunner,
                    resourceManagerService,
                    dispatcherLeaderRetrievalService,
                    resourceManagerRetrievalService,
                    webMonitorEndpoint,
                    fatalErrorHandler,
                    dispatcherOperationCaches);
    
        } catch (Exception exception) {
            // clean up all started components
            if (dispatcherLeaderRetrievalService != null) {
                try {
                    dispatcherLeaderRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }
    
            if (resourceManagerRetrievalService != null) {
                try {
                    resourceManagerRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }
    
            final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
    
            if (webMonitorEndpoint != null) {
                terminationFutures.add(webMonitorEndpoint.closeAsync());
            }
    
            if (resourceManagerService != null) {
                terminationFutures.add(resourceManagerService.closeAsync());
            }
    
            if (dispatcherRunner != null) {
                terminationFutures.add(dispatcherRunner.closeAsync());
            }
    
            final FutureUtils.ConjunctFuture<Void> terminationFuture =
                    FutureUtils.completeAll(terminationFutures);
    
            try {
                terminationFuture.get();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
    
            throw new FlinkException(
                    "Could not create the DispatcherResourceManagerComponent.", exception);
        }
    }
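    The catch block in create() follows a common pattern: remember every component created so far, and if a later step fails, shut down what already exists and attach any shutdown failure to the original exception instead of losing it. Below is a simplified, hypothetical sketch of that pattern; Component and ComponentStarter are invented names. Unlike Flink, which stops the retrieval services and then closes the REST endpoint, ResourceManager service and DispatcherRunner asynchronously via FutureUtils.completeAll, this sketch closes components synchronously in reverse creation order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical component abstraction for this sketch.
interface Component extends AutoCloseable {
    void start() throws Exception;

    @Override
    void close() throws Exception;
}

// Hypothetical helper mirroring the "clean up all started components" idea.
class ComponentStarter {
    /** Starts components in order; on failure closes those already created, newest first. */
    static List<Component> startAll(List<Component> components) throws Exception {
        List<Component> created = new ArrayList<>();
        try {
            for (Component component : components) {
                created.add(component); // track before start() so a half-started one is closed too
                component.start();
            }
            return created;
        } catch (Exception failure) {
            for (int i = created.size() - 1; i >= 0; i--) {
                try {
                    created.get(i).close();
                } catch (Exception closeFailure) {
                    failure.addSuppressed(closeFailure); // keep the root cause, record the rest
                }
            }
            throw new Exception("Could not start all components.", failure);
        }
    }
}
```

    Keeping the first failure as the cause and adding later ones as suppressed exceptions plays the same role as ExceptionUtils.firstOrSuppressed in the Flink code.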
    

    At this point we have found where the Dispatcher and ResourceManager are created and started. Next, let's look at each of these steps in detail.

    Creating the YarnResourceManager

    First, let's look at how the YarnResourceManager is created.

    ResourceManagerFactory.java

    public ResourceManager<T> createResourceManager(
            ResourceManagerProcessContext context, UUID leaderSessionId) throws Exception {
    
        final ResourceManagerRuntimeServices resourceManagerRuntimeServices =
                createResourceManagerRuntimeServices(
                        context.getRmRuntimeServicesConfig(),
                        context.getRpcService(),
                        context.getHighAvailabilityServices(),
                        SlotManagerMetricGroup.create(
                                context.getMetricRegistry(), context.getHostname()));
    
        return createResourceManager(
                context.getRmConfig(),
                context.getResourceId(),
                context.getRpcService(),
                leaderSessionId,
                context.getHeartbeatServices(),
                context.getDelegationTokenManager(),
                context.getFatalErrorHandler(),
                context.getClusterInformation(),
                context.getWebInterfaceUrl(),
                ResourceManagerMetricGroup.create(
                        context.getMetricRegistry(), context.getHostname()),
                resourceManagerRuntimeServices,
                context.getIoExecutor());
    }
    

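    The createResourceManager called here is abstract; its YARN implementation lives in YarnResourceManagerFactory, a subclass of ResourceManagerFactory. Note that the YarnResourceManagerFactory snippet below comes from an older Flink release: in newer versions (1.12 and later), YarnResourceManager was replaced by ActiveResourceManager plus YarnResourceManagerDriver, and YarnResourceManagerFactory extends ActiveResourceManagerFactory and only supplies the driver.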

    YarnResourceManagerFactory.java

    public ResourceManager<YarnWorkerNode> createResourceManager(
            Configuration configuration,
            ResourceID resourceId,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            FatalErrorHandler fatalErrorHandler,
            ClusterInformation clusterInformation,
            @Nullable String webInterfaceUrl,
            ResourceManagerMetricGroup resourceManagerMetricGroup,
            ResourceManagerRuntimeServices resourceManagerRuntimeServices) {
    
        return new YarnResourceManager(
                rpcService,
                resourceId,
                configuration,
                System.getenv(),
                highAvailabilityServices,
                heartbeatServices,
                resourceManagerRuntimeServices.getSlotManager(),
                ResourceManagerPartitionTrackerImpl::new,
                resourceManagerRuntimeServices.getJobLeaderIdService(),
                clusterInformation,
                fatalErrorHandler,
                webInterfaceUrl,
                resourceManagerMetricGroup);
    }
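    ResourceManagerFactory is a template method: the concrete createResourceManager(context, leaderSessionId) always builds the shared runtime services (SlotManager, JobLeaderIdService) first, and only then delegates to an abstract overload that each deployment target implements. A minimal, hypothetical sketch of that split; RmFactorySketch, RuntimeServicesSketch and YarnRmFactorySketch are invented names, and strings stand in for the real service objects.

```java
// Hypothetical stand-in for ResourceManagerRuntimeServices.
final class RuntimeServicesSketch {
    final String slotManager;
    final String jobLeaderIdService;

    RuntimeServicesSketch(String slotManager, String jobLeaderIdService) {
        this.slotManager = slotManager;
        this.jobLeaderIdService = jobLeaderIdService;
    }
}

// Hypothetical, simplified model of the ResourceManagerFactory template method.
abstract class RmFactorySketch<T> {
    /** Fixed skeleton: build the shared runtime services, then let the subclass build the RM. */
    final T createResourceManager(String hostname) {
        RuntimeServicesSketch services =
                new RuntimeServicesSketch("SlotManager@" + hostname, "JobLeaderIdService");
        return createResourceManager(services);
    }

    /** Deployment-specific step (YARN, Kubernetes, standalone, ...). */
    protected abstract T createResourceManager(RuntimeServicesSketch services);
}

// Hypothetical YARN flavour: it only consumes the services built by the base class.
class YarnRmFactorySketch extends RmFactorySketch<String> {
    @Override
    protected String createResourceManager(RuntimeServicesSketch services) {
        return "YarnRM(" + services.slotManager + ", " + services.jobLeaderIdService + ")";
    }
}
```

    Because the skeleton method is final, every deployment target gets the same SlotManager and JobLeaderIdService wiring and can only customize the last step.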
    

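    A SlotManager is created along the way: before the YarnResourceManager is constructed, createResourceManagerRuntimeServices builds it, and it is handed over through resourceManagerRuntimeServices.getSlotManager().
    Let's look at how the SlotManager is created.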

    ResourceManagerFactory.java

    private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
            ResourceManagerRuntimeServicesConfiguration rmRuntimeServicesConfig,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            SlotManagerMetricGroup slotManagerMetricGroup) {
    
        return ResourceManagerRuntimeServices.fromConfiguration(
                rmRuntimeServicesConfig,
                highAvailabilityServices,
                rpcService.getScheduledExecutor(),
                slotManagerMetricGroup);
    }
    

    ResourceManagerRuntimeServices.java

    public static ResourceManagerRuntimeServices fromConfiguration(
            ResourceManagerRuntimeServicesConfiguration configuration,
            HighAvailabilityServices highAvailabilityServices,
            ScheduledExecutor scheduledExecutor,
            SlotManagerMetricGroup slotManagerMetricGroup) {
    
        final SlotManager slotManager =
                createSlotManager(configuration, scheduledExecutor, slotManagerMetricGroup);
    
        final JobLeaderIdService jobLeaderIdService =
                new DefaultJobLeaderIdService(
                        highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout());
    
        return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService);
    }
    

    At this point we have found createSlotManager, the method that creates the SlotManager used by the YarnResourceManager.

    Creating and Starting the Dispatcher

    Next, let's look at how the Dispatcher is created and started.
    The implementation of the DispatcherRunnerFactory interface is DefaultDispatcherRunnerFactory.

    DefaultDispatcherRunnerFactory.java

    public DispatcherRunner createDispatcherRunner(
            LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler,
            JobPersistenceComponentFactory jobPersistenceComponentFactory,
            Executor ioExecutor,
            RpcService rpcService,
            PartialDispatcherServices partialDispatcherServices)
            throws Exception {
    
        final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
                dispatcherLeaderProcessFactoryFactory.createFactory(
                        jobPersistenceComponentFactory,
                        ioExecutor,
                        rpcService,
                        partialDispatcherServices,
                        fatalErrorHandler);
    
        return DefaultDispatcherRunner.create(
                leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
    }
    

    Next, the DefaultDispatcherRunner class:

    DefaultDispatcherRunner.java

    public static DispatcherRunner create(
            LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler,
            DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory)
            throws Exception {
        final DefaultDispatcherRunner dispatcherRunner =
                new DefaultDispatcherRunner(
                        leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
        return DispatcherRunnerLeaderElectionLifecycleManager.createFor(
                dispatcherRunner, leaderElectionService);
    }
    

    DispatcherRunnerLeaderElectionLifecycleManager.java

    public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(
            T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
        return new DispatcherRunnerLeaderElectionLifecycleManager<>(
                dispatcherRunner, leaderElectionService);
    }
    
    private DispatcherRunnerLeaderElectionLifecycleManager(
            T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
        this.dispatcherRunner = dispatcherRunner;
        this.leaderElectionService = leaderElectionService;
    
        // Start leader election for the Dispatcher
        leaderElectionService.start(dispatcherRunner);
    }
    

    Take StandaloneLeaderElectionService, the implementation of the LeaderElectionService interface that is used when high availability is not enabled:

    StandaloneLeaderElectionService.java

    public void start(LeaderContender newContender) throws Exception {
        if (contender != null) {
            // Service was already started
            throw new IllegalArgumentException(
                    "Leader election service cannot be started multiple times.");
        }
    
        contender = Preconditions.checkNotNull(newContender);
    
        // directly grant leadership to the given contender
        contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
    }
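    Without high availability there is nothing to elect: the standalone service hands leadership to its single contender as soon as start() is called and refuses to be started twice. A hypothetical sketch of that behavior; LeaderContenderSketch and StandaloneElectionSketch are invented, and using an all-zero UUID as the default leader id is an assumption standing in for HighAvailabilityServices.DEFAULT_LEADER_ID.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical counterpart of LeaderContender.
interface LeaderContenderSketch {
    void grantLeadership(UUID leaderSessionId);
}

// Hypothetical model of StandaloneLeaderElectionService.
class StandaloneElectionSketch {
    // Assumption: stands in for HighAvailabilityServices.DEFAULT_LEADER_ID.
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    private LeaderContenderSketch contender;

    void start(LeaderContenderSketch newContender) {
        if (contender != null) {
            throw new IllegalArgumentException(
                    "Leader election service cannot be started multiple times.");
        }
        contender = Objects.requireNonNull(newContender);
        // No election round: the only contender is granted leadership immediately.
        contender.grantLeadership(DEFAULT_LEADER_ID);
    }
}
```

    This is why, in a non-HA per-job cluster, creating the DispatcherRunner immediately leads to grantLeadership and the Dispatcher startup that follows.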
    

    DefaultDispatcherRunner.java

    public void grantLeadership(UUID leaderSessionID) {
        runActionIfRunning(
                () -> {
                    LOG.info(
                            "{} was granted leadership with leader id {}. Creating new {}.",
                            getClass().getSimpleName(),
                            leaderSessionID,
                            DispatcherLeaderProcess.class.getSimpleName());
                    startNewDispatcherLeaderProcess(leaderSessionID);
                });
    }
    
    private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
        stopDispatcherLeaderProcess();
    
        dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
    
        final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
        FutureUtils.assertNoException(
                previousDispatcherLeaderProcessTerminationFuture.thenRun(
                        newDispatcherLeaderProcess::start));
    }
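    startNewDispatcherLeaderProcess avoids running two leader processes at once: stopping the old process yields a termination future, and the new process's start() is chained onto it with thenRun. The hypothetical sketch below models that ordering; LeaderProcessStub and DispatcherRunnerSketch are invented names, and a manually completed future stands in for the real closeAsync() result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a DispatcherLeaderProcess.
class LeaderProcessStub {
    private final String sessionId;
    private final List<String> events;
    // Completed by whoever finishes the shutdown; models the future from closeAsync().
    final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    LeaderProcessStub(String sessionId, List<String> events) {
        this.sessionId = sessionId;
        this.events = events;
    }

    void start() {
        events.add("start " + sessionId);
    }

    CompletableFuture<Void> closeAsync() {
        events.add("closing " + sessionId);
        return terminationFuture;
    }
}

// Hypothetical model of DefaultDispatcherRunner's leadership handling.
class DispatcherRunnerSketch {
    final List<String> events = new ArrayList<>();
    private LeaderProcessStub current;
    private CompletableFuture<Void> previousTermination = CompletableFuture.completedFuture(null);

    LeaderProcessStub grantLeadership(String sessionId) {
        if (current != null) {
            previousTermination = current.closeAsync(); // like stopDispatcherLeaderProcess()
        }
        LeaderProcessStub next = new LeaderProcessStub(sessionId, events);
        current = next;
        previousTermination.thenRun(next::start); // start only after the old process is gone
        return next;
    }
}
```

    Because thenRun on an already-completed future runs the action immediately, the first process starts right away, while a second one waits until the first process's termination future completes.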
    

    AbstractDispatcherLeaderProcess.java

    public final void start() {
        runIfStateIs(State.CREATED, this::startInternal);
    }
    
    private void startInternal() {
        log.info("Start {}.", getClass().getSimpleName());
        state = State.RUNNING;
        onStart();
    }
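    AbstractDispatcherLeaderProcess combines a small state machine with a template-method hook: start() only acts in the CREATED state, switches to RUNNING, and then calls the subclass's onStart(). A hypothetical sketch of that guard; LeaderProcessSketch and JobLeaderProcessSketch are invented names, and guarding the state with a private lock object is an assumption about how runIfStateIs is implemented.

```java
// Hypothetical model of AbstractDispatcherLeaderProcess's start guard.
abstract class LeaderProcessSketch {
    enum State { CREATED, RUNNING, STOPPED }

    private final Object lock = new Object();
    private State state = State.CREATED;

    /** Only the first call, made while in CREATED, triggers onStart(). */
    final void start() {
        synchronized (lock) {
            if (state == State.CREATED) {
                state = State.RUNNING;
                onStart();
            }
        }
    }

    final State currentState() {
        synchronized (lock) {
            return state;
        }
    }

    /** Subclass hook, e.g. creating and starting the Dispatcher. */
    protected abstract void onStart();
}

// Hypothetical per-job flavour that just counts how often onStart() ran.
class JobLeaderProcessSketch extends LeaderProcessSketch {
    int onStartCalls = 0;

    @Override
    protected void onStart() {
        onStartCalls++;
    }
}
```

    The guard makes start() idempotent, so a repeated or late call cannot create a second Dispatcher for the same leader session.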
    

    Next, find the subclass that implements onStart. For a per-job cluster this is JobDispatcherLeaderProcess:

    JobDispatcherLeaderProcess.java

    protected void onStart() {
        final DispatcherGatewayService dispatcherService =
                dispatcherGatewayServiceFactory.create(
                        DispatcherId.fromUuid(getLeaderSessionId()),
                        CollectionUtil.ofNullable(jobGraph),
                        CollectionUtil.ofNullable(recoveredDirtyJobResult),
                        ThrowingJobGraphWriter.INSTANCE,
                        jobResultStore);
    
        completeDispatcherSetup(dispatcherService);
    }
    

    DefaultDispatcherGatewayServiceFactory.java

    public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
                DispatcherId fencingToken,
                Collection<JobGraph> recoveredJobs,
                Collection<JobResult> recoveredDirtyJobResults,
                JobGraphWriter jobGraphWriter,
                JobResultStore jobResultStore) {
    
            final Dispatcher dispatcher;
            try {
                dispatcher =
                        dispatcherFactory.createDispatcher(
                                rpcService,
                                fencingToken,
                                recoveredJobs,
                                recoveredDirtyJobResults,
                                (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                        new NoOpDispatcherBootstrap(),
                                PartialDispatcherServicesWithJobPersistenceComponents.from(
                                        partialDispatcherServices, jobGraphWriter, jobResultStore));
            } catch (Exception e) {
                throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
            }
    
            // start the dispatcher
            dispatcher.start();
    
            return DefaultDispatcherGatewayService.from(dispatcher);
    }
    

    At this point, the Dispatcher has been created and started.
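
    To recap the chain traced above (leader process start() -> onStart() -> gateway service factory create() -> Dispatcher.start()), here is a toy model that only records the call order. Every class in it is a hypothetical stand-in for the Flink type named in its comment.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the Dispatcher bootstrap chain; all types are hypothetical stand-ins.
class DispatcherChainSketch {
    static final List<String> calls = new ArrayList<>();

    // Stands in for Dispatcher (an RPC endpoint).
    static class Dispatcher {
        void start() { calls.add("Dispatcher.start"); }
    }

    // Stands in for DefaultDispatcherGatewayServiceFactory.
    static class GatewayServiceFactory {
        Dispatcher create() {
            calls.add("factory.create");
            Dispatcher dispatcher = new Dispatcher(); // dispatcherFactory.createDispatcher(...)
            dispatcher.start();
            return dispatcher;
        }
    }

    // Stands in for JobDispatcherLeaderProcess.
    static class LeaderProcess {
        private final GatewayServiceFactory factory = new GatewayServiceFactory();

        void start() {
            calls.add("leaderProcess.start");
            onStart();
        }

        void onStart() {
            calls.add("leaderProcess.onStart");
            factory.create(); // the result would be handed to completeDispatcherSetup(...)
        }
    }

    public static void main(String[] args) {
        new LeaderProcess().start();
        // [leaderProcess.start, leaderProcess.onStart, factory.create, Dispatcher.start]
        System.out.println(calls);
    }
}
```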

    Starting the ResourceManager

    Next, let's look at how the ResourceManager starts up.

    ResourceManager.java

    public final void onStart() throws Exception {
            try {
                log.info("Starting the resource manager.");
                startResourceManagerServices();
                startedFuture.complete(null);
            } catch (Throwable t) {
                final ResourceManagerException exception =
                        new ResourceManagerException(
                                String.format("Could not start the ResourceManager %s", getAddress()),
                                t);
                onFatalError(exception);
                throw exception;
            }
    }
    
    private void startResourceManagerServices() throws Exception {
            try {
                jobLeaderIdService.start(new JobLeaderIdActionsImpl());
    
                registerMetrics();
    
                startHeartbeatServices();
    
                slotManager.start(
                        getFencingToken(),
                        getMainThreadExecutor(),
                        new ResourceActionsImpl(),
                        blocklistHandler::isBlockedTaskManager);
    
                delegationTokenManager.start();
    
                initialize();
            } catch (Exception e) {
                handleStartResourceManagerServicesException(e);
            }
    }
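
    onStart() follows a common startup pattern: start the services in order, complete startedFuture on success, and on any failure wrap the cause, report it as a fatal error and rethrow. A minimal sketch of that pattern, assuming a hypothetical Service interface and a plain Exception in place of ResourceManagerException:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical sketch of the ResourceManager.onStart() pattern.
class StartupSketch {
    interface Service {
        void start() throws Exception;
    }

    final CompletableFuture<Void> startedFuture = new CompletableFuture<>();
    private final List<Service> services;
    private final Consumer<Throwable> fatalErrorHandler;

    StartupSketch(List<Service> services, Consumer<Throwable> fatalErrorHandler) {
        this.services = services;
        this.fatalErrorHandler = fatalErrorHandler;
    }

    void onStart() throws Exception {
        try {
            for (Service service : services) {
                service.start(); // e.g. heartbeat services, slot manager, initialize()
            }
            startedFuture.complete(null);
        } catch (Throwable t) {
            Exception exception = new Exception("Could not start the component", t);
            fatalErrorHandler.accept(exception); // like onFatalError(exception)
            throw exception;
        }
    }

    public static void main(String[] args) throws Exception {
        Service noop = () -> {};
        Service failing = () -> { throw new IllegalStateException("boom"); };

        StartupSketch ok = new StartupSketch(Arrays.asList(noop, noop), t -> {});
        ok.onStart();
        System.out.println("started: " + ok.startedFuture.isDone()); // started: true

        StartupSketch broken = new StartupSketch(Arrays.asList(noop, failing),
                t -> System.out.println("fatal error: " + t.getMessage()));
        try {
            broken.onStart();
        } catch (Exception e) {
            System.out.println("started: " + broken.startedFuture.isDone()); // started: false
        }
    }
}
```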
    

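    startResourceManagerServices starts the job leader ID service, registers metrics, starts the heartbeat services, starts the SlotManager and the delegation token manager, and finally calls initialize().
    With that, we have traced how the Dispatcher and the ResourceManager are created and started.
    Now let's return to where we began and follow the next step of the job submission flow.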

    The Dispatcher starts the JobManager

    Once the Dispatcher and the ResourceManager are running, the Dispatcher starts the JobManager.
    To see how, we first go into Dispatcher, the implementation class of the dispatcher component:

    Dispatcher.java

    public void onStart() throws Exception {
            try {
                // start the Dispatcher services
                startDispatcherServices();
            } catch (Throwable t) {
                final DispatcherException exception =
                        new DispatcherException(
                                String.format("Could not start the Dispatcher %s", getAddress()), t);
                onFatalError(exception);
                throw exception;
            }
    
            startCleanupRetries();
            // start the recovered jobs
            startRecoveredJobs();
    
            this.dispatcherBootstrap =
                    this.dispatcherBootstrapFactory.create(
                            getSelfGateway(DispatcherGateway.class),
                            this.getRpcService().getScheduledExecutor(),
                            this::onFatalError);
    }
    
    private void startRecoveredJobs() {
            for (JobGraph recoveredJob : recoveredJobs) {
                runRecoveredJob(recoveredJob);
            }
            recoveredJobs.clear();
    }
    
    private void runRecoveredJob(final JobGraph recoveredJob) {
            checkNotNull(recoveredJob);
            try {
                runJob(createJobMasterRunner(recoveredJob), ExecutionType.RECOVERY);
            } catch (Throwable throwable) {
                onFatalError(
                        new DispatcherException(
                                String.format(
                                        "Could not start recovered job %s.", recoveredJob.getJobID()),
                                throwable));
            }
    }
    
    // Note: runJob and createJobManagerRunner below are taken from an older Flink version. Their
    // signatures differ from the call in runRecoveredJob above, where the runner is first built by
    // createJobMasterRunner and then passed to runJob.
    private void runJob(JobGraph jobGraph, ExecutionType executionType) {
            ... ...
            CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
                    createJobManagerRunner(jobGraph, initializationTimestamp);
            ... ...
    }
    
    CompletableFuture<JobManagerRunner> createJobManagerRunner(
                JobGraph jobGraph, long initializationTimestamp) {
            final RpcService rpcService = getRpcService();
            return CompletableFuture.supplyAsync(
                    () -> {
                        try {
                            JobManagerRunner runner =
                                    jobManagerRunnerFactory.createJobManagerRunner(
                                            jobGraph,
                                            configuration,
                                            rpcService,
                                            highAvailabilityServices,
                                            heartbeatServices,
                                            jobManagerSharedServices,
                                            new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                                            fatalErrorHandler,
                                            initializationTimestamp);
                            // start the JobManagerRunner
                            runner.start();
                            return runner;
                        }
                        ... ...
    }
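
    The recovery path boils down to: for every recovered JobGraph, build a JobManagerRunner off the caller's thread with CompletableFuture.supplyAsync and start it. A simplified sketch, where Runner, createRunner and the ioExecutor are hypothetical stand-ins (the executor is elided in the excerpt above):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: create and start one runner per recovered job asynchronously.
class RecoveredJobsSketch {
    static class Runner {
        final String jobId;
        volatile boolean started;

        Runner(String jobId) { this.jobId = jobId; }

        void start() { started = true; } // JobManagerRunner.start() would begin leader election
    }

    static CompletableFuture<Runner> createRunner(String jobId, ExecutorService ioExecutor) {
        return CompletableFuture.supplyAsync(() -> {
            Runner runner = new Runner(jobId); // jobManagerRunnerFactory.createJobManagerRunner(...)
            runner.start();
            return runner;
        }, ioExecutor);
    }

    static List<Runner> startRecoveredJobs(List<String> recoveredJobs, ExecutorService ioExecutor) {
        List<CompletableFuture<Runner>> futures = new ArrayList<>();
        for (String jobId : recoveredJobs) {
            futures.add(createRunner(jobId, ioExecutor));
        }
        List<Runner> runners = new ArrayList<>();
        for (CompletableFuture<Runner> future : futures) {
            runners.add(future.join()); // join() only so this demo can return the results
        }
        return runners;
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        try {
            for (Runner r : startRecoveredJobs(Arrays.asList("job-1", "job-2"), ioExecutor)) {
                System.out.println(r.jobId + " started=" + r.started);
            }
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```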
    

    JobManagerRunnerImpl.java

    public void start() throws Exception { 
            try {
                    leaderElectionService.start(this); 
            } catch (Exception e) {
                    log.error("Could not start the JobManager because the leader election service did not start.", e);
                    throw new Exception("Could not start the leader election service.", e); 
            }
    }
    

    StandaloneLeaderElectionService.java

    public void start(LeaderContender newContender) throws Exception {
            ... ...
            contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
    }
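
    In standalone (non-HA) mode there is no real election: the service immediately grants leadership to its single contender with a fixed default leader ID. A minimal sketch, where Contender and StandaloneElection are hypothetical and the all-zero UUID is an assumed stand-in for HighAvailabilityServices.DEFAULT_LEADER_ID:

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical sketch of a "standalone" leader election: the single contender wins immediately.
class StandaloneElectionSketch {
    // Assumption: stands in for HighAvailabilityServices.DEFAULT_LEADER_ID.
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    static class StandaloneElection {
        private Contender contender;

        void start(Contender newContender) {
            if (contender != null) {
                throw new IllegalStateException("Leader election service already started.");
            }
            contender = Objects.requireNonNull(newContender);
            // no election round: grant leadership right away
            contender.grantLeadership(DEFAULT_LEADER_ID);
        }
    }

    public static void main(String[] args) {
        new StandaloneElection().start(id -> System.out.println("granted leadership with id " + id));
    }
}
```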
    

    JobManagerRunnerImpl.java

    public void grantLeadership(final UUID leaderSessionID) {
            synchronized (lock) {
                    if (shutdown) {
                            log.debug("JobManagerRunner cannot be granted leadership because it is already shut down.");
                            return;
                    }
                    leadershipOperation = leadershipOperation.thenCompose(
                            (ignored) -> {
                                    synchronized (lock) {
                                            // verify the job's scheduling status, then start the JobManager
                                            return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                                    }
                            });
                    handleException(leadershipOperation, "Could not start the job manager.");
            }
    }
    
    private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
            final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture =
                    getJobSchedulingStatus();
            return jobSchedulingStatusFuture.thenCompose(
                    jobSchedulingStatus -> {
                            if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                                    return jobAlreadyDone();
                            } else {
                                    return startJobMaster(leaderSessionId);
                            }
                    });
    }
    
    private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
            ... ...
            startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
            ... ...
    }
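
    Two ideas are combined here: each leadership operation is chained onto leadershipOperation with thenCompose, so operations run one after another, and a job whose scheduling status is already DONE is not started again. A simplified sketch with hypothetical names (the enum is a simplification, and string session IDs replace UUIDs):

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of serialized leadership operations plus the "job already done" check.
class LeadershipChainSketch {
    enum JobSchedulingStatus { PENDING, RUNNING, DONE }

    private final Object lock = new Object();
    private CompletableFuture<Void> leadershipOperation = CompletableFuture.completedFuture(null);
    private final StringBuilder actions = new StringBuilder();
    private final JobSchedulingStatus status;

    LeadershipChainSketch(JobSchedulingStatus status) { this.status = status; }

    void grantLeadership(String leaderSessionId) {
        synchronized (lock) {
            // chain onto the previous operation so operations never overlap
            leadershipOperation = leadershipOperation.thenCompose(
                    ignored -> verifyJobSchedulingStatusAndStartJobManager(leaderSessionId));
        }
    }

    private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(String leaderSessionId) {
        return CompletableFuture.completedFuture(status).thenCompose(s -> {
            if (s == JobSchedulingStatus.DONE) {
                actions.append("jobAlreadyDone;");
            } else {
                actions.append("startJobMaster(").append(leaderSessionId).append(");");
            }
            return CompletableFuture.<Void>completedFuture(null);
        });
    }

    String awaitAndGetActions() {
        CompletableFuture<Void> op;
        synchronized (lock) {
            op = leadershipOperation;
        }
        op.join();
        return actions.toString();
    }

    public static void main(String[] args) {
        LeadershipChainSketch pending = new LeadershipChainSketch(JobSchedulingStatus.PENDING);
        pending.grantLeadership("leader-1");
        System.out.println(pending.awaitAndGetActions()); // startJobMaster(leader-1);

        LeadershipChainSketch done = new LeadershipChainSketch(JobSchedulingStatus.DONE);
        done.grantLeadership("leader-1");
        System.out.println(done.awaitAndGetActions()); // jobAlreadyDone;
    }
}
```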
    

    JobMaster.java

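    // Note: start(JobMasterId) is from an older Flink version, where startJobExecution took the
    // JobMasterId as an argument; the startJobExecution() shown below takes none.
    public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception { 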
            // make sure we receive RPC and async calls
            start();
            return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
    }
    
    private void startJobExecution() throws Exception {
            validateRunsInMainThread();
    
            JobShuffleContext context = new JobShuffleContextImpl(jobGraph.getJobID(), this);
            shuffleMaster.registerJob(context);
            
            // start the JobMaster services
            startJobMasterServices();
    
            log.info(
                    "Starting execution of job '{}' ({}) under job master id {}.",
                    jobGraph.getName(),
                    jobGraph.getJobID(),
                    getFencingToken());
            // start scheduling
            startScheduling();
    }
    

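    In the end, after a chain of calls that starts in the Dispatcher, we reach the JobMaster and invoke its start method, which registers the job with the shuffle master, starts the JobMaster services and then starts scheduling.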
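
    startJobExecution() begins with validateRunsInMainThread() because a Flink RPC endpoint processes its calls on a single main thread, which is why start(...) hands the work over via callAsyncWithoutFencing(...). The sketch below models that idea with a single-threaded executor; MainThreadSketch, callAsync and the step names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of an RPC endpoint's "main thread": endpoint state is only touched there.
class MainThreadSketch {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "endpoint-main-thread");
        t.setDaemon(true);
        return t;
    });
    private volatile Thread mainThreadRef;
    final List<String> steps = new ArrayList<>();

    MainThreadSketch() {
        // remember which thread is the main thread
        CompletableFuture.runAsync(() -> mainThreadRef = Thread.currentThread(), mainThread).join();
    }

    <T> CompletableFuture<T> callAsync(Callable<T> callable) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return callable.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, mainThread);
    }

    private void validateRunsInMainThread() {
        if (Thread.currentThread() != mainThreadRef) {
            throw new IllegalStateException("Not running in the main thread");
        }
    }

    // Mirrors the order in startJobExecution(): shuffle registration, services, scheduling.
    Boolean startJobExecution() {
        validateRunsInMainThread();
        steps.add("registerJobWithShuffleMaster");
        steps.add("startJobMasterServices");
        steps.add("startScheduling");
        return Boolean.TRUE;
    }

    void shutdown() { mainThread.shutdown(); }

    public static void main(String[] args) {
        MainThreadSketch endpoint = new MainThreadSketch();
        endpoint.callAsync(endpoint::startJobExecution).join();
        System.out.println(endpoint.steps);
        endpoint.shutdown();
    }
}
```

    Calling startJobExecution() directly from another thread fails the check, which is the kind of mistake the real validation is meant to catch.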

    The ResourceManager starts the SlotManager

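    After the ResourceManager and the Dispatcher have been created, the Dispatcher starts the JobManager, while the ResourceManager starts the SlotManager.
    Let's look at this process in detail.
    The story again begins with the onStart method of the ResourceManager class.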

    ResourceManager.java

    public final void onStart() throws Exception {
            try {
                log.info("Starting the resource manager.");
                startResourceManagerServices();
                startedFuture.complete(null);
            } catch (Throwable t) {
                final ResourceManagerException exception =
                        new ResourceManagerException(
                                String.format("Could not start the ResourceManager %s", getAddress()),
                                t);
                onFatalError(exception);
                throw exception;
            }
    }
    
    // start the ResourceManager services
    private void startResourceManagerServices() throws Exception {
            try {
                jobLeaderIdService.start(new JobLeaderIdActionsImpl());
    
                registerMetrics();
    
                startHeartbeatServices();
                
                // start the SlotManager
                slotManager.start(
                        getFencingToken(),
                        getMainThreadExecutor(),
                        new ResourceActionsImpl(),
                        blocklistHandler::isBlockedTaskManager);
    
                delegationTokenManager.start();
    
                // initialize the ResourceManager
                initialize();
            } catch (Exception e) {
                handleStartResourceManagerServicesException(e);
            }
    }
    

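    The source shows two key steps: starting the SlotManager, and initializing the ResourceManager, which on YARN creates the clients for YARN's ResourceManager and NodeManager.
    start is a method of the SlotManager interface. One of its implementations, FineGrainedSlotManager, implements start by taking the given ResourceManager leader ID and ResourceManager actions and bringing the SlotManager up: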

    FineGrainedSlotManager.java

    public void start(
                ResourceManagerId newResourceManagerId,
                Executor newMainThreadExecutor,
                ResourceActions newResourceActions,
                BlockedTaskManagerChecker newBlockedTaskManagerChecker) {
            LOG.info("Starting the slot manager.");
    
            resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
            mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
            resourceActions = Preconditions.checkNotNull(newResourceActions);
            // initialize slot status synchronization
            slotStatusSyncer.initialize(
                    taskManagerTracker, resourceTracker, resourceManagerId, mainThreadExecutor);
            blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker);
    
            started = true;
    
            // periodically check for TaskManager timeouts
            taskManagerTimeoutsCheck =
                    scheduledExecutor.scheduleWithFixedDelay(
                            () -> mainThreadExecutor.execute(this::checkTaskManagerTimeouts),
                            0L,
                            taskManagerTimeout.toMilliseconds(),
                            TimeUnit.MILLISECONDS);
    
            registerSlotManagerMetrics();
    }
    

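    The steps are straightforward: store the leader ID, main thread executor and resource actions, initialize slot status synchronization, mark the SlotManager as started, schedule a periodic TaskManager timeout check, and register the SlotManager metrics.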
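
    Note how the timeout check is triggered by a scheduled executor but executed on the main thread executor (mainThreadExecutor.execute(this::checkTaskManagerTimeouts)). The sketch below separates the pure decision (which idle TaskManagers have exceeded the timeout) from the scheduling; the idle-since bookkeeping and all names are assumptions, not FineGrainedSlotManager's actual fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a periodic idle-TaskManager timeout check.
class TaskManagerTimeoutSketch {
    // Returns the TaskManagers idle for at least timeoutMillis, sorted by id for determinism.
    static List<String> findTimedOut(Map<String, Long> idleSinceMillis, long nowMillis, long timeoutMillis) {
        List<String> timedOut = new ArrayList<>();
        for (Map.Entry<String, Long> e : new TreeMap<>(idleSinceMillis).entrySet()) {
            if (nowMillis - e.getValue() >= timeoutMillis) {
                timedOut.add(e.getKey());
            }
        }
        return timedOut;
    }

    // Same shape as the excerpt: the scheduler only triggers; the check runs on the main thread executor.
    static ScheduledFuture<?> schedule(ScheduledExecutorService scheduler, Executor mainThreadExecutor,
                                       Runnable check, long timeoutMillis) {
        return scheduler.scheduleWithFixedDelay(
                () -> mainThreadExecutor.execute(check), 0L, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Long> idleSince = new TreeMap<>();
        idleSince.put("tm-1", 0L);
        idleSince.put("tm-2", 25_000L);
        System.out.println(findTimedOut(idleSince, 30_000L, 30_000L)); // [tm-1]

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Runnable::run acts as a trivial "main thread executor" for this demo
        ScheduledFuture<?> task = schedule(scheduler, Runnable::run,
                () -> System.out.println("checking TaskManager timeouts"), 1_000L);
        Thread.sleep(50);
        task.cancel(false);
        scheduler.shutdown();
    }
}
```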

    The JobManager requests slots

    With the JobManager and the SlotManager created, the next step is for the JobManager to request slots.

    Starting the SlotPool

    When the JobMaster starts, it also starts the SlotPool and registers with the ResourceManager.

    JobMaster.java

    private void startJobMasterServices() throws Exception {
            try {
                // create the TaskManager heartbeat manager
                this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
                // create the ResourceManager heartbeat manager
                this.resourceManagerHeartbeatManager =
                        createResourceManagerHeartbeatManager(heartbeatServices);
    
                // start the slot pool make sure the slot pool now accepts messages for this leader
                // start the slot pool
                slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());
    
                // job is ready to go, try to establish connection with resource manager
                //   - activate leader retrieval for the resource manager
                //   - on notification of the leader, the connection will be established and
                //     the slot pool will start requesting slots
                // once connected, the slot pool starts requesting slots from the slot manager
                resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
            } catch (Exception e) {
                handleStartJobMasterServicesError(e);
            }
    }
    

    Registering with the ResourceManager

    The registration goes through the following chain of calls:
    resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    -> notifyOfNewResourceManagerLeader()
    -> reconnectToResourceManagerLeader()
    -> tryConnectToResourceManager()
    -> connectToResourceManager()

    private void connectToResourceManager() {
            ... ...
            resourceManagerConnection = new ResourceManagerConnection(
                    log,
                    jobGraph.getJobID(),
                    resourceId,
                    getAddress(),
                    getFencingToken(),
                    resourceManagerAddress.getAddress(),
                    resourceManagerAddress.getResourceManagerId(),
                    scheduledExecutorService
            );
            resourceManagerConnection.start();
    }
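
    The chain above is driven by leader retrieval: whenever a ResourceManager leader is announced, the JobMaster closes any existing connection and, if an address is known, connects to the announced leader. A simplified sketch of that reconnect logic; Connection, notifyOfNewLeader and the helper names are hypothetical, and the real JobMaster does this on its main thread with more bookkeeping.

```java
import java.util.UUID;

// Hypothetical sketch of "new leader announced -> close old connection -> connect to new leader".
class ResourceManagerReconnectSketch {
    static final class Connection {
        final String targetAddress;
        final UUID targetLeaderId;
        boolean closed;

        Connection(String targetAddress, UUID targetLeaderId) {
            this.targetAddress = targetAddress;
            this.targetLeaderId = targetLeaderId;
        }
    }

    Connection connection; // current connection, null if none
    int connectAttempts;

    // Roughly: notifyOfNewResourceManagerLeader(...) -> reconnectToResourceManagerLeader(...)
    void notifyOfNewLeader(String address, UUID leaderId) {
        closeCurrentConnection();
        connectIfLeaderKnown(address, leaderId);
    }

    private void closeCurrentConnection() {
        if (connection != null) {
            connection.closed = true;
            connection = null;
        }
    }

    // Roughly: tryConnectToResourceManager() -> connectToResourceManager()
    private void connectIfLeaderKnown(String address, UUID leaderId) {
        if (address != null) {
            connection = new Connection(address, leaderId); // new ResourceManagerConnection(...).start()
            connectAttempts++;
        }
    }

    public static void main(String[] args) {
        ResourceManagerReconnectSketch jobMaster = new ResourceManagerReconnectSketch();
        jobMaster.notifyOfNewLeader("rm-a", UUID.randomUUID());
        jobMaster.notifyOfNewLeader("rm-b", UUID.randomUUID()); // leader change
        System.out.println(jobMaster.connection.targetAddress + " attempts=" + jobMaster.connectAttempts);
    }
}
```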
    

    RegisteredRpcConnection.java

    public void start() {
            checkState(!closed, "The RPC connection is already closed");
            checkState(
                    !isConnected() && pendingRegistration == null,
                    "The RPC connection is already started");
    
            final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration();
    
            if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
                newRegistration.startRegistration();
            } else {
                // concurrent start operation
                newRegistration.cancel();
            }
    }
    
    private RetryingRegistration<F, G, S, R> createNewRegistration() {
            RetryingRegistration<F, G, S, R> newRegistration = checkNotNull(generateRegistration());
    
            CompletableFuture<RetryingRegistration.RetryingRegistrationResult<G, S, R>> future =
                    newRegistration.getFuture();
    
            future.whenCompleteAsync(
                    (RetryingRegistration.RetryingRegistrationResult<G, S, R> result,
                            Throwable failure) -> {
                        if (failure != null) {
                            if (failure instanceof CancellationException) {
                                // we ignore cancellation exceptions because they originate from
                                // cancelling
                                // the RetryingRegistration
                                log.debug(
                                        "Retrying registration towards {} was cancelled.",
                                        targetAddress);
                            } else {
                                // this future should only ever fail if there is a bug, not if the
                                // registration is declined
                                onRegistrationFailure(failure);
                            }
                        } else {
                            if (result.isSuccess()) {
                                targetGateway = result.getGateway();
                                onRegistrationSuccess(result.getSuccess());
                            } else if (result.isRejection()) {
                                onRegistrationRejection(result.getRejection());
                            } else {
                                throw new IllegalArgumentException(
                                        String.format(
                                                "Unknown retrying registration response: %s.", result));
                            }
                        }
                    },
                    executor);
    
            return newRegistration;
    }
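
    start() uses an atomic compare-and-set so that at most one registration is installed; a concurrent second start() cancels the registration it just created. The completion callback then ignores cancellations and dispatches on failure, success and rejection. A condensed sketch using AtomicReference in place of the AtomicReferenceFieldUpdater (REGISTRATION_UPDATER) in the excerpt; Registration and Result are hypothetical types.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of RegisteredRpcConnection.start(): one pending registration at most.
class RegistrationGuardSketch {
    static final class Result {
        final String success;   // non-null on success
        final String rejection; // non-null on rejection

        private Result(String success, String rejection) {
            this.success = success;
            this.rejection = rejection;
        }

        static Result success(String s) { return new Result(s, null); }
        static Result rejection(String r) { return new Result(null, r); }
    }

    static final class Registration {
        final CompletableFuture<Result> future = new CompletableFuture<>();
        volatile boolean started;

        void startRegistration() { started = true; } // would send the first registration attempt
        void cancel() { future.cancel(false); }
    }

    private final AtomicReference<Registration> pendingRegistration = new AtomicReference<>();
    volatile String outcome = "none";

    // Returns true if this call installed and started the registration.
    boolean start() {
        Registration newRegistration = createNewRegistration();
        if (pendingRegistration.compareAndSet(null, newRegistration)) {
            newRegistration.startRegistration();
            return true;
        }
        newRegistration.cancel(); // concurrent start operation
        return false;
    }

    Registration current() { return pendingRegistration.get(); }

    private Registration createNewRegistration() {
        Registration registration = new Registration();
        registration.future.whenComplete((result, failure) -> {
            if (failure instanceof CancellationException) {
                return; // expected, e.g. for the losing concurrent start()
            }
            if (failure != null) {
                outcome = "failure:" + failure;
            } else if (result.success != null) {
                outcome = "success:" + result.success;
            } else {
                outcome = "rejection:" + result.rejection;
            }
        });
        return registration;
    }

    public static void main(String[] args) {
        RegistrationGuardSketch connection = new RegistrationGuardSketch();
        System.out.println(connection.start()); // true
        System.out.println(connection.start()); // false
        connection.current().future.complete(Result.rejection("unknown job"));
        System.out.println(connection.outcome); // rejection:unknown job
    }
}
```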
    

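    generateRegistration() builds the actual registration. The excerpt below comes from TaskExecutorToResourceManagerConnection, the TaskExecutor's connection to the ResourceManager; the JobMaster's ResourceManagerConnection overrides generateRegistration() in the same way, and its registration invokes ResourceManagerGateway.registerJobMaster(...).

    TaskExecutorToResourceManagerConnection.java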

    protected RetryingRegistration<
                        ResourceManagerId,
                        ResourceManagerGateway,
                        TaskExecutorRegistrationSuccess,
                        TaskExecutorRegistrationRejection>
                generateRegistration() {
            return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
                    log,
                    rpcService,
                    getTargetAddress(),
                    getTargetLeaderId(),
                    retryingRegistrationConfiguration,
                    taskExecutorRegistration);
    }
    
    ResourceManagerRegistration(
                    Logger log,
                    RpcService rpcService,
                    String targetAddress,
                    ResourceManagerId resourceManagerId,
                    RetryingRegistrationConfiguration retryingRegistrationConfiguration,
                    TaskExecutorRegistration taskExecutorRegistration) {
    
                super(
                        log,
                        rpcService,
                        "ResourceManager",
                        ResourceManagerGateway.class,
                        targetAddress,
                        resourceManagerId,
                        retryingRegistrationConfiguration);
                this.taskExecutorRegistration = taskExecutorRegistration;
    }
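
    The retryingRegistrationConfiguration passed to the RetryingRegistration super constructor controls how registration attempts are retried. Assuming a per-attempt timeout that doubles after each unanswered attempt up to a configured maximum (the parameter names below are hypothetical, not Flink configuration keys), the schedule and its effect can be sketched as:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a doubling, capped per-attempt timeout for a retrying registration.
class RetryScheduleSketch {
    // Timeouts used for the first `attempts` attempts.
    static List<Long> timeouts(long initialTimeoutMillis, long maxTimeoutMillis, int attempts) {
        List<Long> result = new ArrayList<>();
        long timeout = initialTimeoutMillis;
        for (int i = 0; i < attempts; i++) {
            result.add(timeout);
            timeout = Math.min(2 * timeout, maxTimeoutMillis);
        }
        return result;
    }

    // Number of attempts until one waits long enough for a peer that needs responseMillis to answer,
    // or -1 if maxAttempts is exhausted (the real registration keeps retrying until cancelled).
    static int attemptsUntilSuccess(long responseMillis, long initialTimeoutMillis,
                                    long maxTimeoutMillis, int maxAttempts) {
        List<Long> schedule = timeouts(initialTimeoutMillis, maxTimeoutMillis, maxAttempts);
        for (int i = 0; i < schedule.size(); i++) {
            if (schedule.get(i) >= responseMillis) {
                return i + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(timeouts(100L, 1_000L, 6)); // [100, 200, 400, 800, 1000, 1000]
        System.out.println(attemptsUntilSuccess(350L, 100L, 1_000L, 10)); // 3
    }
}
```

    Growing the timeout lets a slow but healthy ResourceManager eventually answer, while the cap keeps a single attempt from waiting indefinitely.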
    

    The SlotPool requests slots

    Once registration succeeds, onRegistrationSuccess() is called, and the JobMaster goes on to request slots from the ResourceManager.

    JobMaster.java (onRegistrationSuccess lives in its inner class ResourceManagerConnection)

    protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
                runAsync(
                        () -> {
                            // filter out outdated connections
                            //noinspection ObjectEquality
                            if (this == resourceManagerConnection) {
                                establishResourceManagerConnection(success);
                            }
                        });
    }
    
    private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
            final ResourceManagerId resourceManagerId = success.getResourceManagerId();
    
            // verify the response with current connection
            if (resourceManagerConnection != null
                    && Objects.equals(
                            resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {
    
                log.info(
                        "JobManager successfully registered at ResourceManager, leader id: {}.",
                        resourceManagerId);
    
                final ResourceManagerGateway resourceManagerGateway =
                        resourceManagerConnection.getTargetGateway();
    
                final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();
    
                establishedResourceManagerConnection =
                        new EstablishedResourceManagerConnection(
                                resourceManagerGateway, resourceManagerResourceId);
    
                blocklistHandler.registerBlocklistListener(resourceManagerGateway);
                // connect the slot pool to the ResourceManager
                slotPoolService.connectToResourceManager(resourceManagerGateway);
                partitionTracker.connectToResourceManager(resourceManagerGateway);
    
                resourceManagerHeartbeatManager.monitorTarget(
                        resourceManagerResourceId,
                        new ResourceManagerHeartbeatReceiver(resourceManagerGateway));
            } else {
                log.debug(
                        "Ignoring resource manager connection to {} because it's duplicated or outdated.",
                        resourceManagerId);
            }
    }
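
    Two checks protect against stale callbacks: onRegistrationSuccess only proceeds if its connection is still the current resourceManagerConnection (an identity check), and establishResourceManagerConnection additionally compares the leader ID in the response with the connection's target leader ID. A minimal sketch of that double check with hypothetical types:

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical sketch of ignoring outdated or mismatched registration callbacks.
class StaleConnectionSketch {
    final class Connection {
        final UUID targetLeaderId;

        Connection(UUID targetLeaderId) { this.targetLeaderId = targetLeaderId; }

        void onRegistrationSuccess(UUID resourceManagerIdInResponse) {
            // filter out outdated connections (identity comparison on purpose)
            if (this == currentConnection) {
                establishConnection(resourceManagerIdInResponse);
            }
        }
    }

    Connection currentConnection;
    UUID establishedWith; // null until a connection is established

    Connection connect(UUID leaderId) {
        currentConnection = new Connection(leaderId);
        return currentConnection;
    }

    private void establishConnection(UUID resourceManagerId) {
        // verify the response against the current connection's target leader
        if (currentConnection != null
                && Objects.equals(currentConnection.targetLeaderId, resourceManagerId)) {
            establishedWith = resourceManagerId;
        }
        // otherwise the response is duplicated or outdated and is ignored
    }

    public static void main(String[] args) {
        StaleConnectionSketch jobMaster = new StaleConnectionSketch();
        UUID oldLeader = UUID.randomUUID();
        UUID newLeader = UUID.randomUUID();
        Connection oldConnection = jobMaster.connect(oldLeader);
        Connection newConnection = jobMaster.connect(newLeader);
        oldConnection.onRegistrationSuccess(oldLeader); // ignored: outdated connection
        newConnection.onRegistrationSuccess(newLeader);
        System.out.println(newLeader.equals(jobMaster.establishedWith)); // true
    }
}
```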
    

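    DeclarativeSlotPoolService.java
    (The body below follows the older SlotPoolImpl request path, in which pending slot requests are sent to the ResourceManager one by one; in the declarative slot pool, connecting to the ResourceManager instead declares the job's overall resource requirements.)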

    public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
            this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
            // work on all slots waiting for this connection
            for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
                    // request a slot from the ResourceManager
                    requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
            }
            // all sent off
            waitingForResourceManager.clear();
    }
    
    private void requestSlotFromResourceManager(
                final ResourceManagerGateway resourceManagerGateway, final PendingRequest pendingRequest) {
            ... ...
            CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
                    jobMasterId,
                    new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
                    rpcTimeout);
            ... ...
    }
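
    The pattern in connectToResourceManager is: slot requests issued before the ResourceManager is known are parked in waitingForResourceManager and all sent as soon as the gateway arrives, while later requests go out directly. A toy version, where Gateway is a hypothetical stand-in for ResourceManagerGateway and requests are reduced to an allocation ID plus a profile string:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "queue slot requests until the ResourceManager is connected".
class PendingRequestsSketch {
    interface Gateway {
        void requestSlot(String allocationId, String resourceProfile);
    }

    // allocation id -> requested resource profile, kept in request order
    private final Map<String, String> waitingForResourceManager = new LinkedHashMap<>();
    private Gateway resourceManagerGateway; // null until connected

    void requestNewSlot(String allocationId, String resourceProfile) {
        if (resourceManagerGateway == null) {
            waitingForResourceManager.put(allocationId, resourceProfile); // park the request
        } else {
            resourceManagerGateway.requestSlot(allocationId, resourceProfile);
        }
    }

    void connectToResourceManager(Gateway gateway) {
        this.resourceManagerGateway = gateway;
        // work on all requests waiting for this connection
        for (Map.Entry<String, String> pending : waitingForResourceManager.entrySet()) {
            gateway.requestSlot(pending.getKey(), pending.getValue());
        }
        waitingForResourceManager.clear(); // all sent off
    }

    int waitingCount() { return waitingForResourceManager.size(); }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        PendingRequestsSketch slotPool = new PendingRequestsSketch();
        slotPool.requestNewSlot("alloc-1", "1 core");
        slotPool.requestNewSlot("alloc-2", "2 cores");
        slotPool.connectToResourceManager((id, profile) -> sent.add(id + " (" + profile + ")"));
        slotPool.requestNewSlot("alloc-3", "1 core");
        System.out.println(sent); // [alloc-1 (1 core), alloc-2 (2 cores), alloc-3 (1 core)]
    }
}
```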
    

    ResourceManager.java: the request is handled by the SlotManager inside the ResourceManager

    public CompletableFuture<Acknowledge> requestSlot(
                JobMasterId jobMasterId, SlotRequest slotRequest, final Time timeout) {
            ... ...
            try {
                    // the SlotManager handles the slot request
                    slotManager.registerSlotRequest(slotRequest);
            }
            ... ...
    }
    

    SlotManagerImpl.java (the SlotManager implementation used by this older request path)

    public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
            checkInit();
            ... ...
            PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
            pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
            try {
                    internalRequestSlot(pendingSlotRequest);
            }
            ... ...
    }
    
    private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
                throws ResourceManagerException {
            final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
            // use a matching free slot if there is one, otherwise fall back to a pending TaskManager slot
            OptionalConsumer.of(findMatchingSlot(resourceProfile))
                    .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
                    .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
    }
    
    private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest)
                throws ResourceManagerException {
            ... ...
            if (!pendingTaskManagerSlotOptional.isPresent()) {
                    // no pending slot can serve the request: allocate new resources (a new TaskManager)
                    pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
            }
            ... ...
    }
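
    internalRequestSlot first looks for a registered free slot that can satisfy the requested resource profile, then for a pending TaskManager slot, and only then allocates new resources. A sketch of that three-way decision with a hypothetical two-dimensional profile (CPU cores and memory in MB) and first-match selection; Flink's ResourceProfile has more dimensions and its matching strategy is configurable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: use a matching free slot, else a pending slot, else request a new TaskManager.
class SlotMatchingSketch {
    static final class Profile {
        final double cpuCores;
        final int memoryMb;

        Profile(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }

        // A slot can serve a request if it offers at least the requested resources.
        boolean canServe(Profile required) {
            return cpuCores >= required.cpuCores && memoryMb >= required.memoryMb;
        }
    }

    final List<Profile> freeSlots = new ArrayList<>();
    final List<Profile> pendingSlots = new ArrayList<>();
    int newTaskManagersRequested;

    static Optional<Profile> findMatching(List<Profile> slots, Profile required) {
        return slots.stream().filter(s -> s.canServe(required)).findFirst();
    }

    // Returns where the request was placed: "free", "pending" or "new".
    String internalRequestSlot(Profile required) {
        Optional<Profile> free = findMatching(freeSlots, required);
        if (free.isPresent()) {
            freeSlots.remove(free.get()); // allocateSlot(...)
            return "free";
        }
        Optional<Profile> pending = findMatching(pendingSlots, required);
        if (pending.isPresent()) {
            pendingSlots.remove(pending.get()); // assign to a pending TaskManager slot
            return "pending";
        }
        newTaskManagersRequested++; // allocateResource(...): ask for a new TaskManager
        return "new";
    }

    public static void main(String[] args) {
        SlotMatchingSketch slotManager = new SlotMatchingSketch();
        slotManager.freeSlots.add(new Profile(1.0, 1024));
        System.out.println(slotManager.internalRequestSlot(new Profile(1.0, 512))); // free
        System.out.println(slotManager.internalRequestSlot(new Profile(1.0, 512))); // new
    }
}
```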
    
  • Original article: https://blog.csdn.net/wwb44444/article/details/127722769