• Flink的ResourceManager详解(一)


    ResourceManager 总结
    一、概述
    1、ResourceManager 管理 Flink 集群中的计算资源,计算资源主要来自 TaskManager 组件。

    2、如果集群采用 Native【本地模式】部署,则 ResourceManager 会动态地向集群资源管理器申请 Container 并启动TaskManager,例如Hadoop Yarn、Kubernetes等。

    3、ResourceManager主要接收来自 JobManager 的 SlotRequest 和 TaskManager 的 SlotReport。
    二、分类
    1、动态资源管理 和 不支持动态资源管理
    1)一类支持动态资源管理,例如KubernetesResourceManager、YarnResourceManager及MesosResourceManager

    支持动态资源管理的集群类型,可以按需启动TaskManager资源,根据Job所需的资源请求,动态启动TaskManager节点,这种资源管理方式不用担心资源浪费和资源动态伸缩的问题。

    实现动态资源管理的ResourceManager需要继承ActiveResourceManager基本实现类。

    2)另一类不支持动态资源管理,例如StandaloneResourceManager
    2、分类图

    在这里插入图片描述

    三、核心服务

    ResourceManagerRuntimeServices 中包含 SlotManager 和 JobLeaderldService 两个主要服务和 HeartbeatService 心跳服务。

    1、SlotManager 管理整个集群的 Slot 计算资源,并对 Slot 计算资源进行统一的分配和管理,同时实现了对 TaskManager 信息的注册和管理。
    2、JobLeaderldService 通过实现 jobLeaderldListeners 实时监听 JobManager 的运行状态,以获取集群启动的作业对应的 JobLeaderld 信息,防止出现 JobManager 无法连接的情况,用于管理注册的 JobManager 节点,包括对 JobManager 的注册和注销等操作。
    3、HeartbeatService 主要通过 TaskManagerHeartbeatListener 和 JobManagerHeartbeatListener 两个监听器收集来自 TaskManager和 JobManager 的心跳信息,以保证整个运行时中各个组件之间能够正常通信。
    四、ResourceManager 的初始化和启动
    DefaultDispatcherResourceManagerComponentFactory#create 方法
    1、初始化 ResourceManager
     resourceManager =
                        resourceManagerFactory.createResourceManager(
                                configuration,
                                ResourceID.generate(),
                                rpcService,
                                highAvailabilityServices,
                                heartbeatServices,
                                fatalErrorHandler,
                                new ClusterInformation(hostname, blobServer.getPort()),
                                webMonitorEndpoint.getRestBaseUrl(),
                                metricRegistry,
                                hostname,
                                ioExecutor);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    1)创建 ResourceManagerRuntimeServices
    1.创建 SlotManager

    SlotMatchingStrategy 根据作业中给定的 ResourceProfile 匹配 Slot 计算资源。SlotMatchingStrategy主要分为两种类型:

    一种是LeastUtilizationSlotMatchingStrategy,即按照利用率最低原则匹配Slot资源,尽可能保证TaskExecutor上资源的使用率处于比较低的水平,这种策略能够有效降低机器的负载。

    另一种是AnyMatchingSlotMatchingStrategy,即直接返回第一个匹配的Slot资源策略。

    private static SlotManager createSlotManager(
                ResourceManagerRuntimeServicesConfiguration configuration,
                ScheduledExecutor scheduledExecutor,
                SlotManagerMetricGroup slotManagerMetricGroup) {
            final SlotManagerConfiguration slotManagerConfiguration =
                    configuration.getSlotManagerConfiguration();
            if (configuration.isEnableFineGrainedResourceManagement()) {
                return new FineGrainedSlotManager(
                        scheduledExecutor,
                        slotManagerConfiguration,
                        slotManagerMetricGroup,
                        new DefaultResourceTracker(),
                        new FineGrainedTaskManagerTracker(),
                        new DefaultSlotStatusSyncer(
                                slotManagerConfiguration.getTaskManagerRequestTimeout()),
                        new DefaultResourceAllocationStrategy(
                                SlotManagerUtils.generateTaskManagerTotalResourceProfile(
                                        slotManagerConfiguration.getDefaultWorkerResourceSpec()),
                                slotManagerConfiguration.getNumSlotsPerWorker()),
                        Time.milliseconds(REQUIREMENTS_CHECK_DELAY_MS));
            } else if (configuration.isDeclarativeResourceManagementEnabled()) {
                return new DeclarativeSlotManager(
                        scheduledExecutor,
                        slotManagerConfiguration,
                        slotManagerMetricGroup,
                        new DefaultResourceTracker(),
                        new DefaultSlotTracker());
            } else {
                return new SlotManagerImpl(
                        scheduledExecutor, slotManagerConfiguration, slotManagerMetricGroup);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    2.创建 JobLeaderIdService
    final JobLeaderIdService jobLeaderIdService =
                    new DefaultJobLeaderIdService(
                            highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout());
    
    • 1
    • 2
    • 3
    2)返回创建的 StandaloneResourceManager
    return new StandaloneResourceManager(
                    rpcService,
                    resourceId,
                    highAvailabilityServices,
                    heartbeatServices,
                    resourceManagerRuntimeServices.getSlotManager(),
                    ResourceManagerPartitionTrackerImpl::new,
                    resourceManagerRuntimeServices.getJobLeaderIdService(),
                    clusterInformation,
                    fatalErrorHandler,
                    resourceManagerMetricGroup,
                    standaloneClusterStartupPeriodTime,
                    AkkaUtils.getTimeoutAsTime(configuration),
                    ioExecutor);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在 StandaloneResourceManager 构造方法中启动 RpcServer

    this.rpcServer = rpcService.startServer(this);
    
    • 1
    2、启动 ResourceManager
    resourceManager.start()->ResourceManager#onStart
    
    • 1

    ResourceManager#startResourceManagerServices

    1)获取 leaderElectionService
    leaderElectionService =
                        highAvailabilityServices.getResourceManagerLeaderElectionService();
    
    • 1
    • 2
    2)初始化 resourceManagerDriver【ActiveResourceManager需要】
    resourceManagerDriver.initialize(this, new GatewayMainThreadExecutor(), ioExecutor);
    
    • 1
    3)启动 leader 竞选,在 leader 节点启动服务
    1.启动心跳服务

    在ResourceManager中HeartbeatService的启动方法中,包括了对taskManagerHeartbeatManager和jobManagerHeartbeatManager两个心跳管理服务的启动操作。

    而心跳管理服务主要通过TaskManagerHeartbeatListener和JobManagerHeartbeatListener两个监听器收集来自TaskManager和JobManager的心跳信息,以保证整个运行时中各个组件之间能够正常通信。

    startHeartbeatServices();
    
    • 1
    2.启动 slotManager 服务

    通过scheduledExecutor线程池启动TaskManager周期性超时检查服务,通过checkTaskManagerTimeouts()方法实现该检查,防止TaskManager长时间掉线等问题。

    启动单独的线程对提交的SlotRequest进行周期性超时检查,防止Slot请求超时。

    slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
    
    • 1
    4)启动 jobLeaderIdService
    jobLeaderIdService.start(new JobLeaderIdActionsImpl());
    
    • 1
    五、总结
    1、ResourceManager 通过 SlotManager 管理集群中的计算资源(TaskManager 的 SlotReport)响应 JobManager 的 SlotRequest;
    2、ResourceManager 通过 HeartBeatService 监听 JobManager 和 TaskManager 的心跳,保证运行时各个组件间能够正常通信;
    3、ResourceManager 通过 JobLeaderldService 管理注册的 JobManager 节点,包括对 JobManager 的注册和注销等操作;
  • 相关阅读:
    【Java-webflux】Spring5新特性之webflux反应式编程-Project Reactor
    阿里云oss上传视频测试,出现了413错误
    Redis-基本介绍/linux下环境配置/配置文件
    插件创建Maven工程
    html5 列表/表格标签
    Linux内核分析(一)--内核架构和子系统
    JavaScript中的一等公民: 函数(Function)
    springboot自定义Json序列化返回,实现自动转换字典值
    接口自动化测试实践指导(下):接口自动化测试断言设置思路
    BUUCTF 九连环 1
  • 原文地址:https://blog.csdn.net/m0_50186249/article/details/133815656