• 【Flink集群RPC通讯机制(三)】AkkaRpcActor设计与实现:接收RPC消息以及处理逻辑


    RPC请求发送后接收方的处理逻辑

    在RpcEndpoint中创建的RemoteRpcInvocation消息,最终会通过Akka系统传递到被调用方。例如TaskExecutor向ResourceManager发送SlotReport请求的时候,会在TaskExecutor中将ResourceManagerGateway的方法名称和参数打包成RemoteRpcInvocation对象。然后经过网络发送到ResourceManager中的AkkaRpcActor,处理请求。

    接下来深入了解AkkaRpcActor的设计与实现,了解在AkkaRpcActor中如何接收RemoteRpcInvocation消息并执行后续的操作。

     

    1. 创建Receiver

    如代码所示,首先在AkkaRpcActor中创建Receive对象,用于处理Akka系统接收的其他Actor发送过来的消息。

    Receiver相关能力

    在AkkaRpcActor中主要创建了RemoteHandshakeMessage、ControlMessages等消息对应的处理器,

    • 其中RemoteHandshakeMessage主要用于进行正式RPC通信之前的网络连接检测,保障RPC通信正常。
    • ControlMessages用于控制Akka系统,例如启动和停止Akka Actor等控制消息。这里我们重点关注第三种类型的消息,即在集群运行时中RPC组件通信使用的Message类型,此时会调用handleMessage()方法对这类消息进行处理。
    public Receive createReceive() {
        return ReceiveBuilder.create()
            .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
            .match(ControlMessages.class, this::handleControlMessage)
            .matchAny(this::handleMessage)
            .build();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

     

    2. 进行消息处理

    在AkkaRpcActor.handleMessage()方法中,最终会调用handleRpcMessage()方法继续对RPC消息进行处理。

    如下代码:

    
    //根据RPC消息类型,进行不同方式处理
    protected void handleRpcMessage(Object message) {
        if (message instanceof RunAsync) {
        //将代码块提交到本地线程池中执行
            handleRunAsync((RunAsync) message);
        } else if (message instanceof CallAsync) {
            handleCallAsync((CallAsync) message);
        } else if (message instanceof RpcInvocation) {
            handleRpcInvocation((RpcInvocation) message);
        } else {
            // 省略部分代码
            sendErrorIfSender(
                new AkkaUnknownMessageException("Received unknown message " + message +  
                   " of type " +  message.getClass().getSimpleName() + '.'));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    接着看AkkaRpcActor.handleRpcInvocation()方法逻辑:

    1. 判断当前RpcEndpoint是否实现了指定rpcMethod。

    例如JobMaster调用ResourceManagerGateway.requestSlot()方法,会在lookupRpcMethod()方法中判断当前ResourceManager实现的Endpoint是否提供了该方法的实现。

    1. 当rpcMethod不为空时,rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs())
    2. 调用sendAsyncResponse()、sendSyncResponse()方法通过Akka系统将RpcMethod返回值返回给调用方。
    private void handleRpcInvocation(RpcInvocation rpcInvocation) {
        Method rpcMethod = null;
        try {
            String methodName = rpcInvocation.getMethodName();
            Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
            rpcMethod = lookupRpcMethod(methodName, parameterTypes);
        } catch (ClassNotFoundException e) {
            // 省略部分代码
        }
        if (rpcMethod != null) {
            try {
                rpcMethod.setAccessible(true);
                if (rpcMethod.getReturnType().equals(Void.TYPE)) {
                    // 没有返回值的情况
                    rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
                }
                else {
                      // 有返回值的情况
                    final Object result;
                    try {
                        result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
                    }
                    catch (InvocationTargetException e) {
                        getSender()
                            .tell(new Status.Failure(e.getTargetException()), getSelf());
                        return;
                    }
                    final String methodName = rpcMethod.getName();
                    if (result instanceof CompletableFuture) {
                        final CompletableFuture<?> responseFuture = 
                            (CompletableFuture<?>) result;
                        sendAsyncResponse(responseFuture, methodName);
                    } else {
                        sendSyncResponse(result, methodName);
                    }
                }
            } catch (Throwable e) {
                log.error("Error while executing remote procedure call {}.", 
                          rpcMethod, e);
                // 通知错误信息
                getSender().tell(new Status.Failure(e), getSelf());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    接下来从更加宏观的角度了解各组件之间如何基于已经实现的RPC框架进行通信,进一步加深对Flink中RPC框架的了解。

     
     
    参考:《Flink设计与实现:核心原理与源码解析》–张利兵

  • 相关阅读:
    个人云服务的搭建(折腾)之旅
    图像库 PIL(一)
    FuseDream论文阅读笔记 文本生成图像 text2image
    PostgreSQL的学习心得和知识总结(九十二)|语法级自上而下完美实现MySQL数据库的 枚举类型创建表及插入数据等操作 的实现方案
    python web开发过程
    IPv6与VoIP——ipv6接口标识与VoIP概述
    电脑如何在网页上下载视频 浏览器如何下载网页视频
    js常用方法
    oracle sql monitor简单使用说明
    河北吉力宝打造步力宝智能康养鞋,助力健康中国行
  • 原文地址:https://blog.csdn.net/hiliang521/article/details/136228982