• 06 flink 的各个角色的交互


    前言

    这里主要是 涉及到 flink 中各个角色的交互 

    TaskManager 和 ResourceManager 的交互

    JobMaster 和 ResourceManager 的交互

    等等流程 

     

     

    TaskManager 和 ResourceManager 的交互

    主要是 包含了几个部分, 如下, 几个菜单 

    TaskManager向 ResourceManager 注册 

    ResourceManager 向 TaskManager 心跳的发送 

    ResourceManager 这边收到 TaskManager 的 slotReport 之后的处理

     

    TaskManager向 ResourceManager 注册 

    TaskManager 中 TaskExecutor 启动之后, 会向 ResourceManager 注册 

    6bf5331485b54b8785ac8dbd9757a5cf.png

     

    注册如下, 向 ResourceManager 这边发送请求, 携带上基本信息, resourceId 是 TaskManagerRunner 中 ResourceID.generate() 随机生成的一个字符串 

    ResourceManager 的地址是根据 JobManager 的信息拼接上固定的 “/user/resourcemanager” 得到的 

    ebd8fd18d2934814acf831fca098e806.png

     

    JobManager 这边的 ResourceManager 注册该 TaskManager 的相关信息, 并相应 ResourceManager 这边创建的 WorkerRegistration 信息返回 

    然后这里注册了一个发送到 TaskManager 的定时心跳, 注册到了 ResourceManager.taskManagerHeartbeatManager 中 

    9bfd33fe609e471fa33f37ba15ee367a.png

     

     

    ResourceManager 向 TaskManager 心跳的发送 

    这里 ResourceManager 向 TaskManager 这边心跳的发送是这里 HeartbeatManagerSenderImpl.run 中处理的, 定时的效果是 延时+递归 来实现的 

    8da832892a1b49afbe605060dbb73fa5.png

     

    然后接着 TaskManager 这边会响应 TaskManager 的各个 slot 的相关信息给 ResourceManager

    4f9792bb93f74db897787cb80bbdd188.png 

     

    ResourceManager 这边收到 TaskManager 的 slotReport 之后的处理

    然后接着就是 ResourceManager 这边的处理, 更新目标 taskManager 的 slot 的相关信息 

    261fe60ad42b4ec28d3d09c07fd7918a.png

     

    然后 web 页面上, 这里 TaskManager 的相关信息 就是来自于 ResourceManager

    ac7bcc81af7c45308ace5c7ea8227265.png 

     

    JobMaster 和 ResourceManager 的交互

    主要是 包含了几个部分, 如下, 几个菜单 

    JobMaster 向 ResourceManager 注册 

     

    JobMaster 向 ResourceManager 注册 

    同样是 JobMaster 启动的饿时候, 会自动向 ResourceManager 注册 

    b9238beb76d54329b98e918a32297392.png

     

    注册的信息如下, jobId, jobResourceId, 以及 jobManager 的交互信息 

    然后这里的 jobResourceId 同样是 JobMaster 初始化的时候 ResourceID.generate() 生成的一个随机字符串

    62dad2f5b2cb4bfcb6d8b0d9b7e2c5e5.png

     

    ResourceManager 向 JobMaster 这边心跳的发送

    ResourceManager 收到 JobMaster 的注册请求之后, 会向 jobManagerHeartbeatManager 注册向 JobMaster 的心跳任务 

    abbc109307d9453d9f4e3d57eae63d15.png 

    然后就是 JobManager 这边收到心跳之后, 向 ResourceManager 发送了一个心跳信息, 未携带 任何数据

    c637a55c68e344f1b447f1244af784eb.png

     

    ResourceManager 这边收到 TaskManager 的 null 之后的处理

    无任何处理, 也不用任何处理 

    7eecef1860874dec9f77e3f71efeb320.png

     

     

    JobMaster 这边资源请求的流程

    JobMaster 启动之后, 自动连接 ResourceManager

    连接上 ResourceManager 之后, 会向 ResourceManager 发送执行资源的请求 

    60424c9481504bf8b0fbeba4a762a218.png

     

    然后是 ResourceManager 这边找到合适的 TaskManagerSlot, 然后 allocateSlot, 向 TaskManager 指定具体的 job 

    25373c9edfc14bc989433bdb5c3ef93c.png

     

    ResourceManager 向 TaskManager 发送请求, 指派其需要执行 目标 job

    cf07415c888e4376bbf1b82290782c2b.png

     

    然后是 TaskExecutor 注册 job 信息, 以及对方 JobMaster 的交互信息 

    2edb9e8e079343d5bc3e2a734f8dde05.png

     

    然后是 TaskExecutor 这边主动和目标 JobMaster 获取联系, 表示为 JobMaster 提供一个 TaskManagerSlot 用于执行目标任务 

    8c65c650f49f44858a5c77412bbce1fa.png

     

    然后是 JobMaster 这边拿到了 TaskManagerSlot 之后执行任务 

    585f2be7271d4a968880a1266d76669d.png 

    接着是更新 Execution 的 slot 的信息, 然后这个是外层 CompletableFuture 是 Execution.scheduleForExecution 中的 allocationFuture

    27280e039fb346b1ac7d58465b827f83.png

     

    然后就是 JobMaster 这边的 deploy, 这里会向具体的 TaskExecutor 发送任务 

    177c71f8c4604d3f87c83fd4c5af8881.png 

    然后 deploy 里面就是根据 ExecutionVertex 封装 TaskDeploymentDescriptor, 然后将相关信息发送到 TaskExecutor 去执行 

    6cc8427929a84df698956e3a5799ad73.png 

     

    处理函数的传递流程 

    这一系列流程如下

    1. driver 这边采集各个函数对象, 封装 UserCodeObjectWrapper, 然后序列化 封装到 TaskConfig, 以 udf 结尾的 配置信息作为 key, 这部分 TaskConfig 是包含在 JobGraph 中的每一个 JobVertex 中的, 然后伴随着 JobGraph 的序列化传递到 JobManager 这边进行处理
    2. JobManager 这边反序列化 JobGraph, 然后创建 JobMaster, 该 JobVertex 经过 ExecutionVertex, TaskDeploymentDescriptor 然后传递到 TaskExecutor
    3. TaskExecutor 这边反序列化 DataSourceTask, ChainedDriver, DataSinkTask 等等, 然后 执行任务

    所以这个流程中 JobManager 这边是仅仅是获取, 持有, 传递 udf 部分, 不涉及 反序列化

     

     

    driver 这边

    从上下文获取 function 对象, 也就是我们驱动代码里面 “new Test01WordCount$MyFlatMapMapper()”, 然后封装了一个 UserCodeObjectWrapper 被 FlatMapOperatorBase 持有 

    然后会经历 Plan, OptimizedPlan 然后到 JobVertex 阶段 

    ecf48fad7bfd40ce9a620130da7423ca.png

     

    然后是创建 JobGraph, 创建每一个 JobVertex 的时候, 序列化该 JobVertex 的 处理函数 

    ed0277d759a1493798c9e2274c7b9cf9.png 

    然后是将 chainedTask, 的相关配置信息放在 主JobVertex

    然后隔离是通过 ”chaining.taskconfig.” + $idx 来进行隔离的, 相当于是增加了一系列的名称空间 

    5e36bc327359446e9ff491c27d2f964d.png

     

    然后就是 JobGraph 的序列化, 准备发送 http 请求 传输 Job 到 JobManager

    e4afe97b2f0f4f579f88a3d772fd5235.png 

     

    JobManager 这边 

    JobManager 这边反序列化 JobGraph 如下, 这里面和客户端那边一样 

    然后 这边的 JobGraph 和 客户端那边的一致, 包含了 JobVertex 中包含了 TaskConfiguration 相关信息 

    eb24d5a273fd495fad368ed910e660e8.png

     

    然后是到后面封装 TaskDeploymentDescriptor 这里可以看到, 也是间接的从 JobVertex 中获取的 TaskConfiguration

    然后 最终的传递是通过 TaskInformation 从 JobMaster 这边传递到 TaskExecutor

    eb1086aaef2e4a5aaa12886b02b7b9e6.png

     

    TaskExecutor 这边 

    反序列化各个 DataSourceTask, ChainedDriver, DataSinkTask 等等的时候

    根据索引, 添加前缀, 来获取给定的 ChainedDriver, 然后添加到 chinedTaskTarget 中, 基于 previous 形成了一个单项任务执行的链表, 用于后面的执行 

    这里各个任务的前缀为 “chaining.taskconfig” + $idx 和前面放入的时候, 是对称的 

    4a7610710a3c44b6857dcce282315ea6.png

     

    这里是具体的获取配置的地方, 前缀 + “udf”, 然后从 配置信息中获取配置

    2a2af1b797c04762bb07974199c5b7f9.png 

    然后是 反序列化 UserCodeObjectWrapper, 里面封装了目标函数, Test01WordCount$MyFlatMapMapper

    b8eee5151bcb4b5d8a931ab44f9e7a45.png 

     

    完 

     

     

     

  • 相关阅读:
    【C语言函数调用详解】——传值调用&传址调用
    【力扣】787. K 站中转内最便宜的航班加权——有向图最短路径
    超详细!手把手带你实现一个完整的Promise
    马斯克440亿美元收购Twitter一年后:全力“下云”,成本速降60%,功能代码从70万行减少至7万!...
    16.面试重点Cookie&Session
    快速使用UE4制作”大场景游戏“
    git常用命令
    【计算机网络】应用层协议原理
    python通过生成器实现协程-生产消费者模型
    流程引擎的架构设计
  • 原文地址:https://blog.csdn.net/u011039332/article/details/132528111