• 【Flink源码】Flink心跳机制


    【Flink源码】再谈 Flink 程序提交流程(下) 一文中,我们在讲述 TaskExecutor 启动流程时,在 TaskExecutor 构造方法中讲到 TaskExecutor 维护了与 ResourceManager 和 JobMaster 的心跳服务
    同样在 ResourceManager 启动时也维护了与 JobMaster 和 TaskManager 的心跳
    接下来我们针对 ResourceManager 与 TaskManager 的心跳交互流程来深入探究 flink 的心跳机制


    ResourceManager 与 TaskManager 的心跳交互

    ResourceManager 启动心跳服务

    在 ResourceManager 启动时,会启动两个心跳服务
    让我们进入 ResourceManager 类一探究竟

    ResourceManager.java

    private void startHeartbeatServices() {
        // ResourceManager(主节点)维持和从节点的心跳
        // ResoueceManager(逻辑 JobManager)维持和 TaskExecutor(TaskManager)的心跳
        taskManagerHeartbeatManager =
                heartbeatServices.createHeartbeatManagerSender(
                        resourceId,
                        new TaskManagerHeartbeatListener(),
                        getMainThreadExecutor(),
                        log);
        // ResourceManager 维持和 JobMaster 的心跳
        jobManagerHeartbeatManager =
                heartbeatServices.createHeartbeatManagerSender(
                        resourceId,
                        new JobManagerHeartbeatListener(),
                        getMainThreadExecutor(),
                        log);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这里维持了两个心跳服务:

    1. 维持 ResourceManager 和 TaskManager 的心跳服务
    2. 维持 ResourceManager 和 JobMaster 的心跳服务

    以 ResourceManager 与 TaskManager 的心跳服务为例,进去继续探究

    HeartbeatServices.java

    public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
            ResourceID resourceId,
            HeartbeatListener<I, O> heartbeatListener,
            ScheduledExecutor mainThreadExecutor,
            Logger log) {
    
        return new HeartbeatManagerSenderImpl<>(
                heartbeatInterval,
                heartbeatTimeout,
                failedRpcRequestsUntilUnreachable,
                resourceId,
                heartbeatListener,
                mainThreadExecutor,
                log);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    找到 HeartbeatManagerSenderImpl 的构造方法

    HeartbeatManagerSenderImpl.java

    HeartbeatManagerSenderImpl(
            long heartbeatPeriod,
            long heartbeatTimeout,
            int failedRpcRequestsUntilUnreachable,
            ResourceID ownResourceID,
            HeartbeatListener<I, O> heartbeatListener,
            ScheduledExecutor mainThreadExecutor,
            Logger log,
            HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
        super(
                heartbeatTimeout,
                failedRpcRequestsUntilUnreachable,
                ownResourceID,
                heartbeatListener,
                mainThreadExecutor,
                log,
                heartbeatMonitorFactory);
    
        this.heartbeatPeriod = heartbeatPeriod;
        // 线程池定时调用 this 的 run 方法,由于 delay 为 0L,立即执行
        mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    当 ResourceManager 启动的时候,会第一次执行到这段代码,在这里启动了一个延时调度的线程池,不过这里的延时参数为 0,所以会立即执行 run 方法
    我们继续看 run 方法

    public void run() {
        if (!stopped) {
            log.debug("Trigger heartbeat request.");
            for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
                // 向所有已注册的从节点封装后的 heartbeatMonitor 对象发送心跳 RPC 请求
                requestHeartbeat(heartbeatMonitor);
            }
            // 等 heartbearPeriod=10s之后,再次执行 this 的 run 方法,来控制上面的 for 循环每隔 10s 执行一次,实现心跳的无限循环
            getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里会遍历所有已注册到 ResourceManager 上的节点,这些节点被封装成一个个的心跳对象 HeartbeatMonitor,并放在集合中,当代码第一次执行到这里时,会遍历所有的心跳对象,对每个已注册的节点触发一次心跳,在对所有的节点发送完心跳后,又启动了一个延时任务,每隔 10s 触发一次当前类的 run 方法,即该方法本身。也就是说每隔 10s 对所有已注册的节点发送一次心跳,通过此机制来完成无限心跳。
    不过目前还没有 TaskManager 节点注册进来,所以此时的心跳对象集合中是没有 TaskExecutor 的封装对象的
    现在我们回头去看 TaskExecutor 在注册环节的一些步骤

    TaskManager 向 ResourceManager 注册心跳服务

    当 TaskExecutor 向 ResourceManager 注册时,会去获取 ResourceManager 的代理对象,并通过调用代理对象的 registerTaskExecutor 方法,触发 ResourceManager 的 registerTaskExecutor 方法
    我们来看 ResourceManager 的 registerTaskExecutor 方法

    ResourceManager.java

    public CompletableFuture<RegistrationResponse> registerTaskExecutor(
            final TaskExecutorRegistration taskExecutorRegistration, final Time timeout) {
        
        // 获取 TaskExecutor 的代理,准备回复注册响应
        CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture =
                getRpcService()
                        .connect(
                                taskExecutorRegistration.getTaskExecutorAddress(),
                                TaskExecutorGateway.class);
        taskExecutorGatewayFutures.put(
                taskExecutorRegistration.getResourceId(), taskExecutorGatewayFuture);
    
        return taskExecutorGatewayFuture.handleAsync(
                (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
                    final ResourceID resourceId = taskExecutorRegistration.getResourceId();
                    if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(resourceId)) {
                        taskExecutorGatewayFutures.remove(resourceId);
                        if (throwable != null) {
                            return new RegistrationResponse.Failure(throwable);
                        } else {
                            // 内部注册具体实现
                            return registerTaskExecutorInternal(
                                    taskExecutorGateway, taskExecutorRegistration);
                        }
                    } else {
                        log.debug(
                                "Ignoring outdated TaskExecutorGateway connection for {}.",
                                resourceId.getStringWithMetadata());
                        return new RegistrationResponse.Failure(
                                new FlinkException("Decline outdated task executor registration."));
                    }
                },
                getMainThreadExecutor());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    在方法里 ResourceManager 首先去获取 TaskExecutor 的代理对象,准备回复注册响应

    private RegistrationResponse registerTaskExecutorInternal(
            TaskExecutorGateway taskExecutorGateway,
            TaskExecutorRegistration taskExecutorRegistration) {
        // TaskExecutor 的 ResourceId
        ResourceID taskExecutorResourceId = taskExecutorRegistration.getResourceId();
        // 获取 TaskExecutor 的注册对象,如果存在,则证明注册过,需要更新
        WorkerRegistration<WorkerType> oldRegistration =
                taskExecutors.remove(taskExecutorResourceId);
        // 如果有旧注册信息
        if (oldRegistration != null) {
            // TODO :: suggest old taskExecutor to stop itself
            log.debug(
                    "Replacing old registration of TaskExecutor {}.",
                    taskExecutorResourceId.getStringWithMetadata());
    
            // remove old task manager registration from slot manager
            // 取消旧的 TaskManager 的注册,再进行新的 TaskManager 的注册
            slotManager.unregisterTaskManager(
                    oldRegistration.getInstanceID(),
                    new ResourceManagerException(
                            String.format(
                                    "TaskExecutor %s re-connected to the ResourceManager.",
                                    taskExecutorResourceId.getStringWithMetadata())));
        }
    
        final WorkerType newWorker = workerStarted(taskExecutorResourceId);
    
        String taskExecutorAddress = taskExecutorRegistration.getTaskExecutorAddress();
        if (newWorker == null) {
            log.warn(
                    "Discard registration from TaskExecutor {} at ({}) because the framework did "
                            + "not recognize it",
                    taskExecutorResourceId.getStringWithMetadata(),
                    taskExecutorAddress);
            return new TaskExecutorRegistrationRejection(
                    "The ResourceManager does not recognize this TaskExecutor.");
        } else {
            // 生成注册对象
            WorkerRegistration<WorkerType> registration =
                    new WorkerRegistration<>(
                            taskExecutorGateway,
                            newWorker,
                            taskExecutorRegistration.getDataPort(),
                            taskExecutorRegistration.getJmxPort(),
                            taskExecutorRegistration.getHardwareDescription(),
                            taskExecutorRegistration.getMemoryConfiguration(),
                            taskExecutorRegistration.getTotalResourceProfile(),
                            taskExecutorRegistration.getDefaultSlotResourceProfile(),
                            taskExecutorRegistration.getNodeId());
    
            log.info(
                    "Registering TaskManager with ResourceID {} ({}) at ResourceManager",
                    taskExecutorResourceId.getStringWithMetadata(),
                    taskExecutorAddress);
            // 完成注册,这个 taskExecutors 是一个 map,维护着 ResourceID 和注册对象的关系
            taskExecutors.put(taskExecutorResourceId, registration);
    
            // 从节点心跳管理器,保存了注册进来的 TaskExecutor 的 ResourceID 和包装的该 TaskExecutor 的心跳对象
            taskManagerHeartbeatManager.monitorTarget(
                    taskExecutorResourceId, new TaskExecutorHeartbeatSender(taskExecutorGateway));
            // 返回注册成功消息给 TaskExecutor 的引用
            return new TaskExecutorRegistrationSuccess(
                    registration.getInstanceID(), resourceId, clusterInformation);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    HeartbeatManagerImpl.java

    public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
        if (!stopped) {
            if (heartbeatTargets.containsKey(resourceID)) {
                log.debug(
                        "The target with resource ID {} is already been monitored.",
                        resourceID.getStringWithMetadata());
            } else {
                // 根据 HeartbeatTarget 创建 HeartbeatMonitor 并注册到 heartbeatTarget map 中
                HeartbeatMonitor<O> heartbeatMonitor =
                        heartbeatMonitorFactory.createHeartbeatMonitor(
                                resourceID,
                                heartbeatTarget,
                                mainThreadExecutor,
                                heartbeatListener,
                                heartbeatTimeoutIntervalMs,
                                failedRpcRequestsUntilUnreachable);
                // 加入心跳目标对象集合
                heartbeatTargets.put(resourceID, heartbeatMonitor);
    
                // check if we have stopped in the meantime (concurrent stop operation)
                // 如果心跳机制的 HeartbeatManagerImpl 已关闭,则取消心跳超时任务
                if (stopped) {
                    heartbeatMonitor.cancel();
    
                    heartbeatTargets.remove(resourceID);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    可以看到,在这里,ResourceManager 的从节点管理器将刚才注册进来的从节点注册为一个心跳对象,并加入自身的心跳对象集合,在完成心跳对象的构建和添加之后,我们回到上一级方法,将会触发 requestHeartbeat 方法

    @Override
    public void requestHeartbeat(ResourceID resourceID, Void payload) {
        // TODO ResourceManager发送心跳Rpc请求给TaskExecutor
        taskExecutorGateway.heartbeatFromResourceManager(resourceID);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里会调用 TaskExecutor 的代理对象的 heartbeatFromResourceManager 方法发送心跳,我们来看这个方法,选择 TaskExecutor 实现

    TaskExecutor.java

    @Override
    public CompletableFuture<Void> heartbeatFromResourceManager(ResourceID resourceID) {
        // TaskExecutor 接收到 ResourceManager 发送过来的心跳请求
        return resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这里会触发 TaskExecutor 中的主节点心跳管理器的 requestHeartbeat 方法,我们来看这个方法,选择 HeartbeatManagerImpl 实现

    HeartbeatManagerImpl.java

    public CompletableFuture<Void> requestHeartbeat(
            final ResourceID requestOrigin, I heartbeatPayload) {
        if (!stopped) {
            log.debug("Received heartbeat request from {}.", requestOrigin);
    
            // 汇报心跳
            // 当TaskExecutor 调用此方法,其实就是 TaskExecutor 自己记录,最近一次和 ResourceManager 之间的心跳时间
            final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);
    
            if (heartbeatTarget != null) {
                if (heartbeatPayload != null) {
                    heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
                }
                
                // 给主节点回复心跳,并做负载汇报
                heartbeatTarget
                        .receiveHeartbeat(
                                getOwnResourceID(),
                                heartbeatListener.retrievePayload(requestOrigin))
                        .whenCompleteAsync(handleHeartbeatRpc(requestOrigin), mainThreadExecutor);
            }
        }
    
        return FutureUtils.completedVoidFuture();
    }
    
    HeartbeatTarget<O> reportHeartbeat(ResourceID resourceID) {
        if (heartbeatTargets.containsKey(resourceID)) {
            HeartbeatMonitor<O> heartbeatMonitor = heartbeatTargets.get(resourceID);
            // 记录心跳
            // 当从节点回复主节点心跳时,当前 HeartbeatMonitor 为主节点
            // 当主节点回复从节点心跳时,当前 HeartbeatMonitor 为从节点
            heartbeatMonitor.reportHeartbeat();
    
            return heartbeatMonitor.getHeartbeatTarget();
        } else {
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    HeartbeatMonitorImpl.java

    public void reportHeartbeat() {
        // 记录最后一次心跳时间
        lastHeartbeat = System.currentTimeMillis();
        // 重置心跳超时时间
        resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这里首先记录了一下当前的时间戳,再将时间传入 resetHeartbeatTimeout 方法,我们进入这个方法

    void resetHeartbeatTimeout(long heartbeatTimeout) {
        // 判断当前 HeartBeatMonitor 的状态是否是 Running
        if (state.get() == State.RUNNING) {
            // 先取消超时服务
            cancelTimeout();
            // 重新进行延时调度
            futureTimeout =
                    scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);
    
            // Double check for concurrent accesses (e.g. a firing of the scheduled future)
            if (state.get() != State.RUNNING) {
                cancelTimeout();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    这里一共做了三件事:

    1. 判断当前心跳对象的运行状态
    2. 取消当前的延时调度任务
    3. 重新启动一个延时调度任务

    我们继续回到 requestHeartbeat 方法里,看 TaskExecutor 向 ResourceManager 的心跳回复

     // TODO 给主节点回复心跳,并做负载汇报
    heartbeatTarget.receiveHeartbeat(
            getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
    
    • 1
    • 2
    • 3

    我们进入 receiveHeartbeat 方法中选择 HeartbeatManagerImpl 实现

    HeartbeatManagerImpl.java

    public CompletableFuture<Void> receiveHeartbeat(
            ResourceID heartbeatOrigin, I heartbeatPayload) {
        if (!stopped) {
            log.debug("Received heartbeat from {}.", heartbeatOrigin);
            // 接收到 TaskExecutor 的心跳汇报
            reportHeartbeat(heartbeatOrigin);
    
            // 如果 TaskExecutor 本次汇报的负载信息为空,则还以上次汇报的负载信息为准
            // 如果不为空则记录
            if (heartbeatPayload != null) {
                heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
            }
        }
    
        return FutureUtils.completedVoidFuture();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在该方法中 ResourceManager 首先通过 resportHeartbeat 方法接收心跳汇报,我们进入这个方法

    HeartbeatTarget<O> reportHeartbeat(ResourceID resourceID) {
        if (heartbeatTargets.containsKey(resourceID)) {
            HeartbeatMonitor<O> heartbeatMonitor = heartbeatTargets.get(resourceID);
            // 记录心跳
            // 当从节点回复主节点心跳时,当前 HeartbeatMonitor 为主节点
            // 当主节点回复从节点心跳时,当前 HeartbeatMonitor 为从节点
            heartbeatMonitor.reportHeartbeat();
    
            return heartbeatMonitor.getHeartbeatTarget();
        } else {
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    又回到了这个方法,在这里通过 reportHeartBeat 方法更新了一下心跳时间
    回到上一级代码

    // 如果 TaskExecutor 本次汇报的负载信息为空,则还以上次汇报的负载信息为准
    // 如果不为空则记录
    if (heartbeatPayload != null) {
        heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    可以看到这里 TaskExecutor 还对 ResourceManager 进行了一次负载的汇报工作,如果本次负载信息汇报为空,则此节点的负载信息还以上一次的汇报结果为准。找到这个 reportPayload 方法

    ResourceManager.java

    public void reportPayload(
            final ResourceID resourceID, final TaskExecutorHeartbeatPayload payload) {
        validateRunsInMainThread();
        // 获取 TaskExecutor 的注册信息
        final WorkerRegistration<WorkerType> workerRegistration = taskExecutors.get(resourceID);
    
        if (workerRegistration == null) {
            log.debug(
                    "Received slot report from TaskManager {} which is no longer registered.",
                    resourceID.getStringWithMetadata());
        } else {
            InstanceID instanceId = workerRegistration.getInstanceID();
            // 进行 TaskExecutor 的 slot 状态汇报
            slotManager.reportSlotStatus(instanceId, payload.getSlotReport());
            clusterPartitionTracker.processTaskExecutorClusterPartitionReport(
                    resourceID, payload.getClusterPartitionReport());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    此处来到了资源注册环节,首先从节点的注册对象 WorkerRegistration 里获取到相关的 TaskExecutor 的注册信息,再通过 slotManager 进行资源的汇报,找到 reportSlotStatus 方法

    @Override
    public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
        checkInit();
    
        TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
    
        if (null != taskManagerRegistration) {
            LOG.debug("Received slot report from instance {}: {}.", instanceId, slotReport);
    
            // TODO 进行TaskExecutor的所有Slot的状态汇报
            for (SlotStatus slotStatus : slotReport) {
                // TODO 更新slot状态
                updateSlot(
                        slotStatus.getSlotID(),
                        slotStatus.getAllocationID(),
                        slotStatus.getJobID());
            }
    
            return true;
        } else {
            LOG.debug(
                    "Received slot report for unknown task manager with instance id {}. Ignoring this report.",
                    instanceId);
    
            return false;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    在这里遍历了所有 TaskExecutor 注册进来的 Slot,然后通过 updateSlot 方法更新每一个 Slot 的状态信息
    至此,TaskManager 启动之后的第一轮注册心跳已经完成,在完成注册心跳之后,TaskManager 并不会主动向 ResourceManager 发送心跳,而是当 ResourceManager 的心跳发送来后,进行心跳的回复,同时回复自身的负载等信息
    由于在注册心跳的环节中,TaskManager 已经被 ResourceManager 封装为了心跳对象并存放在 ResourceManager 的从节点心跳管理器集合中,ResourceManager 的心跳服务会不停地遍历所有 TaskManager 的心跳对象发送心跳
    回到开头 ResourceManager 注册心跳的 run 方法

    HeartbeatManagerSenderImpl

    public void run() {
        if (!stopped) {
            log.debug("Trigger heartbeat request.");
            for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
                // 向所有已注册的从节点封装后的 heartbeatMonitor 对象发送心跳 RPC 请求
                requestHeartbeat(heartbeatMonitor);
            }
            // 等 heartbearPeriod=10s之后,再次执行 this 的 run 方法,来控制上面的 for 循环每隔 10s 执行一次,实现心跳的无限循环
            getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    我们看 requestHeartbeat 方法

    private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
        O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
        final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
    
        heartbeatTarget
                .requestHeartbeat(getOwnResourceID(), payload)
                .whenCompleteAsync(
                        handleHeartbeatRpc(heartbeatMonitor.getHeartbeatTargetId()),
                        getMainThreadExecutor());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这里首先获取了 ResourceManager 从节点心跳管理器中的从节点心跳对象,并调用心跳对象的 requestHeartbeat 方法

    @Override
    public void requestHeartbeat(ResourceID resourceID, Void payload) {
        // TODO ResourceManager发送心跳Rpc请求给TaskExecutor
        taskExecutorGateway.heartbeatFromResourceManager(resourceID);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    再次回到这里
    至此,心跳机制就到底了


    总结
    Flink 的心跳交互机制和 HDFS 不一样,下面叙述其区别:

    1. Flink 心跳:ResourceManager 率先启动,然后启动一个向所有心跳目标对象发送心跳请求的定时任务。当有 TaskExecutor 上线并注册成功,则会生成一个 HeartBeatMonitor 加入到心跳目标对象集合,然后 ResourceManager 开始向所有 TaskExecutor 发送心跳请求。TaskExecutor 接收到心跳请求,则执行最近心跳时间的修改,和心跳超时任务的重置。如果超时了则发起请求,连接新的 ResourceManager。
    2. HDFS 心跳:NameNode 率先启动,然后启动一个超时检查服务,DataNode 其中之后过来注册,当注册成功之后,DataNode 执行定时心跳任务。在 HDFS 中,是从节点 DataNode 主动发送心跳。
  • 相关阅读:
    easyexcel自定义合并策略和自定义消息转换器
    大学生值得珍藏的实用网站推荐
    泡泡玛特加速海外布局,泰国首店开业吸引超千名粉丝排队
    自学JAVA的第三天之第一个Java程序
    【一】Mac 本地部署大模型
    【C语言】扫雷小游戏(保姆教程)
    ArcMap的mxd文件没有数据、显示感叹号怎么办?
    记一次清理挖矿病毒的过程
    如何查询快递单号物流及时发现退回件单号
    JAVA语言特性
  • 原文地址:https://blog.csdn.net/wwb44444/article/details/127722803