• Flink 1.13 源码解析——TaskManager启动流程 之 初始化TaskExecutor



    ​​​点击这里查看 Flink 1.13 源码解析 目录汇总

    点击查看相关章节Flink 1.13 源码解析——启动脚本解析

    点击查看相关章节Flink 1.13 源码解析前导——Akka通信模型

    点击查看相关章节Flink 1.13 源码解析——JobManager启动流程之ResourceManager启动

    点击查看相关章节Flink 1.13 源码解析——TaskManager启动流程概览

    点击查看相关章节Flink 1.13 源码解析——TaskManager启动流程 之 与ResourceManager的注册交互

    目录

    一、前言

    二、TaskExecutor的构建

    2.1、TaskManager基础服务的初始化

    2.1.1、BlobCacheService的初始化

    2.2、TaskExecutor的构造过程

    2.2.3、TaskSlotTable详解

    2.2.3、TaskExecutor的初始化

    总结:


    一、前言

            在之前的章节中我们分析了Flink主节点(JobManager)的启动流程,在接下来这几章里,我们来从源码入手分析一下Flink从节点的启动流程,TaskManager的启动流程中,有很多步骤和主节点的启动是相同的,他没有主节点中那么多的组件,但是启动的步骤要比主节点繁杂很多,在这一章我们首先来了解TaskManager的初始化流程。

    二、TaskExecutor的构建

            在之前Flink启动脚本分析章节(点此查看 Flink 1.13 源码解析——启动脚本解析)中我们得知,standalone模式下Flink从节点的启动类为org.apache.flink.runtime.taskexecutor.TaskManagerRunner,所以我们直接来看这个类的main方法:

    1. // --------------------------------------------------------------------------------------------
    2. // Static entry point
    3. // --------------------------------------------------------------------------------------------
    4. public static void main(String[] args) throws Exception {
    5. // startup checks and logging
    6. EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
    7. SignalHandler.register(LOG);
    8. JvmShutdownSafeguard.installAsShutdownHook(LOG);
    9. long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
    10. if (maxOpenFileHandles != -1L) {
    11. LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
    12. } else {
    13. LOG.info("Cannot determine the maximum number of open file descriptors");
    14. }
    15. // TODO 启动
    16. runTaskManagerProcessSecurely(args);
    17. }

    在main方法中前几行代码做了一些参数、配置校验的工作,我们直接来看runTaskManagerProcessSecurely方法:

    1. public static void runTaskManagerProcessSecurely(String[] args) {
    2. Configuration configuration = null;
    3. try {
    4. // TODO 解析args和flink-conf.yaml文件得到配置信息
    5. configuration = loadConfiguration(args);
    6. } catch (FlinkParseException fpe) {
    7. LOG.error("Could not load the configuration.", fpe);
    8. System.exit(FAILURE_EXIT_CODE);
    9. }
    10. // TODO 启动
    11. runTaskManagerProcessSecurely(checkNotNull(configuration));
    12. }

    该方法依然是我们熟悉的从命令以及flink-conf.yaml文件解析配置,然后将解析后的配置传递给runTaskManagerProcessSecurely方法,我们点进来继续看:

    1. public static void runTaskManagerProcessSecurely(Configuration configuration) {
    2. FlinkSecurityManager.setFromConfiguration(configuration);
    3. // TODO 启动插件管理器
    4. final PluginManager pluginManager =
    5. PluginUtils.createPluginManagerFromRootFolder(configuration);
    6. FileSystem.initialize(configuration, pluginManager);
    7. int exitCode;
    8. Throwable throwable = null;
    9. try {
    10. SecurityUtils.install(new SecurityConfiguration(configuration));
    11. exitCode =
    12. SecurityUtils.getInstalledContext()
    13. // TODO 启动TaskManager
    14. .runSecured(() -> runTaskManager(configuration, pluginManager));
    15. } catch (Throwable t) {
    16. throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
    17. exitCode = FAILURE_EXIT_CODE;
    18. }
    19. if (throwable != null) {
    20. LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);
    21. } else {
    22. LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);
    23. }
    24. System.exit(exitCode);
    25. }

    在该方法里,启动了一个插件管理器,并且执行了一个runTaskManager的方法,通过名字我们不难看出,离TaskManager的构建越来越近了。我们点进runTaskManager方法:

    1. public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
    2. throws Exception {
    3. final TaskManagerRunner taskManagerRunner;
    4. try {
    5. // TODO 构建一个TaskManagerRunner
    6. taskManagerRunner =
    7. new TaskManagerRunner(
    8. configuration,
    9. pluginManager,
    10. // TODO 真正创建TaskExecutor的地方
    11. TaskManagerRunner::createTaskExecutorService);
    12. // TODO 启动TaskManagerRunner
    13. taskManagerRunner.start();
    14. } catch (Exception exception) {
    15. throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
    16. }
    17. try {
    18. return taskManagerRunner.getTerminationFuture().get().getExitCode();
    19. } catch (Throwable t) {
    20. throw new FlinkException(
    21. "Unexpected failure during runtime of TaskManagerRunner.",
    22. ExceptionUtils.stripExecutionException(t));
    23. }
    24. }

    在这个方法里做了两件事:

    1、构建了一个TaskManagerRunner

    2、启动TaskManagerRunner

    实际上,TaskManager启动的所有准备工作,都是在这个TaskManagerRunner中完成的。我们继续进来这个TaskManagerRunner的构造方法来看:

    2.1、TaskManager基础服务的初始化

    1. public TaskManagerRunner(
    2. Configuration configuration,
    3. PluginManager pluginManager,
    4. TaskExecutorServiceFactory taskExecutorServiceFactory)
    5. throws Exception {
    6. this.configuration = checkNotNull(configuration);
    7. timeout = AkkaUtils.getTimeoutAsTime(configuration);
    8. // TODO TaskManager 内部线程池,用来处理从节点内部各个组件的Io的线程池
    9. // TODO 线程池大小为当前节点的cpu核心数
    10. this.executor =
    11. java.util.concurrent.Executors.newScheduledThreadPool(
    12. Hardware.getNumberCPUCores(),
    13. new ExecutorThreadFactory("taskmanager-future"));
    14. // TODO 高可用服务
    15. highAvailabilityServices =
    16. HighAvailabilityServicesUtils.createHighAvailabilityServices(
    17. configuration,
    18. executor,
    19. HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
    20. // TODO 1.12 新功能 JMX服务,提供监控信息
    21. JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
    22. // TODO 启动RPC服务,内部为Akka模型的ActorSystem
    23. rpcService = createRpcService(configuration, highAvailabilityServices);
    24. // TODO 为TaskManager生成了一个ResourceID
    25. this.resourceId =
    26. getTaskManagerResourceID(
    27. configuration, rpcService.getAddress(), rpcService.getPort());
    28. // TODO 初始化心跳服务,主要是初始化心跳间隔和心跳超时参数配置
    29. HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
    30. metricRegistry =
    31. new MetricRegistryImpl(
    32. MetricRegistryConfiguration.fromConfiguration(configuration),
    33. ReporterSetup.fromConfiguration(configuration, pluginManager));
    34. final RpcService metricQueryServiceRpcService =
    35. MetricUtils.startRemoteMetricsRpcService(configuration, rpcService.getAddress());
    36. metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);
    37. // TODO 在主节点启动的时候,事实上已经启动了有个BolbServer,
    38. // TODO 从节点启动的时候,会启动一个BlobCacheService,做文件缓存的服务
    39. blobCacheService =
    40. new BlobCacheService(
    41. configuration, highAvailabilityServices.createBlobStore(), null);
    42. final ExternalResourceInfoProvider externalResourceInfoProvider =
    43. ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
    44. configuration, pluginManager);
    45. // TODO 创建得到一个TaskExecutorService,内部封装了TaskExecutor,同时TaskExecutor的构建也在内部完成
    46. taskExecutorService =
    47. taskExecutorServiceFactory.createTaskExecutor(
    48. this.configuration,
    49. this.resourceId,
    50. rpcService,
    51. highAvailabilityServices,
    52. heartbeatServices,
    53. metricRegistry,
    54. blobCacheService,
    55. false,
    56. externalResourceInfoProvider,
    57. this);
    58. this.terminationFuture = new CompletableFuture<>();
    59. this.shutdown = false;
    60. handleUnexpectedTaskExecutorServiceTermination();
    61. MemoryLogger.startIfConfigured(
    62. LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
    63. }

    不难看出,这里所做的工作和JobManager启动时一样,是一些基础服务的构建和启动,在这里一共做了以下这些工作:

    1、初始化了一个TaskManager内部线程池,用来处理从节点内部各个组件的IO,该线程池的大小为当前节点CPU的核心数。

    2、构建了一个高可用服务。

    3、初始化JMX服务,用于提供监控信息。

    4、启动RPC服务,内部为Akka模型的ActorSystem(点此查看Flink 1.13 源码解析前导——Akka通信模型

    4、为TaskManager生成了一个ResourceID。

    5、初始化心跳服务,根据配置文件获取心跳间隔时间参数以及心跳超时参数

    6、初始化metric服务

    7、启动BlobCacheService服务,做文件缓存的服务。

    8、构建了一个TaskExecutorService,内部封装了TaskExecutor。

    2.1.1、BlobCacheService的初始化

    在以上这些基础环境的初始化中,我们首先来看下BlobCacheService服务的初始化,点进BlobCacheService的构造方法:

    1. public BlobCacheService(
    2. final Configuration blobClientConfig,
    3. final BlobView blobView,
    4. @Nullable final InetSocketAddress serverAddress)
    5. throws IOException {
    6. /*
    7. TODO 初始化了两个文件服务:
    8. 1. 持久化Blob缓存服务
    9. 2. 临时Blob缓存服务
    10. 在这两个服务的内部都会在启动的时候启动一个定时服务
    11. 就是把过期的某个Job的对应资源都删除掉
    12. */
    13. this(
    14. // TODO 持久化
    15. new PermanentBlobCache(blobClientConfig, blobView, serverAddress),
    16. // TODO 缓存
    17. new TransientBlobCache(blobClientConfig, serverAddress));
    18. }

    在这个构造方法里,主要做了两件事:

    1、初始化了一个持久化Blob缓存服务

    2、初始化了一个临时Blob缓存服务

    在这两个服务的内部,都会在启动的时候启动一个定时服务,就是将过期的某个Job的对应资源都删除掉。

    我们以持久化Blob缓存服务为例,点进PermanentBlobCache对象的构造方法

    1. public PermanentBlobCache(
    2. final Configuration blobClientConfig,
    3. final BlobView blobView,
    4. @Nullable final InetSocketAddress serverAddress)
    5. throws IOException {
    6. super(
    7. blobClientConfig,
    8. blobView,
    9. LoggerFactory.getLogger(PermanentBlobCache.class),
    10. serverAddress);
    11. // Initializing the clean up task
    12. this.cleanupTimer = new Timer(true);
    13. // TODO 配置过期时间为1小时
    14. this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    15. // TODO 启动定时任务,每1小时清理一次
    16. this.cleanupTimer.schedule(
    17. new PermanentBlobCleanupTask(), cleanupInterval, cleanupInterval);
    18. }

    可以看到,在下面首先配置了一个过期时间,为1小时,接着启动了一个定时服务,每1小时执行一次PermanentBlobCleanupTask,我们继续来看PermanentBlobCleanupTask的run方法

    1. class PermanentBlobCleanupTask extends TimerTask {
    2. /** Cleans up BLOBs which are not referenced anymore. */
    3. @Override
    4. public void run() {
    5. // TODO 通过引用计数的方式获取所有Job引用的文件
    6. synchronized (jobRefCounters) {
    7. Iterator> entryIter =
    8. jobRefCounters.entrySet().iterator();
    9. final long currentTimeMillis = System.currentTimeMillis();
    10. // TODO 遍历所有文件
    11. while (entryIter.hasNext()) {
    12. Map.Entry entry = entryIter.next();
    13. RefCount ref = entry.getValue();
    14. // TODO 判断是否过期
    15. if (ref.references <= 0
    16. && ref.keepUntil > 0
    17. && currentTimeMillis >= ref.keepUntil) {
    18. JobID jobId = entry.getKey();
    19. final File localFile =
    20. new File(
    21. BlobUtils.getStorageLocationPath(
    22. storageDir.getAbsolutePath(), jobId));
    23. /*
    24. * NOTE: normally it is not required to acquire the write lock to delete the job's
    25. * storage directory since there should be no one accessing it with the ref
    26. * counter being 0 - acquire it just in case, to always be on the safe side
    27. */
    28. readWriteLock.writeLock().lock();
    29. boolean success = false;
    30. try {
    31. // TODO 删除该资源文件夹
    32. FileUtils.deleteDirectory(localFile);
    33. success = true;
    34. } catch (Throwable t) {
    35. log.warn(
    36. "Failed to locally delete job directory "
    37. + localFile.getAbsolutePath(),
    38. t);
    39. } finally {
    40. readWriteLock.writeLock().unlock();
    41. }
    42. // let's only remove this directory from cleanup if the cleanup was
    43. // successful
    44. // (does not need the write lock)
    45. if (success) {
    46. entryIter.remove();
    47. }
    48. }
    49. }
    50. }
    51. }
    52. }

    我们可以看到有以下操作:

    1、首先在方法里通过引用计数的方式,获取所有job引用的资源文件。

    2、遍历这些文件,并判断是否过期。

    3、如果过期则删除该资源文件夹。

    在临时缓存blob服务中也是一样的工作:

    1. public TransientBlobCache(
    2. final Configuration blobClientConfig, @Nullable final InetSocketAddress serverAddress)
    3. throws IOException {
    4. super(
    5. blobClientConfig,
    6. new VoidBlobStore(),
    7. LoggerFactory.getLogger(TransientBlobCache.class),
    8. serverAddress);
    9. // Initializing the clean up task
    10. this.cleanupTimer = new Timer(true);
    11. // TODO 1小时
    12. this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    13. this.cleanupTimer.schedule(
    14. // TODO 定时服务
    15. new TransientBlobCleanupTask(
    16. blobExpiryTimes, readWriteLock.writeLock(), storageDir, log),
    17. cleanupInterval,
    18. cleanupInterval);
    19. }

    首先获取超时时间为1小时,接着启动了一个定时服务,每1小时清理一次。

    接下来到了重要环节,TaskExecutor的初始化

    2.2、TaskExecutor的构造过程

    我们点进taskExecutorServiceFactory.createTaskExecutor方法里:

    1. public static TaskExecutorService createTaskExecutorService(
    2. Configuration configuration,
    3. ResourceID resourceID,
    4. RpcService rpcService,
    5. HighAvailabilityServices highAvailabilityServices,
    6. HeartbeatServices heartbeatServices,
    7. MetricRegistry metricRegistry,
    8. BlobCacheService blobCacheService,
    9. boolean localCommunicationOnly,
    10. ExternalResourceInfoProvider externalResourceInfoProvider,
    11. FatalErrorHandler fatalErrorHandler)
    12. throws Exception {
    13. // TODO 创建TaskExecutor
    14. final TaskExecutor taskExecutor =
    15. startTaskManager(
    16. configuration,
    17. resourceID,
    18. rpcService,
    19. highAvailabilityServices,
    20. heartbeatServices,
    21. metricRegistry,
    22. blobCacheService,
    23. localCommunicationOnly,
    24. externalResourceInfoProvider,
    25. fatalErrorHandler);
    26. /*
    27. TODO 封装了一下TaskExecutor
    28. TaskExecutor是TaskExecutorToServiceAdapter的成员变量
    29. TaskExecutorToServiceAdapter是TaskManagerRunner的成员变量
    30. */
    31. return TaskExecutorToServiceAdapter.createFor(taskExecutor);
    32. }

    可以看到在这里真正初始化了一个TaskExecutor,并将TaskExecutor封装了一下,我们首先来看TaskExecutor的初始化,我们进入startTaskManager方法:

    在该方法内部依然是初始化了一些基础服务:

    首先是初始化资源配置,获取硬件资源配置:

    1. // TODO 初始化资源配置,获取硬件资源配置
    2. final TaskExecutorResourceSpec taskExecutorResourceSpec =
    3. TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);

    接着获取配置:

    1. // TODO 获取配置(args和flink-conf)
    2. TaskManagerServicesConfiguration taskManagerServicesConfiguration =
    3. TaskManagerServicesConfiguration.fromConfiguration(
    4. configuration,
    5. resourceID,
    6. externalAddress,
    7. localCommunicationOnly,
    8. taskExecutorResourceSpec);

    在这里TaskManagerService初始化了一些核心服务:

    1. // TODO 初始化了一些核心服务
    2. TaskManagerServices taskManagerServices =
    3. TaskManagerServices.fromConfiguration(
    4. taskManagerServicesConfiguration,
    5. blobCacheService.getPermanentBlobService(),
    6. taskManagerMetricGroup.f1,
    7. ioExecutor,
    8. fatalErrorHandler);

    我们进入fromConfiguration方法:

    1. public static TaskManagerServices fromConfiguration(
    2. TaskManagerServicesConfiguration taskManagerServicesConfiguration,
    3. PermanentBlobService permanentBlobService,
    4. MetricGroup taskManagerMetricGroup,
    5. ExecutorService ioExecutor,
    6. FatalErrorHandler fatalErrorHandler)
    7. throws Exception {
    8. // pre-start checks
    9. checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
    10. // TODO 状态机 事件分发器
    11. final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
    12. // start the I/O manager, it will create some temp directories.
    13. final IOManager ioManager =
    14. new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
    15. // TODO 作业执行期间shuffle相关操作工作,后面讲作业执行时再细聊
    16. final ShuffleEnvironment shuffleEnvironment =
    17. createShuffleEnvironment(
    18. taskManagerServicesConfiguration,
    19. taskEventDispatcher,
    20. taskManagerMetricGroup,
    21. ioExecutor);
    22. final int listeningDataPort = shuffleEnvironment.start();
    23. // TODO state管理服务
    24. final KvStateService kvStateService =
    25. KvStateService.fromConfiguration(taskManagerServicesConfiguration);
    26. kvStateService.start();
    27. final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
    28. new UnresolvedTaskManagerLocation(
    29. taskManagerServicesConfiguration.getResourceID(),
    30. taskManagerServicesConfiguration.getExternalAddress(),
    31. // we expose the task manager location with the listening port
    32. // iff the external data port is not explicitly defined
    33. taskManagerServicesConfiguration.getExternalDataPort() > 0
    34. ? taskManagerServicesConfiguration.getExternalDataPort()
    35. : listeningDataPort);
    36. // TODO 广播变量管理服务
    37. final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
    38. // TODO TaskExecutor内部,最重要的一个成员变量
    39. // TODO 一张存放TaskSlot的表
    40. final TaskSlotTable taskSlotTable =
    41. createTaskSlotTable(
    42. taskManagerServicesConfiguration.getNumberOfSlots(),
    43. taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
    44. taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
    45. taskManagerServicesConfiguration.getPageSize(),
    46. ioExecutor);
    47. final JobTable jobTable = DefaultJobTable.create();
    48. // TODO 监控主节点Leader地址
    49. final JobLeaderService jobLeaderService =
    50. new DefaultJobLeaderService(
    51. unresolvedTaskManagerLocation,
    52. taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());
    53. 。。。 。。。
    54. return new TaskManagerServices(
    55. unresolvedTaskManagerLocation,
    56. taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
    57. ioManager,
    58. shuffleEnvironment,
    59. kvStateService,
    60. broadcastVariableManager,
    61. taskSlotTable,
    62. jobTable,
    63. jobLeaderService,
    64. taskStateManager,
    65. taskEventDispatcher,
    66. ioExecutor,
    67. libraryCacheManager);
    68. }

    在这里,初始化了事件分发起、IOManager、ShuffleEnvironment、state管理服务、广播变量历服务、TaskSlotJobManager的Leader地址监控服务等等,这里我们着重看一下TableSlot表,其他的核心服务我们会在后续Job的执行流程、Slot分配流程中详细描述,这里就先不聊了。

    2.2.3、TaskSlotTable详解

    首先在TaskSlotTable,是TaskExecutor中非常非常重要的一个成员变量,它是真正帮助TaskExecutor完成一切和Slot有关操作的组件,在ResourceManager中,也有一个类似的组件,就是在注册的两个定时任务中的其中一个:slot定时任务SlotManager。(点击查看Flink 1.13 源码解析——JobManager启动流程之ResourceManager启动

    在JobMaster申请资源时,是ResourceManager中的SlotManager来完成资源分配的,在完成资源分配后,SlotManager会向TaskExecutor发送RPC请求,然后TaskExecutor再向ResourceManager去做汇报表示已完成分配。我们来看TaskSlotTable的实现类,其中有几个十分重要的变量:

    1. /** The list of all task slots. */
    2. // TODO 所有的slot
    3. // TODO 在TaskManager启动时会将自身的slot汇报给ResourceManager,并将slot封装为taskSlot
    4. private final Map> taskSlots;
    5. /** Mapping from allocation id to task slot. */
    6. // TODO 所有已被分配的slot,维护着分配ID和TaskSlot之间的关系
    7. private final Map> allocatedSlots;

    其中taskSlots存放着所有的当前节点的slot,在当前节点的TaskManager启动时,会将自身的slot汇报给ResourceManager,并将slot封装为taskSlot。

    而allocatedSlots存放这所有已被分配的slot的信息,维护着分配ID和TaskSlot之间的关系。

    2.2.3、TaskExecutor的初始化

    我们继续回到TaskManagerRunner.startTaskManager方法,看最后一步,初始化TaskExecutor,我们点进TaskExecutor的构造方法,首先看到TaskExecutor继承自RPCEndpoint,那么我们就知道,当TaskExecutor初始化完成之后回去调用自身 的onStart方法(点击查看Flink 1.13 源码解析前导——Akka通信模型),此刻还在初始化之中,所以我们先继续往下看

    1. public TaskExecutor(
    2. RpcService rpcService,
    3. TaskManagerConfiguration taskManagerConfiguration,
    4. HighAvailabilityServices haServices,
    5. TaskManagerServices taskExecutorServices,
    6. ExternalResourceInfoProvider externalResourceInfoProvider,
    7. HeartbeatServices heartbeatServices,
    8. TaskManagerMetricGroup taskManagerMetricGroup,
    9. @Nullable String metricQueryServiceAddress,
    10. BlobCacheService blobCacheService,
    11. FatalErrorHandler fatalErrorHandler,
    12. TaskExecutorPartitionTracker partitionTracker) {
    13. // TaskExecutor为RPCEndpoint的子类,这个构造器调用的RPCEndpoint的构造器
    14. super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
    15. checkArgument(
    16. taskManagerConfiguration.getNumberSlots() > 0,
    17. "The number of slots has to be larger than 0.");
    18. this.taskManagerConfiguration = checkNotNull(taskManagerConfiguration);
    19. this.taskExecutorServices = checkNotNull(taskExecutorServices);
    20. this.haServices = checkNotNull(haServices);
    21. this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    22. this.partitionTracker = partitionTracker;
    23. this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
    24. this.blobCacheService = checkNotNull(blobCacheService);
    25. this.metricQueryServiceAddress = metricQueryServiceAddress;
    26. this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);
    27. this.libraryCacheManager = taskExecutorServices.getLibraryCacheManager();
    28. this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
    29. this.jobTable = taskExecutorServices.getJobTable();
    30. this.jobLeaderService = taskExecutorServices.getJobLeaderService();
    31. this.unresolvedTaskManagerLocation =
    32. taskExecutorServices.getUnresolvedTaskManagerLocation();
    33. this.localStateStoresManager = taskExecutorServices.getTaskManagerStateStore();
    34. this.shuffleEnvironment = taskExecutorServices.getShuffleEnvironment();
    35. this.kvStateService = taskExecutorServices.getKvStateService();
    36. this.ioExecutor = taskExecutorServices.getIOExecutor();
    37. this.resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
    38. this.hardwareDescription =
    39. HardwareDescription.extractFromSystem(taskExecutorServices.getManagedMemorySize());
    40. this.memoryConfiguration =
    41. TaskExecutorMemoryConfiguration.create(taskManagerConfiguration.getConfiguration());
    42. this.resourceManagerAddress = null;
    43. this.resourceManagerConnection = null;
    44. this.currentRegistrationTimeoutId = null;
    45. final ResourceID resourceId =
    46. taskExecutorServices.getUnresolvedTaskManagerLocation().getResourceID();
    47. // TODO 初始化了两个心跳管理器
    48. // TODO TaskExecutor维持和JobMaster的心跳
    49. this.jobManagerHeartbeatManager =
    50. createJobManagerHeartbeatManager(heartbeatServices, resourceId);
    51. // TODO TaskExecutor维持和ResourceManager的心跳
    52. this.resourceManagerHeartbeatManager =
    53. createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
    54. ExecutorThreadFactory sampleThreadFactory =
    55. new ExecutorThreadFactory.Builder()
    56. .setPoolName("flink-thread-info-sampler")
    57. .build();
    58. ScheduledExecutorService sampleExecutor =
    59. Executors.newSingleThreadScheduledExecutor(sampleThreadFactory);
    60. this.threadInfoSampleService = new ThreadInfoSampleService(sampleExecutor);
    61. }

    在前半部分进行的一些变量的赋值,在下面初始化了两个心跳管理器,分别为:

    1、TaskExecutor维持和JobMaster的心跳的管理器

    2、TaskExecutor维持和ResourceManager心跳的管理器

    在心跳管理器内部初始化了一个HeartbeatManagerImpl对象,还记得我们在ResourceManager中初始化的心跳管理器为HeartbeatManagerSenderImpl,根据名字能看出这是一个心跳请求发送器,也是在ResourceManager那一章节中我们讲到,在HeartbeatManagerSenderImpl中会有一个定时任务,每10秒钟遍历一次所有的已注册的心跳目标对象,并向每个对象发送心跳请求(点击查看Flink 1.13 源码解析——JobManager启动流程之ResourceManager启动

    1. public HeartbeatManager createHeartbeatManager(
    2. ResourceID resourceId,
    3. HeartbeatListener heartbeatListener,
    4. ScheduledExecutor mainThreadExecutor,
    5. Logger log) {
    6. /*
    7. TODO
    8. 主节点中的心跳管理器为HeartbeatManagerSenderImpl 心跳请求发送器 client
    9. 在HeartbeatManagerSenderImpl内部构建了一个定时服务
    10. 每10秒 向所有的心跳目标对象,发送心跳请求
    11. 从节点(当前)为HeartbeatManagerImpl 心跳请求处理器 Server
    12. */
    13. return new HeartbeatManagerImpl<>(
    14. heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
    15. }

    到此为止,我们的TaskExecutor的正式初始化完成。

    总结:

    我们在这里总结一下TaskExecutor的初始化流程:

    1、首先构建了一个TaskManagerRunner,用于完成TaskManager启动的准备工作,再完成准备工作后,通过调用TaskManagerRunner的start方法来启动。

    2、在TaskManagerRunner内部初始化了一个TaskManagerService对象,用来初始化TaskExecutor所需要的基础服务。

    3、在TaskManagerService内部,首先会初始化一些基础服务,如TaskEvent Dispatcher、IO管理器、shuffleEnvironment、state管理器、TaskSlotTable等等。

    4、在完成基础服务的初始化之后,开始初始化TaskExecutor,首先初始化了两个心跳管理期,分别来维护和JobMaster、ResourceManager的心跳。因为TaskExecutor继承了RpcEndpoint,所以具有生命周期方法onStart。

    5、TaskExecutor初始化完成。

    在下一章里我们来看已经初始化完成的TaskExecutor的启动流程。

    下一章: Flink 1.13 源码解析——TaskManager启动流程概览

  • 相关阅读:
    【微信小程序】使用npm包
    SpringBoot——日志及原理
    Kafka系列之二Docker集群安装运行
    如何使用Idea打开、导入、运行maven项目
    02 | 如何进行code diff
    【uniapp】 video视频层级、遮挡其他弹窗或顶部导航 使用nvue覆盖
    不花钱就能完美解决大型离散非标设备生产行业企业信息化
    学习残差神经网络(ResNet)
    Wireshark 4.2.5:发现 QUIC 和 VXLAN 协议的新功能
    双非本科是如何逆袭的?这位同学有点东西
  • 原文地址:https://blog.csdn.net/EdwardWong_/article/details/126569713