• 【Flink源码】JobManager启动流程


    写在前面

    【Flink源码】再谈 Flink 程序提交流程(中) 一文中,笔者后来发现谬误颇多,且随着 Flink 版本的更迭,部分方法实现方式已发生较大改变。因此,思虑再三决定针对 JobManager 相关源码根据最新的 Flink 版本(1.17)单独成文。


    JobManager 是什么?

    Flink 的主节点 JobManager 是一个逻辑上的主节点,针对不同的部署模式,主节点的实现类也不一样
    JobManager 有三大核心内容:ResourceManager、Dispatcher 和 WebMonitorEndpoint

    • ResourceManager:Flink集群的资源管理器,只有一个,负责 Slot 的管理和申请等工作
    • Dispatcher
      • 负责接收用户提交 JobGraph,然后启动一个 JobMaster,类似于 Yarn 中的 AppMaster 和 Spark 中的 Driver
      • 内有一个持久服务:JobGraphStore,负责存储 JobGraph。当构建执行图或物理执行图时主节点宕机并恢复,则可以从这里重新拉取作业 JobGraph
    • WebMonitorEndpoint:Rest 服务,内部有一个 Netty 服务,客户端的所有请求都由该组件接收处理

    当 Client 提交一个 Job 到集群时(Client 会把 Job 构建成一个 JobGraph),主节点接收到提交的 Job 的 Rest 请求后,WebMonitorEndpoint 会通过 Router 进行解析找到对应的 Handler 来执行处理,处理完毕后交由 Dispatcher,Dispatcher 负责搭起 JobMaster 来负责这个 Job 内部的 Task 的部署执行,执行 Task 所需的资源由 JobMaster 向 ResourceManager 申请

    JobManager 启动源码

    JobManager 启动流程

    JobManager 的启动流程分为三个部分:

    • 初始化 8 个基础服务
    • 创建工厂实例
    • 通过不同的工厂实例创建三大核心组件 ResourceManager、Dispatcher、WebMonitorEndpoint

    主节点准备工作

    我们以 Standalone 模式为例,下同
    找到主节点启动类 StandaloneSessionClusterEntrypoint

    StandaloneSessionClusterEntrypoint.java

    public static void main(String[] args) {
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(
                LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);
        // 解析 flink run 命令的参数
        final EntrypointClusterConfiguration entrypointClusterConfiguration =
                ClusterEntrypointUtils.parseParametersOrExit(
                        args,
                        new EntrypointClusterConfigurationParserFactory(),
                        StandaloneSessionClusterEntrypoint.class);
        // 解析 flink-conf.yaml 配置文件
        Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
        // 创建主节点
        StandaloneSessionClusterEntrypoint entrypoint =
                new StandaloneSessionClusterEntrypoint(configuration);
        // 启动主节点
        ClusterEntrypoint.runClusterEntrypoint(entrypoint);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在这个入口类主要做了四件事:

    1. 解析提交作业命令的参数
    2. 解析 flink-conf.yaml 配置文件
    3. 创建主节点
    4. 启动主节点

    首先来看解析 flink-conf.yaml 的过程

    public static Configuration loadConfiguration(
            final String configDir, @Nullable final Configuration dynamicProperties) {
    
        if (configDir == null) {
            throw new IllegalArgumentException(
                    "Given configuration directory is null, cannot load configuration");
        }
    
        final File confDirFile = new File(configDir);
        if (!(confDirFile.exists())) {
            throw new IllegalConfigurationException(
                    "The given configuration directory name '"
                            + configDir
                            + "' ("
                            + confDirFile.getAbsolutePath()
                            + ") does not describe an existing directory.");
        }
    
        // get Flink yaml configuration file
        // TODO 读取flink-conf.yaml文件
        final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
    
        // 文件不存在则报错
        if (!yamlConfigFile.exists()) {
            throw new IllegalConfigurationException(
                    "The Flink config file '"
                            + yamlConfigFile
                            + "' ("
                            + yamlConfigFile.getAbsolutePath()
                            + ") does not exist.");
        }
        // TODO 解析flink-conf.yaml文件
        Configuration configuration = loadYAMLResource(yamlConfigFile);
    
        if (dynamicProperties != null) {
            configuration.addAll(dynamicProperties);
        }
    
        return configuration;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    首先根据 conf 路径将文件读进来,再通过 loadYAMLResource() 方法解析文件中的配置,并将 configuration 返回出去

    主节点启动过程

    ClusterEntrypoint.java

    public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
    
        final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
        try {
            // 启动主类
            clusterEntrypoint.startCluster();
        } catch (ClusterEntrypointException e) {
            LOG.error(
                    String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
                    e);
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }
    
        int returnCode;
        Throwable throwable = null;
    
        try {
            returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
        } catch (Throwable e) {
            throwable = ExceptionUtils.stripExecutionException(e);
            returnCode = RUNTIME_FAILURE_RETURN_CODE;
        }
    
        LOG.info(
                "Terminating cluster entrypoint process {} with exit code {}.",
                clusterEntrypointName,
                returnCode,
                throwable);
        System.exit(returnCode);
    }
    
    public void startCluster() throws ClusterEntrypointException {
        LOG.info("Starting {}.", getClass().getSimpleName());
    
        try {
            FlinkSecurityManager.setFromConfiguration(configuration);
            PluginManager pluginManager =
                    PluginUtils.createPluginManagerFromRootFolder(configuration);
            configureFileSystems(configuration, pluginManager);
    
            SecurityContext securityContext = installSecurityContext(configuration);
    
            ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
            securityContext.runSecured(
                    (Callable<Void>)
                            () -> {
                                runCluster(configuration, pluginManager);
    
                                return null;
                            });
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
    
            try {
                // clean up any partial state
                shutDownAsync(
                                ApplicationStatus.FAILED,
                                ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                ExceptionUtils.stringifyException(strippedThrowable),
                                false)
                        .get(
                                INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),
                                TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                strippedThrowable.addSuppressed(e);
            }
    
            throw new ClusterEntrypointException(
                    String.format(
                            "Failed to initialize the cluster entrypoint %s.",
                            getClass().getSimpleName()),
                    strippedThrowable);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    我们继续进入 runCluster 方法
    该方法是主节点启动的核心方法,主要做了三件事:

    1. 初始化主节点对外提供服务的时候所需要的三大核心组件启动时所需的基础服务
    2. 初始化一个 DispatcherResourceManagerComponentFactory 工厂实例,内部初始化了三大核心组件的工厂实例
    3. 根据工厂类和基础环境,创建三大核心组件

    首先来看初始化八大基础服务

    protected void initializeServices(Configuration configuration, PluginManager pluginManager)
            throws Exception {
    
        LOG.info("Initializing cluster services.");
    
        synchronized (lock) {
            resourceId =
                    configuration
                            .getOptional(JobManagerOptions.JOB_MANAGER_RESOURCE_ID)
                            .map(
                                    value ->
                                            DeterminismEnvelope.deterministicValue(
                                                    new ResourceID(value)))
                            .orElseGet(
                                    () ->
                                            DeterminismEnvelope.nondeterministicValue(
                                                    ResourceID.generate()));
    
            LOG.debug(
                    "Initialize cluster entrypoint {} with resource id {}.",
                    getClass().getSimpleName(),
                    resourceId);
    
            workingDirectory =
                    ClusterEntrypointUtils.createJobManagerWorkingDirectory(
                            configuration, resourceId);
    
            LOG.info("Using working directory: {}.", workingDirectory);
    
            rpcSystem = RpcSystem.load(configuration);
            // 初始化和启动 AkkaRpcService,内部包装了 ActorSystem
            // 创建一个 AkkaRpc 服务,基于 Akka 的 RpcService 实现
            // commonRpcService 是一个基于 Akka 的 ActorSystem,其实就是一个 TCP 的 RPC 服务,端口:6123
            commonRpcService =
                    RpcUtils.createRemoteRpcService(
                            rpcSystem,
                            configuration,
                            configuration.getString(JobManagerOptions.ADDRESS),
                            getRPCPortRange(configuration),
                            configuration.getString(JobManagerOptions.BIND_HOST),
                            configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));
            // 启动一个 JMXService,用于客户端连接 JobManager,JVM 监控
            JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
    
            // update the configuration used to create the high availability services
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
            
            // 初始化 IO 线程池,大小为当前节点 CPU 核心数 * 4
            ioExecutor =
                    Executors.newFixedThreadPool(
                            ClusterEntrypointUtils.getPoolSize(configuration),
                            new ExecutorThreadFactory("cluster-io"));
            // 初始化一个基于 Zookeeper 的 HA 服务:ZookeeperHaServices
            haServices = createHaServices(configuration, ioExecutor, rpcSystem);
            // 初始化大文件存储 BlobServer 服务端
            // 所谓大文件例如上传 Flink-job 的 jar 时所依赖的一些需要一起上传的 jar,或者 TaskManager 上传的 log 文件等
            blobServer =
                    BlobUtils.createBlobServer(
                            configuration,
                            Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
                            haServices.createBlobStore());
            blobServer.start();
            configuration.setString(BlobServerOptions.PORT, String.valueOf(blobServer.getPort()));
            // 心跳服务
            heartbeatServices = createHeartbeatServices(configuration);
            delegationTokenManager =
                    KerberosDelegationTokenManagerFactory.create(
                            getClass().getClassLoader(),
                            configuration,
                            commonRpcService.getScheduledExecutor(),
                            ioExecutor);
            // 启动 Metric(性能监控)相关服务,内部也是启动一个 ActorSystem
            metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);
    
            final RpcService metricQueryServiceRpcService =
                    MetricUtils.startRemoteMetricsRpcService(
                            configuration,
                            commonRpcService.getAddress(),
                            configuration.getString(JobManagerOptions.BIND_HOST),
                            rpcSystem);
            metricRegistry.startQueryService(metricQueryServiceRpcService, null);
    
            final String hostname = RpcUtils.getHostname(commonRpcService);
    
            processMetricGroup =
                    MetricUtils.instantiateProcessMetricGroup(
                            metricRegistry,
                            hostname,
                            ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                    configuration));
            // 初始化一个用来存储 ExecutionGraph 的 Store,实现是 FileArchivedExecutionGraphStore
            // JobGraphStore 会在 Dispatcher 启动时启动
            executionGraphInfoStore =
                    createSerializableExecutionGraphStore(
                            configuration, commonRpcService.getScheduledExecutor());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98

    初始化的服务:

    1. commonRPCService:基于 Akka 的 RpcService 实现。内部包装了 ActorSystem,这个服务其实就是一个 TCP 的 RPC 服务,端口为 6123
    2. JMXService:启动一个 JMXService,用于客户端连接 JobManager JVM 监控
    3. IOExecutor:启动一个线程池,大小为当前节点 CPU 核心数 * 4
    4. haServices:初始化一个基于 Zookeeper 的 HA 服务 ZookeeperHaServices,提供对高可用性的所有服务的访问注册,分布式计数器和领导人选举
    5. BlobServer:初始化大文件存储 BlobServer 服务端,所谓大文件例如上传 Flink-job 的 jar 时所依赖的一些需要一起上传的 jar,或者 TaskManager 上传的 log 文件等
    6. heartbeatServices:提供心跳所需的所有服务,包括创建心跳接收器和心跳发送者
    7. metricRegistry:启动 Metric(性能监控)相关服务,内部也是启动一个 ActorSystem,跟踪所有已注册的 Metric,作为连接 MetricGroup 和 MetricReporter
    8. archivedExecutionGraphStore:存储执行图 ExecutionGraph 的可序列化形式。注意此处不是 JobGraphStore,JobGraphStore 会在 Dispatcher 启动时启动

    接下来创建核心工厂类

    找到 StandaloneSessionClusterEntrypoint 类

    StandaloneSessionClusterEntrypoint.java

    protected DefaultDispatcherResourceManagerComponentFactory
            createDispatcherResourceManagerComponentFactory(Configuration configuration) {
        // 创建第一个工厂 StandaloneResourceManagerFactory
        return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
                StandaloneResourceManagerFactory.getInstance());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    进入 createSessionComponentFactory 方法

    DefaultDispatcherResourceManagerComponentFactory.java

    public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(
            ResourceManagerFactory<?> resourceManagerFactory) {
        // 构建工厂
        return new DefaultDispatcherResourceManagerComponentFactory(
                // 第二个工厂
                DefaultDispatcherRunnerFactory.createSessionRunner(
                        SessionDispatcherFactory.INSTANCE),
                // 第一个工厂
                resourceManagerFactory,
                // 第三个工厂
                SessionRestEndpointFactory.INSTANCE);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    可见,主节点一共创建了三个核心组件的工厂实例:

    1. 生产 DefaultDispatcherRunner
    2. 生产 StandaloneResourceManager
    3. 生产 DispatcherRestEndpoint

    接下来通过工厂实例创建 ResourceManager、DispatcherRunner、WebMonitorEndpoint

    1. DispatcherRunner,实现是:DefaultDispatcherRunner
    2. ResourceManager,实现是:StandaloneResourceManager
    3. WebMonitorEndpoint,实现是:DispatcherRestEndpoint

    我们从 dispatcherResourceManagerComponentFactory.create 开始看

    第一步:首先初始化一些监控服务

    DefaultDispatcherResourceManagerComponentFactory.java

    // 监控 Dispatcher
    dispatcherLeaderRetrievalService =
            highAvailabilityServices.getDispatcherLeaderRetriever();
    // 监控 ResourceManager
    resourceManagerRetrievalService =
            highAvailabilityServices.getResourceManagerLeaderRetriever();
    // ResourceManager 的 GatewayRetriever
    final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
            new RpcGatewayRetriever<>(
                    rpcService,
                    DispatcherGateway.class,
                    DispatcherId::fromUuid,
                    new ExponentialBackoffRetryStrategy(
                            12, Duration.ofMillis(10), Duration.ofMillis(50)));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    第二步:构建一个线程池用于执行 WebMonitorEndpoint 所接收到的 client 发送过来的请求

    // 创建线程池用于执行 WebMonitorEndpoint 所接收到的 client 发送过来的请求
    final ScheduledExecutorService executor =
            WebMonitorEndpoint.createExecutorService(
                    configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                    configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                    "DispatcherRestEndpoint");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    第三步:初始化 MetricFetcher,刷新间隔 10s

    // 初始化 MetricFetcher,刷新间隔 10s
    final long updateInterval =
            configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
    final MetricFetcher metricFetcher =
            updateInterval == 0
                    ? VoidMetricFetcher.INSTANCE
                    : MetricFetcherImpl.fromConfiguration(
                            configuration,
                            metricQueryServiceRetriever,
                            dispatcherGatewayRetriever,
                            executor);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    第四步:创建 WebMonitorEndpoint 实例,并启动,在Standalone 模式下为:DispatcherRestEndpoint 该实例内部会启动一个 Netty 服务端,绑定了一堆 Handler

    // 创建 WebMonitorEndpoint 实例,在 Standalone 模式下为:DispatcherRestEndpoint
    // 该实例内部会启动一个 Netty 服务端,绑定了一堆 Handler
    webMonitorEndpoint =
            restEndpointFactory.createRestEndpoint(
                    configuration,
                    dispatcherGatewayRetriever,
                    resourceManagerGatewayRetriever,
                    blobServer,
                    executor,
                    metricFetcher,
                    highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                    fatalErrorHandler);
    // 启动 WebMonitorEndpoint
    log.debug("Starting Dispatcher REST endpoint.");
    webMonitorEndpoint.start();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    第五步:创建 ResourceManager 对象

    1. ResourceManager 是一个 RpcEndpoint(Actor),当构建好对象后启动时会触发 onStart(Actor 的 perStart 生命周期方法)方法
    2. ResourceManager 也是一个 LeaderContender,也会执行竞选,会执行竞选结果方法
    3. ResourceManagerService 具有两个心跳服务和两个定时服务:
      • 两个心跳服务:从节点和主节点之间的心跳,Job 的主控程序和主节点之间的心跳
      • 两个定时服务:TaskManager 的超时检查服务 Slot 申请的超时检查服务
    // 创建 ResourceManager 对象
    resourceManagerService =
            ResourceManagerServiceImpl.create(
                    resourceManagerFactory,
                    configuration,
                    resourceId,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    delegationTokenManager,
                    fatalErrorHandler,
                    new ClusterInformation(hostname, blobServer.getPort()),
                    webMonitorEndpoint.getRestBaseUrl(),
                    metricRegistry,
                    hostname,
                    ioExecutor);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    第六步:构建了一个 DispatcherRunner,注意不是 Dispatcher,Dispatcher 的构建和启动时再 DispatcherRunner 内部实现的

    // 创建 dispatcherRunner 对象并启动
    log.debug("Starting Dispatcher.");
    dispatcherRunner =
            dispatcherRunnerFactory.createDispatcherRunner(
                    highAvailabilityServices.getDispatcherLeaderElectionService(),
                    fatalErrorHandler,
                    new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                    ioExecutor,
                    rpcService,
                    partialDispatcherServices);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    第七步:启动 ResourceManager

    // 启动 ResourceManager
    log.debug("Starting ResourceManagerService.");
    resourceManagerService.start();
    
    • 1
    • 2
    • 3

    至此,JobManager 启动完毕

    关于 ResourceManager、WebMonitorEndpoint、Dispatcher 的启动流程留待后文讨论

  • 相关阅读:
    软考高级系统架构师冲关预测
    118、不要“教”用户做事,应该让他秒懂秒会
    你以为键入网址后只是等待吗?惊!原来网页显示背后隐藏着这些奇妙步骤(终章)
    车道线检测(二)——使用MNN部署PINet
    LeetCode347. 前 K 个高频元素
    Java项目:JSP酒店管理系统
    1087 有多少不同的值
    山东专升本计算机考试要求
    KubernetesNode节点配置
    cmake交叉编译时链接到x86库的问题
  • 原文地址:https://blog.csdn.net/wwb44444/article/details/127722819