• Flink 1.13 源码解析——TaskManager启动流程概览


    ​​​​​​点击这里查看 Flink 1.13 源码解析 目录汇总

    上一章:Flink 1.13 源码解析——TaskManager启动流程 之 初始化TaskExecutor

    下一章:Flink 1.13 源码解析——TaskManager启动流程 之 与ResourceManager的注册交互

    目录

    前言

    TaskExecutor启动流程:


    前言

    在上一章中我们介绍了TaskManager在初始化阶段所做的工作,主要有一下内容:

    1、首先构建了一个TaskManagerRunner,用于完成TaskManager启动的准备工作,再完成准备工作后,通过调用TaskManagerRunner的start方法来启动。

    2、在TaskManagerRunner内部初始化了一个TaskManagerService对象,用来初始化TaskExecutor所需要的基础服务。

    3、在TaskManagerService内部,首先会初始化一些基础服务,如TaskEvent Dispatcher、IO管理器、shuffleEnvironment、state管理器、TaskSlotTable等等。

    4、在完成基础服务的初始化之后,开始初始化TaskExecutor,首先初始化了两个心跳管理期,分别来维护和JobMaster、ResourceManager的心跳。因为TaskExecutor继承了RpcEndpoint,所以具有生命周期方法onStart。

    5、TaskExecutor初始化完成。
    ————————————————
    版权声明:本文为CSDN博主「EdwardsWang丶」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/EdwardWong_/article/details/126569713

    由于TaskExecutor的启动流程较为复杂,所做的工作比较多,我们分几章来讲述,本章首先来概述一下TaskExecutor的启动流程,启动中的一些工作的完成流程我们将在后续几章展开讲述。

    TaskExecutor启动流程:

            在上一章中我们分析了TaskExecutor的初始化过程,在最后我们分析了TaskExecutor继承自RpcEndpoint,所有在初始化完成后会调用自身的生命周期方法onStart,我们首先来看这个TaskExecutor.onStart()方法:

    1. // ------------------------------------------------------------------------
    2. // Life cycle
    3. // ------------------------------------------------------------------------
    4. @Override
    5. public void onStart() throws Exception {
    6. try {
    7. // TODO 启动从节点相关服务,会进行相关服务的注册
    8. startTaskExecutorServices();
    9. } catch (Throwable t) {
    10. final TaskManagerException exception =
    11. new TaskManagerException(
    12. String.format("Could not start the TaskExecutor %s", getAddress()), t);
    13. onFatalError(exception);
    14. throw exception;
    15. }
    16. // TODO 开启了一个注册超时服务,如果上面的服务注册成功,则会回调stopRegistrationTimeout
    17. startRegistrationTimeout();
    18. }

    在这个方法里做了两件事:

    1、启动从节点的相关服务,并进行注册。

    2、开启注册超时服务。

    我们首先来看这个注册超时服务:

    1. private void startRegistrationTimeout() {
    2. final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
    3. if (maxRegistrationDuration != null) {
    4. final UUID newRegistrationTimeoutId = UUID.randomUUID();
    5. currentRegistrationTimeoutId = newRegistrationTimeoutId;
    6. // TODO 提交了一个异步定时任务,如果在时间到达时没有取消,则会执行该任务
    7. scheduleRunAsync(
    8. () -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
    9. }
    10. }

            在这个注册超时服务里面,提交了一个异步的定时任务,或者可以理解为一个倒计时任务,如果注册成功,则会执行一个名为stopRegistrationTimeout的方法来取消这个定时任务,如果在规定时间内还没有注册成功,则会这行这个超时方法,方法向外抛出Fatal级别的异常。:

    1. private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
    2. if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) {
    3. final Time maxRegistrationDuration =
    4. taskManagerConfiguration.getMaxRegistrationDuration();
    5. // TODO 注册超时则 报错:致命错误
    6. onFatalError(
    7. new RegistrationTimeoutException(
    8. String.format(
    9. "Could not register at the ResourceManager within the specified maximum "
    10. + "registration duration %s. This indicates a problem with this instance. Terminating now.",
    11. maxRegistrationDuration)));
    12. }
    13. }

    看完这个延时注册超时方法,我们继续来看onStart中的startTaskExecutorServices()方法:

    1. private void startTaskExecutorServices() throws Exception {
    2. try {
    3. // start by connecting to the ResourceManager
    4. // TODO 监控ResourceManager
    5. /*
    6. TODO
    7. 1.从代码的字面意思能够得知,这就是为了获取ResourceManager的地址,同时添加监听
    8. 2.获取到ResourceManager的地址之后,当前启动的TaskExecutor就可以注册了
    9. 3. 注册之后会收到注册相应(成功: ,失败则直接关闭JVM)
    10. 4. 如果注册成功则:
    11. 维持和ResourceManager之间的心跳
    12. 做slot资源汇报
    13. */
    14. resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    15. // tell the task slot table who's responsible for the task slot actions
    16. // TODO 启动taskSlotTable
    17. taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
    18. // start the job leader service
    19. // TODO 监控JobMaster
    20. jobLeaderService.start(
    21. getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
    22. // TODO 文件缓存服务
    23. fileCache =
    24. new FileCache(
    25. taskManagerConfiguration.getTmpDirectories(),
    26. blobCacheService.getPermanentBlobService());
    27. } catch (Exception e) {
    28. handleStartTaskExecutorServicesException(e);
    29. }
    30. }

    在这个方法里,启动了一些TaskExecutor的核心组件:

    1、启动对ResourceManager的监控服务

    2、启动taskSlotTable服务

    3、启动对JobMaster的监控服务

    4、启动文件缓存服务

    在完成以上四个工作以后,taskManager就启动完成了,在启动过程中涉及到了向ResourceManager的注册、资源的汇报、向JobMaster的注册、ResourceManager和JobMaster和TaskExecutor之间的心跳交互等等工作,我们在后续章节里来给大家分析,在下一章,我们首先来分析ResourceManagerManager的监控服务。

    以下是针对于上面四个工作的详细分析:

    下一章:Flink 1.13 源码解析——TaskManager启动流程 之 与ResourceManager的注册交互

  • 相关阅读:
    java-net-php-python-jsp微山湖特色产品网络商城计算机毕业设计程序
    Python爬虫技术与反爬虫策略
    DSPE-PEG-NYZL1,NYZL1-PEG-DSPE,磷脂-聚乙二醇-靶向肽NYZL1,小分子靶向肽
    Node.js 入门教程 18 package.json 指南
    定时器的类型
    【ARMv9 DSU-120 系列 -- Mapping for address target groups to CHI bus master ports】
    传统语音增强——基本谱减法
    结构思考力~结构化接收信息的三个步骤
    神经网络算法简单例子,图神经网络推荐算法
    EAV模型(实体-属性-值)的设计和低代码的处理方案(1)
  • 原文地址:https://blog.csdn.net/EdwardWong_/article/details/126592068