• Flink1.15源码解析--启动TaskManager


    一、前言

    TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流

    必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子

    二、TaskManagerRunner

    taskmanager.sh 脚本我们 知道 taskmanager的启动类是 org.apache.flink.runtime.taskexecutor.TaskManagerRunner

    入口 main(), 前几行是检验参数,日志记录,然后调用 runTaskManagerProcessSecurely

        public static void main(String[] args) throws Exception {
            // startup checks and logging
            EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
            SignalHandler.register(LOG);
            JvmShutdownSafeguard.installAsShutdownHook(LOG);
    
            long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
    
            if (maxOpenFileHandles != -1L) {
                LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
            } else {
                LOG.info("Cannot determine the maximum number of open file descriptors");
            }
    
            runTaskManagerProcessSecurely(args);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    接下来我们查看 runTaskManagerProcessSecurely 方法, 就是加载 flink 的配置文件 flink-config.yaml.

        public static void runTaskManagerProcessSecurely(String[] args) {
            Configuration configuration = null;
    
            try {
                configuration = loadConfiguration(args);
            } catch (FlinkParseException fpe) {
                LOG.error("Could not load the configuration.", fpe);
                System.exit(FAILURE_EXIT_CODE);
            }
    		
            runTaskManagerProcessSecurely(checkNotNull(configuration));
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    然后将解析完的配置传入 runTaskManagerProcessSecurely

        public static void runTaskManagerProcessSecurely(Configuration configuration) {
            // 加载配置
            FlinkSecurityManager.setFromConfiguration(configuration);
    
            // 启动插件管理器
            final PluginManager pluginManager =
                    PluginUtils.createPluginManagerFromRootFolder(configuration);
            FileSystem.initialize(configuration, pluginManager);
    
            StateChangelogStorageLoader.initialize(pluginManager);
    
            int exitCode;
            Throwable throwable = null;
    
            ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
            try {
                // 这个地方我们见了很多次,组件启动,都是通过 SecurityUtils
                // 安装 安全模块
                SecurityUtils.install(new SecurityConfiguration(configuration));
    
                // 通过安全上下文 启动 taskManager
                exitCode =
                        SecurityUtils.getInstalledContext()
                                .runSecured(() -> runTaskManager(configuration, pluginManager));
            } catch (Throwable t) {
                throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
                exitCode = FAILURE_EXIT_CODE;
            }
    
            if (throwable != null) {
                LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);
            } else {
                LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);
            }
    
            System.exit(exitCode);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    runTaskManager 通过名字我们不难看出,离TaskManager的构建越来越近了。我们点进runTaskManager方法:

    
        public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
                throws Exception {
            final TaskManagerRunner taskManagerRunner;
            try {
                // 创建 TaskManagerRunner
                taskManagerRunner =
                        new TaskManagerRunner(
                                configuration,
                                pluginManager,
                                TaskManagerRunner::createTaskExecutorService); // 创建 TaskManagerRunner 相关服务
                // 启动 TaskManagerRunner
                taskManagerRunner.start();
            } catch (Exception exception) {
                throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
            }
    
            // 返回结果码
            try {
                return taskManagerRunner.getTerminationFuture().get().getExitCode();
            } catch (Throwable t) {
                throw new FlinkException(
                        "Unexpected failure during runtime of TaskManagerRunner.",
                        ExceptionUtils.stripExecutionException(t));
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    在这个方法里做了两件事:

    • 1、构建了一个TaskManagerRunner

    • 2、启动TaskManagerRunner

      • 基础服务初始化

    实际上,TaskManager启动的所有准备工作,都是在这个TaskManagerRunner中完成的。

    2.1、创建 TaskManagerRunner

                // 创建 TaskManagerRunner
                taskManagerRunner =
                        new TaskManagerRunner(
                                configuration,
                                pluginManager,
                                TaskManagerRunner::createTaskExecutorService); // 创建 TaskManagerRunner 相关服务
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.1.1、创建 TaskExecutorService, 用于创建 TaskExecutor

    TaskManagerRunner::createTaskExecutorService
    
    
       public static TaskExecutorService createTaskExecutorService(
                Configuration configuration,
                ResourceID resourceID,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                MetricRegistry metricRegistry,
                BlobCacheService blobCacheService,
                boolean localCommunicationOnly,
                ExternalResourceInfoProvider externalResourceInfoProvider,
                WorkingDirectory workingDirectory,
                FatalErrorHandler fatalErrorHandler)
                throws Exception {
    		// 构建  TaskExecutor
            final TaskExecutor taskExecutor =
                    startTaskManager(
                            configuration,
                            resourceID,
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            metricRegistry,
                            blobCacheService,
                            localCommunicationOnly,
                            externalResourceInfoProvider,
                            workingDirectory,
                            fatalErrorHandler);
    
            return TaskExecutorToServiceAdapter.createFor(taskExecutor);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    2.2、启动 TaskManagerRunner

        public void start() throws Exception {
            synchronized (lock) {
            	// 
                startTaskManagerRunnerServices();
                taskExecutorService.start();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.2.1、基础服务的初始化, 构建 TaskExecutorService

     private void startTaskManagerRunnerServices() throws Exception {
            synchronized (lock) {
                rpcSystem = RpcSystem.load(configuration);
    
                //  TaskManager 内部线程池,用来处理从节点内部各个组件的Io的线程池
                this.executor =
                        Executors.newScheduledThreadPool(
                                Hardware.getNumberCPUCores(), // 线程池大小为当前节点的cpu核心数
                                new ExecutorThreadFactory("taskmanager-future"));
    
                // 高可用服务
                highAvailabilityServices =
                        HighAvailabilityServicesUtils.createHighAvailabilityServices(
                                configuration,
                                executor,
                                AddressResolution.NO_ADDRESS_RESOLUTION,
                                rpcSystem,
                                this);
    
                // flink1.12 引入新功能 JMX服务,提供监控信息
                JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
    
                // 启动RPC服务,内部为Akka模型的ActorSystem
                rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);
    
                // 为TaskManager生成了一个ResourceID
                this.resourceId =
                        getTaskManagerResourceID(
                                configuration, rpcService.getAddress(), rpcService.getPort());
    
                this.workingDirectory =
                        ClusterEntrypointUtils.createTaskManagerWorkingDirectory(
                                configuration, resourceId);
    
                LOG.info("Using working directory: {}", workingDirectory);
    
                // 初始化心跳服务,主要是初始化心跳间隔和心跳超时参数配置
                HeartbeatServices heartbeatServices =
                        HeartbeatServices.fromConfiguration(configuration);
    
                // 启动Metric(性能监控) 相关服务
                metricRegistry =
                        new MetricRegistryImpl(
                                MetricRegistryConfiguration.fromConfiguration(
                                        configuration,
                                        rpcSystem.getMaximumMessageSizeInBytes(configuration)),
                                ReporterSetup.fromConfiguration(configuration, pluginManager));
    
                final RpcService metricQueryServiceRpcService =
                        MetricUtils.startRemoteMetricsRpcService(
                                configuration,
                                rpcService.getAddress(),
                                configuration.getString(TaskManagerOptions.BIND_HOST),
                                rpcSystem);
                metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap());
    
                // 在主节点启动的时候,事实上已经启动了有个BolbServer,
                // 从节点启动的时候,会启动一个BlobCacheService,做文件缓存的服务
                blobCacheService =
                        BlobUtils.createBlobCacheService(
                                configuration,
                                Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
                                highAvailabilityServices.createBlobStore(),
                                null);
    
                final ExternalResourceInfoProvider externalResourceInfoProvider =
                        ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
                                configuration, pluginManager);
    
                //  创建得到一个TaskExecutorService,内部封装了TaskExecutor,同时TaskExecutor的构建也在内部完成
                taskExecutorService =
                        taskExecutorServiceFactory.createTaskExecutor(
                                this.configuration,
                                this.resourceId.unwrap(),
                                rpcService,
                                highAvailabilityServices,
                                heartbeatServices,
                                metricRegistry,
                                blobCacheService,
                                false,
                                externalResourceInfoProvider,
                                workingDirectory.unwrap(),
                                this);
    
                handleUnexpectedTaskExecutorServiceTermination();
    
                MemoryLogger.startIfConfigured(
                        LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    这里所做的工作和JobManager启动时一样,是一些基础服务的构建和启动,在这里一共做了以下这些工作:

    • 1、初始化了一个TaskManager内部线程池,用来处理从节点内部各个组件的IO,该线程池的大小为当前节点CPU的核心数。

    • 2、构建了一个高可用服务。

    • 3、初始化JMX服务,用于提供监控信息。

    • 4、启动RPC服务,内部为Akka模型的ActorSystem(点此查看Flink 1.13 源码解析前导——Akka通信模型)

    • 4、为TaskManager生成了一个ResourceID。

    • 5、初始化心跳服务,根据配置文件获取心跳间隔时间参数以及心跳超时参数

    • 6、初始化metric服务

    • 7、启动BlobCacheService服务,做文件缓存的服务。

    • 8、构建了一个TaskExecutorService,内部封装了TaskExecutor。

    2.2.1.1、BlobCacheService的初始化

    在这个构造方法里,主要做了两件事:

    • 1、初始化了一个持久化Blob缓存服务

    • 2、初始化了一个临时Blob缓存服务

    在这两个服务的内部,都会在启动的时候启动一个定时服务,就是将过期的某个Job的对应资源都删除掉。

        public BlobCacheService(
                final Configuration blobClientConfig,
                final Reference<File> storageDir,
                final BlobView blobView,
                @Nullable final InetSocketAddress serverAddress)
                throws IOException {
    
            this(
                    // 持久化Blob缓存服务
                    new PermanentBlobCache(blobClientConfig, storageDir, blobView, serverAddress),
                    // 临时Blob缓存服务
                    new TransientBlobCache(blobClientConfig, storageDir, serverAddress));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    以 PermanentBlobCache 为例

        @VisibleForTesting
        public PermanentBlobCache(
                final Configuration blobClientConfig,
                final Reference<File> storageDir,
                final BlobView blobView,
                @Nullable final InetSocketAddress serverAddress,
                BlobCacheSizeTracker blobCacheSizeTracker)
                throws IOException {
            super(
                    blobClientConfig,
                    storageDir,
                    blobView,
                    LoggerFactory.getLogger(PermanentBlobCache.class),
                    serverAddress);
    
            // Initializing the clean up task
            this.cleanupTimer = new Timer(true);
    
            this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
            // TODO 启动定时任务
            this.cleanupTimer.schedule(
                    new PermanentBlobCleanupTask(), // 任务
                    cleanupInterval, cleanupInterval);
            // TODO 为永久BLOB文件提供缓存,包括每个作业的引用计数和分阶段清理。
            this.blobCacheSizeTracker = blobCacheSizeTracker;
    
            registerDetectedJobs();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    我们可以看到有以下操作:

    • 1、首先在方法里通过引用计数的方式,获取所有job引用的资源文件。

    • 2、遍历这些文件,并判断是否过期。

    • 3、如果过期则删除该资源文件夹。

    在临时缓存blob服务中也是一样的工作

    接下来到了重要环节

    2.3、TaskExecutor的初始化

    // org.apache.flink.runtime.taskexecutor.TaskManagerRunner#createTaskExecutorService
    
        public static TaskExecutorService createTaskExecutorService(
                Configuration configuration,
                ResourceID resourceID,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                MetricRegistry metricRegistry,
                BlobCacheService blobCacheService,
                boolean localCommunicationOnly,
                ExternalResourceInfoProvider externalResourceInfoProvider,
                WorkingDirectory workingDirectory,
                FatalErrorHandler fatalErrorHandler)
                throws Exception {
    
            final TaskExecutor taskExecutor =
                    startTaskManager(
                            configuration,
                            resourceID,
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            metricRegistry,
                            blobCacheService,
                            localCommunicationOnly,
                            externalResourceInfoProvider,
                            workingDirectory,
                            fatalErrorHandler);
            /*
             TODO 封装了一下TaskExecutor
              TaskExecutor是TaskExecutorToServiceAdapter的成员变量
              TaskExecutorToServiceAdapter是TaskManagerRunner的成员变量
             */
            return TaskExecutorToServiceAdapter.createFor(taskExecutor);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    可以看到在这里真正初始化了一个TaskExecutor,并将TaskExecutor封装了一下,我们首先来看TaskExecutor的初始化,我们进入startTaskManager方法:

    在该方法内部依然是初始化了一些基础服务:

    返回Flink1.15源码解析-总目录

  • 相关阅读:
    Tomcat动静分离
    DL-31/6电流继电器
    CleanMyMac X4.10.7版本更新
    【数论】莫比乌斯反演
    在macOS中搭建.NET MAUI开发环境
    腾讯不被看好?Prosus宣布减持股份,中国科技公司路在何方?
    【GlobalMapper精品教程】030:栅格重采样案例教程(航测DSM)
    【数智化人物展】白鲸开源CEO郭炜:大模型助力企业大数据治理“数智化”升级...
    2309C++nlohmann数格示例2
    居民健康监测小程序|基于微信小程序的居民健康监测小程序设计与实现(源码+数据库+文档)
  • 原文地址:https://blog.csdn.net/wuxintdrh/article/details/127910339