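Click here for the table of contents of the Flink 1.13 Source Code Analysis series
Related chapter: Flink 1.13 Source Code Analysis: Startup Script Analysis
Related chapter: Flink 1.13 Source Code Analysis Prelude: The Akka Communication Model
Related chapter: Flink 1.13 Source Code Analysis: JobManager Startup, ResourceManager Startup
Related chapter: Flink 1.13 Source Code Analysis: TaskManager Startup Overview
Related chapter: Flink 1.13 Source Code Analysis: TaskManager Startup, Registration Interaction with the ResourceManager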
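In the previous chapters we analyzed how Flink's master node (JobManager) starts up. Over the next few chapters we will work through the source code to see how a Flink worker node starts. Many steps of TaskManager startup are the same as on the master node. The worker has far fewer components than the master, but its startup steps are considerably more involved. In this chapter we first look at the TaskManager initialization process.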
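The earlier chapter on Flink's startup scripts (see Flink 1.13 Source Code Analysis: Startup Script Analysis) showed that in standalone mode the entry class of a Flink worker is org.apache.flink.runtime.taskexecutor.TaskManagerRunner. So let's go straight to this class's main method: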
```java
// --------------------------------------------------------------------------------------------
// Static entry point
// --------------------------------------------------------------------------------------------

public static void main(String[] args) throws Exception {
    // startup checks and logging
    EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();

    if (maxOpenFileHandles != -1L) {
        LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
    } else {
        LOG.info("Cannot determine the maximum number of open file descriptors");
    }

    // TODO start
    runTaskManagerProcessSecurely(args);
}
```
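The first few lines of main perform startup checks and logging: they log environment information, register a signal handler, install a JVM shutdown safeguard, and check the open file descriptor limit. Let's go straight to the runTaskManagerProcessSecurely method: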
```java
public static void runTaskManagerProcessSecurely(String[] args) {
    Configuration configuration = null;

    try {
        // TODO parse args and the flink-conf.yaml file to obtain the configuration
        configuration = loadConfiguration(args);
    } catch (FlinkParseException fpe) {
        LOG.error("Could not load the configuration.", fpe);
        System.exit(FAILURE_EXIT_CODE);
    }

    // TODO start
    runTaskManagerProcessSecurely(checkNotNull(configuration));
}
```
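Here is a minimal, hypothetical sketch of what "parse args and flink-conf.yaml" amounts to. File entries are read as flat `key: value` lines, and `-Dkey=value` arguments override them. `ConfigLoaderSketch`, its methods, and the simplified parsing rules are assumptions for illustration only. Flink's real loadConfiguration uses its own command-line parser and configuration classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of merging flink-conf.yaml entries with -D overrides. */
final class ConfigLoaderSketch {
    private ConfigLoaderSketch() {}

    /** Parses flat "key: value" lines; blank lines and '#' comment lines are skipped. */
    static Map<String, String> parseYamlLines(List<String> lines) {
        Map<String, String> conf = new HashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int sep = line.indexOf(": ");
            if (sep <= 0) {
                continue; // not a "key: value" pair; ignored in this sketch
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
        }
        return conf;
    }

    /** Loads the file entries first, then applies "-Dkey=value" arguments on top. */
    static Map<String, String> load(List<String> yamlLines, String[] args) {
        Map<String, String> conf = parseYamlLines(yamlLines);
        for (String arg : args) {
            if (arg.startsWith("-D")) {
                int eq = arg.indexOf('=');
                if (eq > 2) { // require a non-empty key
                    conf.put(arg.substring(2, eq), arg.substring(eq + 1));
                }
            }
        }
        return conf;
    }
}
```

The override order (file first, command line last) is the point of the sketch. A value passed with `-D` wins over the same key in the file.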
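This is the familiar pattern again: parse the configuration from the command-line arguments and the flink-conf.yaml file, then pass the result to the overloaded runTaskManagerProcessSecurely method. Let's step into it: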
```java
public static void runTaskManagerProcessSecurely(Configuration configuration) {
    FlinkSecurityManager.setFromConfiguration(configuration);
    // TODO start the plugin manager
    final PluginManager pluginManager =
            PluginUtils.createPluginManagerFromRootFolder(configuration);
    FileSystem.initialize(configuration, pluginManager);

    int exitCode;
    Throwable throwable = null;

    try {
        SecurityUtils.install(new SecurityConfiguration(configuration));

        exitCode =
                SecurityUtils.getInstalledContext()
                        // TODO start the TaskManager
                        .runSecured(() -> runTaskManager(configuration, pluginManager));
    } catch (Throwable t) {
        throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        exitCode = FAILURE_EXIT_CODE;
    }

    if (throwable != null) {
        LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);
    } else {
        LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);
    }

    System.exit(exitCode);
}
```
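Stripped of security and file-system setup, runTaskManagerProcessSecurely follows a simple pattern: run the TaskManager, turn any Throwable into a failure exit code, log, then exit. The hypothetical sketch below shows only that mapping. `ExitCodeSketch` and the exit code values 0 and 1 are assumptions. The real method also installs the security context and finally calls `System.exit`.

```java
import java.util.concurrent.Callable;

/** Hypothetical sketch of "run the action, then map the outcome to a process exit code". */
final class ExitCodeSketch {
    static final int SUCCESS_EXIT_CODE = 0; // assumed value
    static final int FAILURE_EXIT_CODE = 1; // assumed value

    private ExitCodeSketch() {}

    /** Returns the action's own exit code, or FAILURE_EXIT_CODE if it throws anything. */
    static int runAndGetExitCode(Callable<Integer> action) {
        try {
            return action.call();
        } catch (Throwable t) {
            // The real code logs the error before calling System.exit(exitCode);
            // this sketch only returns the code so that it can be tested.
            return FAILURE_EXIT_CODE;
        }
    }
}
```

Catching `Throwable` rather than `Exception` means errors such as an `OutOfMemoryError` during startup still produce a well-defined exit code.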
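This method creates a plugin manager, initializes the file systems, installs the security context, and runs the runTaskManager method inside that context. As the name suggests, we are getting close to where the TaskManager is built. Let's step into runTaskManager: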
```java
public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    final TaskManagerRunner taskManagerRunner;

    try {
        // TODO build a TaskManagerRunner
        taskManagerRunner =
                new TaskManagerRunner(
                        configuration,
                        pluginManager,
                        // TODO this is where the TaskExecutor is actually created
                        TaskManagerRunner::createTaskExecutorService);
        // TODO start the TaskManagerRunner
        taskManagerRunner.start();
    } catch (Exception exception) {
        throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
    }

    try {
        return taskManagerRunner.getTerminationFuture().get().getExitCode();
    } catch (Throwable t) {
        throw new FlinkException(
                "Unexpected failure during runtime of TaskManagerRunner.",
                ExceptionUtils.stripExecutionException(t));
    }
}
```
This method does two things:
1. It builds a TaskManagerRunner.
2. It starts the TaskManagerRunner.
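The construct, start, and wait flow can be sketched as follows, including the factory that is passed in as a method reference. This is a hypothetical simplification. `RunnerSketch`, `SketchService`, and `SketchServiceFactory` stand in for TaskManagerRunner, TaskExecutorService, and TaskExecutorServiceFactory. The exit code is a plain `Integer` rather than Flink's own result type.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the service that the factory produces. */
interface SketchService {
    void start();

    CompletableFuture<Integer> getTerminationFuture();
}

/** Hypothetical stand-in for TaskExecutorServiceFactory. */
@FunctionalInterface
interface SketchServiceFactory {
    SketchService create(String configuration);
}

/** Sketch of runTaskManager's flow: construct (which builds the service), start, wait. */
final class RunnerSketch {
    private final SketchService service;

    RunnerSketch(String configuration, SketchServiceFactory factory) {
        // Construction already builds the service through the injected factory
        this.service = factory.create(configuration);
    }

    void start() {
        service.start();
    }

    /** Blocks until the service terminates and returns its exit code. */
    int awaitExitCode() {
        return service.getTerminationFuture().join();
    }

    /** Example factory method, usable as RunnerSketch::createImmediateService. */
    static SketchService createImmediateService(String configuration) {
        CompletableFuture<Integer> termination = new CompletableFuture<>();
        return new SketchService() {
            @Override
            public void start() {
                termination.complete(0); // terminates immediately with exit code 0
            }

            @Override
            public CompletableFuture<Integer> getTerminationFuture() {
                return termination;
            }
        };
    }
}
```

Passing the factory in, instead of hard-coding `new TaskExecutor(...)`, lets other code (for example tests) supply a different service implementation without changing the runner.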
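The method then waits on the runner's termination future and returns the resulting exit code. All the preparation work for TaskManager startup happens inside TaskManagerRunner. Let's look at its constructor: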
```java
public TaskManagerRunner(
        Configuration configuration,
        PluginManager pluginManager,
        TaskExecutorServiceFactory taskExecutorServiceFactory)
        throws Exception {
    this.configuration = checkNotNull(configuration);

    timeout = AkkaUtils.getTimeoutAsTime(configuration);

    // TODO TaskManager-internal thread pool, handles I/O for the worker's internal components
    // TODO pool size = number of CPU cores on this node
    this.executor =
            java.util.concurrent.Executors.newScheduledThreadPool(
                    Hardware.getNumberCPUCores(),
                    new ExecutorThreadFactory("taskmanager-future"));

    // TODO high availability services
    highAvailabilityServices =
            HighAvailabilityServicesUtils.createHighAvailabilityServices(
                    configuration,
                    executor,
                    HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

    // TODO new in 1.12: JMX service, exposes monitoring information
    JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

    // TODO start the RPC service, backed internally by an Akka ActorSystem
    rpcService = createRpcService(configuration, highAvailabilityServices);

    // TODO generate a ResourceID for this TaskManager
    this.resourceId =
            getTaskManagerResourceID(
                    configuration, rpcService.getAddress(), rpcService.getPort());

    // TODO heartbeat services: mainly reads the heartbeat interval and timeout settings
    HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

    metricRegistry =
            new MetricRegistryImpl(
                    MetricRegistryConfiguration.fromConfiguration(configuration),
                    ReporterSetup.fromConfiguration(configuration, pluginManager));

    final RpcService metricQueryServiceRpcService =
            MetricUtils.startRemoteMetricsRpcService(configuration, rpcService.getAddress());
    metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);

    // TODO a BlobServer was already started when the master node started;
    // TODO the worker starts a BlobCacheService, a file caching service
    blobCacheService =
            new BlobCacheService(
                    configuration, highAvailabilityServices.createBlobStore(), null);

    final ExternalResourceInfoProvider externalResourceInfoProvider =
            ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
                    configuration, pluginManager);

    // TODO create a TaskExecutorService that wraps the TaskExecutor;
    // TODO the TaskExecutor itself is also built inside this call
    taskExecutorService =
            taskExecutorServiceFactory.createTaskExecutor(
                    this.configuration,
                    this.resourceId,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    metricRegistry,
                    blobCacheService,
                    false,
                    externalResourceInfoProvider,
                    this);

    this.terminationFuture = new CompletableFuture<>();
    this.shutdown = false;
    handleUnexpectedTaskExecutorServiceTermination();

    MemoryLogger.startIfConfigured(
            LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
}
```
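As with JobManager startup, the work here consists of building and starting basic services. In total, the constructor does the following:
1. Creates a TaskManager-internal thread pool that handles I/O for the worker's internal components. Its size equals the number of CPU cores on the node.
2. Creates the high availability services.
3. Starts the JMX service, which exposes monitoring information.
4. Starts the RPC service, which is backed internally by an Akka ActorSystem (see Flink 1.13 Source Code Analysis Prelude: The Akka Communication Model).
5. Generates a ResourceID for the TaskManager.
6. Initializes the heartbeat services, reading the heartbeat interval and heartbeat timeout from the configuration.
7. Initializes the metric services (the metric registry and its query service).
8. Starts the BlobCacheService, a file caching service.
9. Builds a TaskExecutorService that wraps the TaskExecutor.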
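Step 1 can be illustrated with plain `java.util.concurrent`: a scheduled pool sized by `Runtime.availableProcessors()`, plus a thread factory that gives each thread a recognizable name. `NamedThreadFactory` is a hypothetical stand-in for Flink's ExecutorThreadFactory. The `-thread-N` naming scheme and the use of daemon threads are assumptions of this sketch.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for ExecutorThreadFactory: numbered, named daemon threads. */
final class NamedThreadFactory implements ThreadFactory {
    private final String poolName;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    NamedThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable, poolName + "-thread-" + threadNumber.getAndIncrement());
        thread.setDaemon(true); // daemon threads do not keep the JVM alive on their own
        return thread;
    }

    /** A scheduled pool with one core thread per CPU core, like the TaskManagerRunner executor. */
    static ScheduledExecutorService cpuSizedPool(String poolName) {
        return Executors.newScheduledThreadPool(
                Runtime.getRuntime().availableProcessors(), new NamedThreadFactory(poolName));
    }
}
```

Named threads make thread dumps of a busy TaskManager much easier to read, because each pool's threads can be identified at a glance.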
Of these services, let's first look at how BlobCacheService is initialized, starting with its constructor:
```java
public BlobCacheService(
        final Configuration blobClientConfig,
        final BlobView blobView,
        @Nullable final InetSocketAddress serverAddress)
        throws IOException {

    /*
     TODO Initializes two file services:
     1. the permanent blob cache service
     2. the transient blob cache service
     Each of them starts a timer service when it starts up,
     which deletes the resources belonging to expired jobs
    */
    this(
            // TODO permanent
            new PermanentBlobCache(blobClientConfig, blobView, serverAddress),
            // TODO transient (cache)
            new TransientBlobCache(blobClientConfig, serverAddress));
}
```
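This constructor mainly does two things:
1. It creates a permanent blob cache service.
2. It creates a transient blob cache service.
Each of these two services starts a timer when it is created. The timer periodically deletes the resources that belong to expired jobs.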
Taking the permanent blob cache as an example, here is the PermanentBlobCache constructor:
```java
public PermanentBlobCache(
        final Configuration blobClientConfig,
        final BlobView blobView,
        @Nullable final InetSocketAddress serverAddress)
        throws IOException {

    super(
            blobClientConfig,
            blobView,
            LoggerFactory.getLogger(PermanentBlobCache.class),
            serverAddress);

    // Initializing the clean up task
    this.cleanupTimer = new Timer(true);

    // TODO cleanup interval: configured in seconds (1 hour by default), converted to ms
    this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    // TODO schedule the cleanup task, once per interval (every hour by default)
    this.cleanupTimer.schedule(
            new PermanentBlobCleanupTask(), cleanupInterval, cleanupInterval);
}
```
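The scheduling pattern above combines a daemon `java.util.Timer` with an interval that is configured in seconds and converted to milliseconds. In isolation it looks like this. `PeriodicCleanupSketch` and its run counter are hypothetical. The 3600-second constant mirrors the default of `BlobServerOptions.CLEANUP_INTERVAL`.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a periodic cleanup driven by a daemon java.util.Timer. */
final class PeriodicCleanupSketch {
    /** Mirrors the 3600-second default of the blob cleanup interval. */
    static final long DEFAULT_CLEANUP_INTERVAL_SECONDS = 3600L;

    private final Timer cleanupTimer = new Timer(true); // daemon timer thread
    private final AtomicInteger runs = new AtomicInteger();

    /** The option is configured in seconds, but Timer works in milliseconds. */
    static long toMillis(long seconds) {
        return seconds * 1000;
    }

    /** Uses the interval both as the initial delay and as the period, like the blob caches. */
    void start(long intervalMillis, Runnable cleanup) {
        cleanupTimer.schedule(
                new TimerTask() {
                    @Override
                    public void run() {
                        cleanup.run();
                        runs.incrementAndGet();
                    }
                },
                intervalMillis,
                intervalMillis);
    }

    int runCount() {
        return runs.get();
    }

    void close() {
        cleanupTimer.cancel();
    }
}
```

Using the interval as the initial delay means the first cleanup does not run immediately at startup. It runs one full interval later.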
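The constructor first reads the cleanup interval. It is configured in seconds through BlobServerOptions.CLEANUP_INTERVAL, converted to milliseconds, and defaults to 1 hour. The constructor then schedules a timer that runs PermanentBlobCleanupTask once per interval. Let's look at the run method of PermanentBlobCleanupTask: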
```java
class PermanentBlobCleanupTask extends TimerTask {
    /** Cleans up BLOBs which are not referenced anymore. */
    @Override
    public void run() {
        // TODO walk the per-job reference counts of the cached files
        synchronized (jobRefCounters) {
            Iterator<Map.Entry<JobID, RefCount>> entryIter =
                    jobRefCounters.entrySet().iterator();
            final long currentTimeMillis = System.currentTimeMillis();

            // TODO iterate over all entries
            while (entryIter.hasNext()) {
                Map.Entry<JobID, RefCount> entry = entryIter.next();
                RefCount ref = entry.getValue();

                // TODO check whether the job's resources have expired
                if (ref.references <= 0
                        && ref.keepUntil > 0
                        && currentTimeMillis >= ref.keepUntil) {
                    JobID jobId = entry.getKey();

                    final File localFile =
                            new File(
                                    BlobUtils.getStorageLocationPath(
                                            storageDir.getAbsolutePath(), jobId));

                    /*
                     * NOTE: normally it is not required to acquire the write lock to delete the job's
                     * storage directory since there should be no one accessing it with the ref
                     * counter being 0 - acquire it just in case, to always be on the safe side
                     */
                    readWriteLock.writeLock().lock();

                    boolean success = false;
                    try {
                        // TODO delete the job's resource directory
                        FileUtils.deleteDirectory(localFile);
                        success = true;
                    } catch (Throwable t) {
                        log.warn(
                                "Failed to locally delete job directory "
                                        + localFile.getAbsolutePath(),
                                t);
                    } finally {
                        readWriteLock.writeLock().unlock();
                    }

                    // let's only remove this directory from cleanup if the cleanup was
                    // successful
                    // (does not need the write lock)
                    if (success) {
                        entryIter.remove();
                    }
                }
            }
        }
    }
}
```
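The task performs the following steps:
1. While holding the jobRefCounters lock, it walks the reference counts that track which jobs' resource files are still in use.
2. For each job, it checks whether the resources have expired, meaning that no references are left and the keep-until time has passed.
3. If they have expired, it deletes that job's resource directory.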
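The three steps above can be modeled without touching the file system. In this hypothetical sketch, job IDs are plain strings and directory deletion is injected as a `Predicate` that reports success. Releasing the last reference sets `keepUntil` to the current time plus a TTL, which is an assumption about how the expiry is set. As in the original task, an entry is only dropped after a successful deletion, so a failed delete is retried on the next pass.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical model of per-job reference counting plus the periodic cleanup pass. */
final class RefCountCleanupSketch {
    /** Mirrors the two fields the cleanup task checks. */
    static final class RefCount {
        int references;
        long keepUntil = -1L; // > 0 only once the last reference has been released
    }

    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long ttlMillis;

    RefCountCleanupSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    synchronized void register(String jobId) {
        RefCount ref = jobRefCounters.computeIfAbsent(jobId, id -> new RefCount());
        ref.keepUntil = -1L; // referenced again: cancel any pending expiry
        ref.references++;
    }

    synchronized void release(String jobId, long nowMillis) {
        RefCount ref = jobRefCounters.get(jobId);
        if (ref == null || ref.references == 0) {
            return;
        }
        if (--ref.references == 0) {
            ref.keepUntil = nowMillis + ttlMillis; // keep files around a while longer
        }
    }

    /** One cleanup pass; deleteDir returns true if the job's directory was deleted. */
    synchronized List<String> cleanup(long nowMillis, Predicate<String> deleteDir) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, RefCount>> it = jobRefCounters.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, RefCount> entry = it.next();
            RefCount ref = entry.getValue();
            if (ref.references <= 0 && ref.keepUntil > 0 && nowMillis >= ref.keepUntil) {
                // forget the job only if deletion succeeded, so failures are retried
                if (deleteDir.test(entry.getKey())) {
                    removed.add(entry.getKey());
                    it.remove();
                }
            }
        }
        return removed;
    }
}
```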
The transient blob cache does the same kind of work:
```java
public TransientBlobCache(
        final Configuration blobClientConfig, @Nullable final InetSocketAddress serverAddress)
        throws IOException {

    super(
            blobClientConfig,
            new VoidBlobStore(),
            LoggerFactory.getLogger(TransientBlobCache.class),
            serverAddress);

    // Initializing the clean up task
    this.cleanupTimer = new Timer(true);

    // TODO 1 hour by default
    this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    this.cleanupTimer.schedule(
            // TODO timer task
            new TransientBlobCleanupTask(
                    blobExpiryTimes, readWriteLock.writeLock(), storageDir, log),
            cleanupInterval,
            cleanupInterval);
}
```
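It likewise reads the cleanup interval (1 hour by default) and schedules a timer that runs the cleanup once per interval. Here, however, expiry is tracked per blob in blobExpiryTimes rather than per job through reference counts.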
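A hypothetical sketch of that per-blob bookkeeping follows. It assumes that each access pushes a blob's expiry time forward by the TTL, and it uses plain string keys instead of (JobID, BlobKey) pairs. `ExpiryCleanupSketch` and its methods are illustrative names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of per-blob expiry tracking, in the spirit of blobExpiryTimes. */
final class ExpiryCleanupSketch {
    private final Map<String, Long> blobExpiryTimes = new ConcurrentHashMap<>();
    private final long ttlMillis;

    ExpiryCleanupSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Assumed behavior: every access moves the blob's expiry time forward. */
    void touch(String blobKey, long nowMillis) {
        blobExpiryTimes.put(blobKey, nowMillis + ttlMillis);
    }

    /** One cleanup pass: drop every blob whose expiry time has passed. */
    List<String> cleanup(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = blobExpiryTimes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis >= entry.getValue()) {
                expired.add(entry.getKey());
                it.remove(); // the real task would also delete the local file here
            }
        }
        return expired;
    }
}
```

Compared with reference counting, a per-blob TTL suits transient data that nobody explicitly registers or releases: anything not read recently simply ages out.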
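Next comes the important part: initializing the TaskExecutor.
Let's step into taskExecutorServiceFactory.createTaskExecutor. Here the factory is TaskManagerRunner::createTaskExecutorService, the method reference passed to the TaskManagerRunner constructor: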
```java
public static TaskExecutorService createTaskExecutorService(
        Configuration configuration,
        ResourceID resourceID,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        boolean localCommunicationOnly,
        ExternalResourceInfoProvider externalResourceInfoProvider,
        FatalErrorHandler fatalErrorHandler)
        throws Exception {

    // TODO create the TaskExecutor
    final TaskExecutor taskExecutor =
            startTaskManager(
                    configuration,
                    resourceID,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    metricRegistry,
                    blobCacheService,
                    localCommunicationOnly,
                    externalResourceInfoProvider,
                    fatalErrorHandler);

    /*
     TODO wrap the TaskExecutor:
     the TaskExecutor is a member of TaskExecutorToServiceAdapter,
     and the TaskExecutorToServiceAdapter is a member of TaskManagerRunner
    */

    return TaskExecutorToServiceAdapter.createFor(taskExecutor);
}
```
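The wrapping step is an adapter. The runner depends only on a narrow service interface, and the adapter forwards each call to the RpcEndpoint. In the hypothetical sketch below, `EndpointStub`, `ServiceSketch`, and `EndpointToServiceAdapter` stand in for TaskExecutor, TaskExecutorService, and TaskExecutorToServiceAdapter. The chosen set of methods is an assumption.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical minimal endpoint standing in for TaskExecutor. */
final class EndpointStub {
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    private boolean started;

    void start() {
        started = true;
    }

    boolean isStarted() {
        return started;
    }

    CompletableFuture<Void> closeAsync() {
        terminationFuture.complete(null);
        return terminationFuture;
    }

    CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }
}

/** The narrow interface the runner depends on. */
interface ServiceSketch {
    void start();

    CompletableFuture<Void> getTerminationFuture();

    CompletableFuture<Void> closeAsync();
}

/** Adapter that exposes the endpoint through ServiceSketch. */
final class EndpointToServiceAdapter implements ServiceSketch {
    private final EndpointStub endpoint;

    private EndpointToServiceAdapter(EndpointStub endpoint) {
        this.endpoint = endpoint;
    }

    static EndpointToServiceAdapter createFor(EndpointStub endpoint) {
        return new EndpointToServiceAdapter(endpoint);
    }

    @Override
    public void start() {
        endpoint.start();
    }

    @Override
    public CompletableFuture<Void> getTerminationFuture() {
        return endpoint.getTerminationFuture();
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        return endpoint.closeAsync();
    }
}
```

The benefit is decoupling. The runner only sees start, termination, and close, not the endpoint's full RPC surface.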
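This is where the TaskExecutor is actually created, after which it is wrapped. Let's first look at how the TaskExecutor is initialized by stepping into the startTaskManager method.
Inside this method, a number of basic services are again initialized.
First, the resource configuration is initialized, that is, the hardware resource spec is derived from the configuration: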
```java
// TODO initialize the resource configuration, i.e. the hardware resource spec
final TaskExecutorResourceSpec taskExecutorResourceSpec =
        TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
```
Next, the services configuration is built:
```java
// TODO get the configuration (args and flink-conf)
TaskManagerServicesConfiguration taskManagerServicesConfiguration =
        TaskManagerServicesConfiguration.fromConfiguration(
                configuration,
                resourceID,
                externalAddress,
                localCommunicationOnly,
                taskExecutorResourceSpec);
```
Here TaskManagerServices initializes a set of core services:
```java
// TODO initialize a set of core services
TaskManagerServices taskManagerServices =
        TaskManagerServices.fromConfiguration(
                taskManagerServicesConfiguration,
                blobCacheService.getPermanentBlobService(),
                taskManagerMetricGroup.f1,
                ioExecutor,
                fatalErrorHandler);
```
Let's step into the fromConfiguration method:
```java
public static TaskManagerServices fromConfiguration(
        TaskManagerServicesConfiguration taskManagerServicesConfiguration,
        PermanentBlobService permanentBlobService,
        MetricGroup taskManagerMetricGroup,
        ExecutorService ioExecutor,
        FatalErrorHandler fatalErrorHandler)
        throws Exception {

    // pre-start checks
    checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

    // TODO task event dispatcher
    final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

    // start the I/O manager, it will create some temp directories.
    final IOManager ioManager =
            new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

    // TODO shuffle-related work during job execution; covered later with job execution
    final ShuffleEnvironment<?, ?> shuffleEnvironment =
            createShuffleEnvironment(
                    taskManagerServicesConfiguration,
                    taskEventDispatcher,
                    taskManagerMetricGroup,
                    ioExecutor);
    final int listeningDataPort = shuffleEnvironment.start();

    // TODO state management service
    final KvStateService kvStateService =
            KvStateService.fromConfiguration(taskManagerServicesConfiguration);
    kvStateService.start();

    final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
            new UnresolvedTaskManagerLocation(
                    taskManagerServicesConfiguration.getResourceID(),
                    taskManagerServicesConfiguration.getExternalAddress(),
                    // we expose the task manager location with the listening port
                    // iff the external data port is not explicitly defined
                    taskManagerServicesConfiguration.getExternalDataPort() > 0
                            ? taskManagerServicesConfiguration.getExternalDataPort()
                            : listeningDataPort);

    // TODO broadcast variable management service
    final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

    // TODO the most important member inside TaskExecutor:
    // TODO a table that holds the TaskSlots
    final TaskSlotTable<Task> taskSlotTable =
            createTaskSlotTable(
                    taskManagerServicesConfiguration.getNumberOfSlots(),
                    taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
                    taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
                    taskManagerServicesConfiguration.getPageSize(),
                    ioExecutor);

    final JobTable jobTable = DefaultJobTable.create();

    // TODO monitors the leader address of the master (JobManager)
    final JobLeaderService jobLeaderService =
            new DefaultJobLeaderService(
                    unresolvedTaskManagerLocation,
                    taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());

    // ... ...

    return new TaskManagerServices(
            unresolvedTaskManagerLocation,
            taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
            ioManager,
            shuffleEnvironment,
            kvStateService,
            broadcastVariableManager,
            taskSlotTable,
            jobTable,
            jobLeaderService,
            taskStateManager,
            taskEventDispatcher,
            ioExecutor,
            libraryCacheManager);
}
```
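This method initializes the task event dispatcher, the IOManager, the ShuffleEnvironment, the state (KvState) service, the broadcast variable manager, the TaskSlotTable, the service that monitors the JobManager leader address (JobLeaderService), and more. We will focus on the TaskSlotTable here. The other core services will be covered in detail in later chapters on job execution and slot allocation.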
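The TaskSlotTable is one of the most important members of TaskExecutor. It is the component that actually performs every slot-related operation on the TaskExecutor's behalf. The ResourceManager has a counterpart, the SlotManager. We met it as the slot-related one of the two periodic tasks registered during ResourceManager startup (see Flink 1.13 Source Code Analysis: JobManager Startup, ResourceManager Startup).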
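When a JobMaster requests resources, the ResourceManager's SlotManager performs the allocation. Once a slot has been allocated, the SlotManager sends an RPC request to the TaskExecutor, and the TaskExecutor then reports back to the ResourceManager that the allocation is complete. Let's look at the TaskSlotTable implementation class (TaskSlotTableImpl), which has a few very important fields: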
```java
/** The list of all task slots. */
// TODO all slots
// TODO at TaskManager startup the slots are reported to the ResourceManager and wrapped as TaskSlot
private final Map<Integer, TaskSlot<T>> taskSlots;

/** Mapping from allocation id to task slot. */
// TODO all allocated slots: maps each allocation ID to its TaskSlot
private final Map<AllocationID, TaskSlot<T>> allocatedSlots;
```
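taskSlots holds all the slots of the current node. When this node's TaskManager starts, it reports its slots to the ResourceManager, and each slot is wrapped as a TaskSlot.
allocatedSlots holds every slot that has been allocated and maintains the mapping from allocation ID to TaskSlot.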
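The relationship between the two maps can be shown with a heavily simplified, hypothetical model. `SlotTableSketch`, the string allocation IDs, and the `allocateSlot`/`freeSlot` signatures are assumptions for illustration, not TaskSlotTableImpl's actual API. The real class also tracks job IDs, slot states, and timeouts.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of the two maps: slots by index, allocated slots by allocation id. */
final class SlotTableSketch {
    static final class Slot {
        final int index;
        String allocationId; // null while the slot is free

        Slot(int index) {
            this.index = index;
        }
    }

    /** All slots of this TaskManager, keyed by slot index (cf. taskSlots). */
    private final Map<Integer, Slot> taskSlots = new HashMap<>();
    /** Only the allocated slots, keyed by allocation id (cf. allocatedSlots). */
    private final Map<String, Slot> allocatedSlots = new HashMap<>();

    SlotTableSketch(int numberOfSlots) {
        for (int i = 0; i < numberOfSlots; i++) {
            taskSlots.put(i, new Slot(i));
        }
    }

    /** Allocates a free slot; false if the index is unknown, taken, or the id is already used. */
    boolean allocateSlot(int index, String allocationId) {
        Slot slot = taskSlots.get(index);
        if (slot == null || slot.allocationId != null || allocatedSlots.containsKey(allocationId)) {
            return false;
        }
        slot.allocationId = allocationId;
        allocatedSlots.put(allocationId, slot);
        return true;
    }

    /** Frees the slot held by the allocation id and returns its index, if there was one. */
    Optional<Integer> freeSlot(String allocationId) {
        Slot slot = allocatedSlots.remove(allocationId);
        if (slot == null) {
            return Optional.empty();
        }
        slot.allocationId = null;
        return Optional.of(slot.index);
    }

    int numberOfFreeSlots() {
        return taskSlots.size() - allocatedSlots.size();
    }
}
```

Keeping a second map keyed by allocation ID means requests that arrive with an allocation ID (for example, freeing a slot) can find the slot directly instead of scanning every slot.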
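Back in TaskManagerRunner.startTaskManager, the last step is to create the TaskExecutor. Stepping into the TaskExecutor constructor, the first thing to notice is that TaskExecutor extends RpcEndpoint. This tells us that once the TaskExecutor has been constructed and started, its onStart method will be called (see Flink 1.13 Source Code Analysis Prelude: The Akka Communication Model). We are still in the middle of initialization, so let's keep reading: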
```java
public TaskExecutor(
        RpcService rpcService,
        TaskManagerConfiguration taskManagerConfiguration,
        HighAvailabilityServices haServices,
        TaskManagerServices taskExecutorServices,
        ExternalResourceInfoProvider externalResourceInfoProvider,
        HeartbeatServices heartbeatServices,
        TaskManagerMetricGroup taskManagerMetricGroup,
        @Nullable String metricQueryServiceAddress,
        BlobCacheService blobCacheService,
        FatalErrorHandler fatalErrorHandler,
        TaskExecutorPartitionTracker partitionTracker) {

    // TaskExecutor is a subclass of RpcEndpoint; this calls the RpcEndpoint constructor
    super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));

    checkArgument(
            taskManagerConfiguration.getNumberSlots() > 0,
            "The number of slots has to be larger than 0.");

    this.taskManagerConfiguration = checkNotNull(taskManagerConfiguration);
    this.taskExecutorServices = checkNotNull(taskExecutorServices);
    this.haServices = checkNotNull(haServices);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.partitionTracker = partitionTracker;
    this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
    this.blobCacheService = checkNotNull(blobCacheService);
    this.metricQueryServiceAddress = metricQueryServiceAddress;
    this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);

    this.libraryCacheManager = taskExecutorServices.getLibraryCacheManager();
    this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
    this.jobTable = taskExecutorServices.getJobTable();
    this.jobLeaderService = taskExecutorServices.getJobLeaderService();
    this.unresolvedTaskManagerLocation =
            taskExecutorServices.getUnresolvedTaskManagerLocation();
    this.localStateStoresManager = taskExecutorServices.getTaskManagerStateStore();
    this.shuffleEnvironment = taskExecutorServices.getShuffleEnvironment();
    this.kvStateService = taskExecutorServices.getKvStateService();
    this.ioExecutor = taskExecutorServices.getIOExecutor();
    this.resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();

    this.hardwareDescription =
            HardwareDescription.extractFromSystem(taskExecutorServices.getManagedMemorySize());
    this.memoryConfiguration =
            TaskExecutorMemoryConfiguration.create(taskManagerConfiguration.getConfiguration());

    this.resourceManagerAddress = null;
    this.resourceManagerConnection = null;
    this.currentRegistrationTimeoutId = null;

    final ResourceID resourceId =
            taskExecutorServices.getUnresolvedTaskManagerLocation().getResourceID();

    // TODO initialize two heartbeat managers
    // TODO heartbeat between the TaskExecutor and the JobMaster
    this.jobManagerHeartbeatManager =
            createJobManagerHeartbeatManager(heartbeatServices, resourceId);
    // TODO heartbeat between the TaskExecutor and the ResourceManager
    this.resourceManagerHeartbeatManager =
            createResourceManagerHeartbeatManager(heartbeatServices, resourceId);

    ExecutorThreadFactory sampleThreadFactory =
            new ExecutorThreadFactory.Builder()
                    .setPoolName("flink-thread-info-sampler")
                    .build();
    ScheduledExecutorService sampleExecutor =
            Executors.newSingleThreadScheduledExecutor(sampleThreadFactory);
    this.threadInfoSampleService = new ThreadInfoSampleService(sampleExecutor);
}
```
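The first half of the constructor assigns fields. Further down, two heartbeat managers are created:
1. A heartbeat manager for the heartbeat between the TaskExecutor and the JobMaster.
2. A heartbeat manager for the heartbeat between the TaskExecutor and the ResourceManager.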
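Each of these heartbeat managers is a HeartbeatManagerImpl. Recall that the heartbeat manager created in the ResourceManager was a HeartbeatManagerSenderImpl, which, as the name says, sends heartbeat requests. As covered in the ResourceManager chapter, HeartbeatManagerSenderImpl runs a periodic task that iterates over all registered heartbeat targets every 10 seconds and sends each of them a heartbeat request (see Flink 1.13 Source Code Analysis: JobManager Startup, ResourceManager Startup). The HeartbeatManagerImpl on the worker side, by contrast, handles those requests: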
```java
public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
        ResourceID resourceId,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor mainThreadExecutor,
        Logger log) {

    /*
     TODO
     On the master node the heartbeat manager is HeartbeatManagerSenderImpl,
     the heartbeat request sender ("client").
     HeartbeatManagerSenderImpl builds a timer service internally that
     sends a heartbeat request to every heartbeat target every 10 seconds.
     The worker (this node) uses HeartbeatManagerImpl,
     the heartbeat request handler ("server").
    */
    return new HeartbeatManagerImpl<>(
            heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
}
```
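The division of labor between the two heartbeat manager types can be sketched as follows. This is a hypothetical, single-threaded simplification. Time is passed in explicitly, `tick` represents one run of the sender's periodic task, and only the receiver-side timeout check is modeled. The class names are illustrative. The 10 s interval and 50 s timeout used in the example are assumed defaults.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Receiving side, in the spirit of HeartbeatManagerImpl: answers requests and tracks timeouts. */
final class HeartbeatReceiverSketch {
    private final long timeoutMillis;
    private final Map<String, Long> lastHeard = new HashMap<>();

    HeartbeatReceiverSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Handles an incoming heartbeat request: resets that sender's timeout and replies. */
    String requestHeartbeat(String fromId, long nowMillis) {
        lastHeard.put(fromId, nowMillis);
        return "ack:" + fromId;
    }

    /** Senders not heard from within the timeout (the real manager notifies a listener). */
    List<String> timedOut(long nowMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeard.entrySet()) {
            if (nowMillis - e.getValue() >= timeoutMillis) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}

/** Sending side, in the spirit of HeartbeatManagerSenderImpl. */
final class HeartbeatSenderSketch {
    private final String ownId;
    private final List<HeartbeatReceiverSketch> targets = new ArrayList<>();

    HeartbeatSenderSketch(String ownId) {
        this.ownId = ownId;
    }

    void monitorTarget(HeartbeatReceiverSketch target) {
        targets.add(target);
    }

    /** One run of the periodic task: ping every registered target, count the replies. */
    int tick(long nowMillis) {
        int acks = 0;
        for (HeartbeatReceiverSketch target : targets) {
            if (target.requestHeartbeat(ownId, nowMillis).startsWith("ack:")) {
                acks++;
            }
        }
        return acks;
    }
}
```

In this model the receiver only needs to remember when it last heard from each sender. If the ResourceManager stops sending requests, the TaskExecutor side notices once the timeout elapses.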
At this point, the TaskExecutor is fully initialized.
Let's summarize the TaskExecutor initialization process:
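1. First, a TaskManagerRunner is built to carry out the preparation for TaskManager startup. Once the preparation is complete, the TaskManager is started by calling the TaskManagerRunner's start method.
2. Inside TaskManagerRunner (in startTaskManager), a TaskManagerServices object is created to initialize the basic services the TaskExecutor needs.
3. TaskManagerServices first initializes basic services such as the TaskEventDispatcher, the IO manager, the ShuffleEnvironment, the state service, and the TaskSlotTable.
4. After the basic services are initialized, the TaskExecutor itself is initialized. It first creates two heartbeat managers, which maintain the heartbeats with the JobMaster and with the ResourceManager respectively. Because TaskExecutor extends RpcEndpoint, it has the lifecycle method onStart.
5. The TaskExecutor initialization is complete.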
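Point 4 is worth a small illustration: constructing an RpcEndpoint subclass does not run onStart. The hook only fires when the endpoint is started. The sketch below is hypothetical and synchronous, and `EndpointLifecycleSketch` and `TaskExecutorLikeSketch` are illustrative names. In Flink, the start goes through the RPC server, and onStart runs in the endpoint's main thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of the RpcEndpoint lifecycle. */
abstract class EndpointLifecycleSketch {
    private boolean running;

    /** Starting the endpoint, not constructing it, triggers onStart (only once). */
    final void start() {
        if (!running) {
            running = true;
            onStart();
        }
    }

    /** Lifecycle hook for subclasses. */
    protected void onStart() {}

    boolean isRunning() {
        return running;
    }
}

/** Stand-in for TaskExecutor: the constructor only wires up fields and services. */
final class TaskExecutorLikeSketch extends EndpointLifecycleSketch {
    final List<String> events = new ArrayList<>();

    TaskExecutorLikeSketch() {
        events.add("constructed"); // e.g. heartbeat managers are created at this stage
    }

    @Override
    protected void onStart() {
        events.add("onStart"); // startup work (the next chapter's topic) would go here
    }
}
```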
In the next chapter we will look at how the initialized TaskExecutor is started.