• RpcProvider分发rpc服务:socket连接回调和读写事件回调的实现


    连接回调

    // 新的socket连接回调
    void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr& conn)
    {
        if (!conn->connected())
        {
            // 和rpc client的连接断开了
            conn->shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    读写事件回调的设计

    对于在网络上接收的字符流,在框架内部,RpcProvider和RpcConsumer协商好之间通信用的protobuf数据类型,定义proto的message类型,进行数据头的序列化和反序列化。
    比如需要得知:service_name, method_name, args。不定义类型,传过来的字符流,是没办法识别的。

    另外,为了防止粘包,可以在头部的message类型里添加 args_size,即参数的长度。

    所以需要设计的部分有:
    header_size(4个字节)(服务名字、方法名字) + header_str + args_str

    最后设计出该类型
    rpcHeader.proto

    syntax = "proto3";
    
    package mprpc;
    
    message RpcHeader
    {
        bytes service_name = 1;
        bytes method_name = 2;
        uint32 args_size = 3;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在这里插入图片描述

    OnMessage的实现

    该方法表示已建立连接用户的读写事件操作,如果有一个远程RPC服务的调用请求,那么OnMessage方法就会响应。

    void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr& conn,
                                muduo::net::Buffer* buffer,
                                muduo::Timestamp);
    
    • 1
    • 2
    • 3
    1. 首先要从网络上接收的远程rpc调用请求的字符流
    std::string recv_buf = buffer->retrieveAllAsString();
    
    • 1
    1. 从字符流中读取前4个字节的内容,上面说到,前四个字节表示header_size,有人问头部只有四字节够吗? 原因是我们将头部的大小转换成二进制存到这四字节里,不可能会超出范围。
     // 从字符流中读取前4个字节的内容
        uint32_t header_size = 0;
        recv_buf.copy((char*)&header_size, 4, 0);
    
    • 1
    • 2
    • 3

    这里用到了std::string的copy方法:
    在这里插入图片描述

    1. 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息
        std::string rpc_header_str = recv_buf.substr(4, header_size);
        mprpc::RpcHeader rpcHeader;
        std::string service_name;
        std::string method_name;
        uint32_t args_size;
        if (rpcHeader.ParseFromString(rpc_header_str))
        {
            // 数据头反序列化成功
            service_name = rpcHeader.service_name();
            method_name = rpcHeader.method_name();
            args_size = rpcHeader.args_size();
        }
        else
        {
            // 数据头反序列化失败
            std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl;
            return;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    1. 获取rpc方法参数的字符流数据,略过recv_buf的前面的头部信息(header_size和header_str),4字节加header_size即为开始的位置。 顺便打印一下调试信息。
    	// 获取rpc方法参数的字符流数据
        std::string args_str = recv_buf.substr(4 + header_size, args_size);
    
        // 打印调试信息
        std::cout << "============================================" << std::endl;
        std::cout << "header_size: " << header_size << std::endl;
        std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
        std::cout << "service_name: " << service_name << std::endl;
        std::cout << "method_name: " << method_name << std::endl;
        std::cout << "args_str: " << args_str.c_str() << std::endl;
        std::cout << "============================================" << std::endl;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    1. 获取service对象和method对象
      在之前设计过这样一个表:
    	// 存储注册成功的服务对象和其服务方法的所有信息
        std::unordered_map<std::string, ServiceInfo> m_serviceMap;
        
        // service服务类型信息
        struct ServiceInfo
        {
            google::protobuf::Service* m_service; // 保存服务对象
            std::unordered_map<std::string, const google::protobuf::MethodDescriptor*> m_methodMap; // 保存服务方法
        };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    根据上面表的对应关系,先对其进行判空,然后获取service对象和method对象

    	auto it = m_serviceMap.find(service_name);
        if (it == m_serviceMap.end())
        {
            std::cout << service_name << " is not exist!" << std::endl;
            return;
        }
    	auto mit = it->second.m_methodMap.find(method_name);
        if (mit == it->second.m_methodMap.end())
        {
            std::cout << service_name << ":" << method_name << " is not exist!" << std::endl;
            return;
        }
    
    	google::protobuf::Service *service = it->second.m_service; // 获取service对象  new UserService
        const google::protobuf::MethodDescriptor *method = mit->second; // 获取method对象  Login
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    1. 生成rpc方法调用的请求request和响应response参数
    	
        google::protobuf::Message *request = service->GetRequestPrototype(method).New();
        if (!request->ParseFromString(args_str))
        {
            std::cout << "request parse error, content:" << args_str << std::endl;
            return;
        }
        google::protobuf::Message *response = service->GetResponsePrototype(method).New();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1. 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
      new UserService().Login(controller, request, response, done)
    // Closure的回调操作,用于序列化RPC的响应和网络发送
    void SendRpcResponse(const muduo::net::TcpConnectionPtr&, google::protobuf::Message*);
    
    • 1
    • 2
    // Closure的回调操作,用于序列化rpc的响应和网络发送
    void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn, google::protobuf::Message* response)
    {
        std::string response_str;
        if (response->SerializeToString(&response_str)) // response进行序列化
        {
            // 序列化成功后,通过网络把rpc方法执行的结果发送会rpc的调用方
            conn->send(response_str);
        }
        else
        {
            std::cout << "serialize response_str error!" << std::endl;
        }
        conn->shutdown(); // 模拟http的短链接服务,由rpcprovider主动断开连接
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这里插入图片描述

    	// 给下面的method方法的调用,绑定一个Closure的回调函数
        google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider,
                                                                        const muduo::net::TcpConnectionPtr&,
                                                                        google::protobuf::Message*>
                                                                        (this,
                                                                        &RpcProvider::SendRpcResponse,
                                                                        conn, response);
    
        // 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
        service->CallMethod(method, nullptr, request, response, done);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 相关阅读:
    Vue集成Echarts
    AIGC|一文揭秘如何利用MYSCALE实现高效图像搜索?
    Spring Boot项目如何打成war包部署呢?
    Linux 深入理解Linux文件系统与日志分析
    Kubernetes(k8s)的Pod控制器DaemonSet详细讲解
    ChatGPT的ABAP能力如何?
    MVP-2:使用MVP分页加载数据
    从头开始——重新布置渗透测试环境的过程记录(From Windows To Mac)
    中国青年报APP设备注册
    mysql 索引优化
  • 原文地址:https://blog.csdn.net/m0_56257585/article/details/126724430