• 轻量级RPC分布式网络通信框架设计——muduo网络部分解析


    01 背景

    (1)服务器通常各个模块对硬件的需求不一样,比如登录服务模块就是I/O密集型的,对内存的需求比较高。有的部分对计算能力的要求比较高,那就是CPU密集型,需要CPU性能强的机器。分布式很好的解决了这个问题,不同的模块可以部署在不同的机器上。

    (2)如果所有的模块部署在一个服务器上他的整体的编译过程也会比较麻烦

    (3)分布式也提高了服务器的并发能力

    02 网络传输

    本项目主要为了像调用本地方法一样调用远端服务器的服务方法,那么网络传输必不可少。

    网络传输部分主要体现在:

    客户端把序列化后的RPC请求字节流数据发送到rpc服务器端,也需要把服务器端的RPC响应的数据在序列化后传输到客户端

    03 网络部分代码分析

    客户端mprpcchannel

        //使用tcp编程, 完成rpc方法的远程调用   rpc客户端发送  无需高并发要求
        int clientfd = socket(AF_INET, SOCK_STREAM, 0);
        if (-1 == clientfd) {
            char errtext[512] = {0};
            sprintf(errtext, "create socket error! errno : %d", errno);
            controller->SetFailed(errtext);
            return;
        }
    
        //读取配置文件rpcserver的信息
        // std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
        // uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
        //rpc调用方想要调用service_name的method_name服务 需要查询zk上该服务所在的host信息
        ZkClient zkCli;
        zkCli.Start();
    
        //  /UserServiceRpc/Login
        std::string method_path = "/" + service_name + "/" + method_name;
        std::string host_data = zkCli.GetData(method_path.c_str());
    
        if (host_data == "")  {
            controller->SetFailed(method_path + " is not exist!");
            return;
        }
    
        // 127.0.0.1:8000
        int idx = host_data.find(":");
        if (idx == -1) {
            controller->SetFailed(method_path + "address is invalid!");
            return;
        }
    
        
        std::string ip = host_data.substr(0, idx);
        uint16_t port = atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());
    
        struct sockaddr_in server_addr;
        server_addr.sin_family = AF_INET;  //设置地址家族
        server_addr.sin_port = htons(port);   //port 的 本地字节序转化为网络字节序
    
        server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
    
        // 连接rpc服务节点
        if (-1 == connect(clientfd, (struct sockaddr*)&server_addr, sizeof(server_addr))) {
            close(clientfd);
    
            char errtext[512] = {0};
            sprintf(errtext, "connect error! errno :  %d", errno);
            controller->SetFailed(errtext);
            return;  
        }
    
        //发送rpc请求
        if (-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0)) {
            close(clientfd);
    
            char errtext[512] = {0};
            sprintf(errtext, "send error! errno :  %d", errno);
            controller->SetFailed(errtext);
            return;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    客户端采用的传统的socket编程

    对request进行字节流序列化解析(3 个header_size rpc_header_str (service_name method_name args_size) args_str) =》zk根据服务和方法解析获得ip 和 端口号 =》socket =》 设置端口号、ip =》connect 连接rpc服务节点=》 send =》recv =》close

    服务器端rpcprovvider

    // 启动rpc服务节点,开始提供rpc远程网络调用服务
    void RpcProvider::Run() {
    
        // 读取配置文件rpcserver的信息
        std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
        uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
        muduo::net::InetAddress address(ip, port);
    
        //创建Tcpserver对象
        muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider");
    
        //绑定连接回调和消息读写回调方法(有没有新的用户连接和用户的读写事件)  很好的分离了网络代码和业务代码   
        server.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));  //设置连接回调
        server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
    
        //设置muduo库的线程数量  设置为多线程时  一个线程专门做IO线程  其他都作为工作线程
        server.setThreadNum(4);  // 设置多线程为4  一个IO线程  三个worker线程   Reactor网络模型
    
        // 把当前的rpc节点上要发布的服务全部注册到zk上面 让rpc client可以从zk上发现服务
        // session timeout  30s  心跳时间  zkclient API提供心跳机制    网络I/O线程  1/3 timeout时间发送ping消息   告诉znode节点依然存在
        ZkClient zkCli;
        zkCli.Start();
    
        //service_name为永久性节点  method_name为临时性节点
        for (auto &sp : m_serviceMap) {
            //service_name    /UserServiceRpc
            std::string service_path = "/" + sp.first;   // 创建节点 FriendServiceRpc节点   /FriendServiceRpc 永久性节点  或者 UserServiceRpc
            zkCli.Create(service_path.c_str(), nullptr, 0);
    
            for (auto &mp : sp.second.m_methodMap) {
                //   service_name/method_name    /UserServiceRpc/Login   存储当前这个rpc服务节点主机的ip和port   
                std::string method_path = service_path + "/" + mp.first;
                char method_path_data[128] = {0};
                sprintf(method_path_data, "%s:%d", ip.c_str(), port); // ip : port
                // ZOO_EPHEMERAL表示znode是一个临时节点   zkserver断开即销毁
                zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);  
            }
        }
        
    
        std::cout << "RpcProvider start service at ip: " << ip << " port: " << port << std::endl;  //等待远程连接
    
        //启动网络服务
        server.start();
        m_eventLoop.loop(); //相当于 epoll wait 以阻塞方式等待连接
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    1.组合TcpServer对象
    2.创建EventLoop事件循环对象的指针,相当于创建了epoll
    3.明确TcpServer构造函数需要什么参数,输出TcpServer的构造函数(三个参数 事件循环eventloop ip和port 服务器名字)
    4.在当前服务器类的构造函数当中,注册处理连接的回调函数和处理读写事件的回调函数(两个回调 连接回调 消息回调)
    5.设置合适的服务端线程数量,muduo库会自己分配I/O线程和worker线程

    简而言之就是以下几步

    //  1 
    muduo::net::TcpServer server;
    
    // 2
    muduo::net::EventLoop m_eventLoop;
    
    // 3
    muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider"); // 事件循环    muduo封装好的,绑定IP+Port  给TCPserver一个名字 
    
    // 4   给服务器注册用户连接的创建和断开的回调,回调就是对端的相应事件发生了告诉网络库 ,然后网络库告诉我 ,我在回调函数开发业务 
    server.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));  //设置连接回调
    server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
    
    // 5
    server.setThreadNum(4);  // 设置多线程为4  一个IO线程  三个worker线程   Reactor网络模型   555
    
    // 6
    server.start();  //listenfd通过 epoll_ctl 添加到 epoll 
    m_eventLoop.loop(); //相当于 epoll wait 以阻塞方式等待连接已连接用户的读写事件等
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    04 muduo架构

    muduo网络库采用的时multiple reactor + threadpool的形式,所谓的multiple reactor,就是指有主从reactor之分,Main Reactor只用于监听新的连接,在accept之后就会将这个连接分配到Sub Reactor上,由子Reactor负责连接的事件处理。

    而线程池中维护了两个队列,一个队伍队列,一个线程队列,外部线程将任务添加到任务队列中,如果线程队列非空,则会唤醒其中一只线程进行任务的处理,相当于是生产者和消费者模型。

    image-20221103202751470

    image-20221103202806292

    05 muduo源码解析

    • muduo采用了Linux比较新的系统调用 timerfd 和 eventfd
    • muduo是静态链接的c++程序
    -lmuduo_net   -lmuduo_base
    
    • 1

    ├── muduo
    │ ├── base
    │ └── net

    muduo中的base是一些基础库,都是用户可见的类

    • muduo擅长的领域 tcp长连接 并不比其他开源网络库差

    06 bind绑定器

    C++的回调技术

    仿函数简介

    std::function包含于头文件 #include中,可将各种可调用实体进行封装统一,包括

    1.普通函数

    2.lambda表达式

    3.函数指针

    4.仿函数(functor 重载括号运算符实现)

    5.类成员函数

    6.静态成员函数

    #include 
    #include  
     
    using namespace std; 
     
    std::function<bool(int, int)> fun;
     
    //1.普通函数
    bool compare_com(int a, int b)
    {
        return a > b;
    }
     
    //2.lambda表达式
    auto compare_lambda = [](int a, int b){ return a > b;};
     
    //3.仿函数
    class compare_class
    {
    public:
        bool operator()(int a, int b)
        {
            return a > b;
        }
    };
     
    //4.类成员函数(动态、静态)
    class compare
    {
    public:
        bool compare_member(int a, int b)
        {
            return a > b;
        }
     
        static bool compare_static_member(int a, int b)
        {
            return a > b;
        }
     
    };
     
    //对应的main函数如下:
    int main()
    {
     
        bool result;
     
        fun = compare_com;
        result = fun(10, 1);
        cout << "普通函数输出, result is " << result << endl;
     
     
        fun = compare_lambda;
        result = fun(10, 1);
        cout << "lambda表达式输出, result is " << result << endl;
     
     
        fun = compare_class();
        result = fun(10, 1);
        cout << "仿函数输出, result is " << result << endl;
     
     
        fun = compare::compare_static_member;
        result = fun(10, 1);
        cout << "类静态成员函数输出, result is " << result << endl;
     
     
        compare temp;
        fun = std::bind(&compare::compare_member, temp, std::placeholders::_1, std::placeholders::_2);
        result = fun(10, 1);
        cout << "类普通成员函数输出, result is " << result << endl;
     
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    bind绑定器

    std::bind函数将可调用对象(开头所述6类)和可调用对象的参数进行绑定,返回新的可调用对象(std::function类型,参数列表可能改变),返回的新的std::function可调用对象的参数列表根据bind函数实参中std::placeholders::_x从小到大对应的参数确定。

    #include 
    #include 
     
     
    using namespace std;
     
    struct Int
    {
        int a;
    };
     
     
     
    bool compare_com(struct Int a, float b)
    {
        return a.a > b;
    }
     
     
     
    int main()
    {
        Int a = {1};
     
        //placeholders::_1对应float, placeholders::_2对应struct Int所以返回值fun的类型为 function
        std::function<bool(float, struct Int)> fun = bind(compare_com, placeholders::_2, placeholders::_1);
     
        bool result = fun(2.0, a);
        cout << "result is " << result << endl;
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    把原先 compare的第二个参数绑定到了第一个,第一个参数绑定到了第二个 所以返回fun的类型是 bool(float, struct Int)

    functional和bind的回调技术

    C++的类成员函数不能像普通函数那样用于回调,因为每个成员函数都需要有一个对象实例去调用它。

    通常情况下,要实现成员函数作为回调函数:

    一种过去常用的方法就是把该成员函数设计为静态成员函数(因为类的成员函数需要隐含的this指针 而回调函数没有办法提供),但这样做有一个缺点,就是会破坏类的结构性,因为静态成员函数只能访问该类的静态成员变量和静态成员函数,不能访问非静态的,要解决这个问题,可以把对象实例的指针或引用做为参数传给它。后面就可以靠这个对象实例的指针或引用访问非静态成员函数。

    另一种办法就是使用std::bind和std::function结合实现回调技术。

    下面的所有讨论基于对象。

    #include
    #include
     
    typedef std::function<void()> Functor;
     
    class Blas
    {
    public:
    	virtual void add(int a, int b)
    	{
    		std::cout << a + b << std::endl;
    	}
     
    	static void addStatic(int a, int b)
    	{
    		std::cout << a + b << "this Blas" << std::endl;
    	}
     
    };
     
    class BBlas : public Blas
    {
    	virtual void add(int a, int b)
    	{
    		std::cout << a + b << "this BBlas" <<std::endl;
    	}
    };
     
     
    int main()
    {
    	//Blas blas;
    	BBlas blas;
     
    	//使用bind绑定类静态成员函数
    	Functor functor(std::bind(&Blas::addStatic, 1, 2));
    	functor();
     
     
    	//使用bind绑定类的chengyuan函数
    	Functor functor2(std::bind(&Blas::add, blas, 1, 2));
    	functor2();
     
    	return 0;
    }
     
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    结果

    3this Blas

    3this BBlas

    上述代码中的区别是:如果不是类的静态成员函数,需要在参数绑定时,往绑定的参数列表中加入使用的对象

    #include
    #include
     
    typedef std::function<void()> Functor;
     
    class Blas
    {
        public:
            void setCallBack(const Functor& cb)
            {functor = cb;};
     
            void printFunctor()
            {functor();};
     
        private:
            Functor functor;
     
    };
     
     
    class Atlas
    {
     
        public:
            Atlas(int x_) : x(x_)
            {
     
                //使用当前类的静态成员函数
                blas.setCallBack(std::bind(&addStatic,x,2));
     
                //使用当前类的非静态成员函数 
                blas.setCallBack(std::bind(&Atlas::add,this,x,2));
     
            } 
     
            void print()
            {
                blas.printFunctor();
            }
     
        private:
            void add(int a,int b)
            {
                std::cout << a+b << std::endl;
            }
     
            static void addStatic(int a,int b)
            {
                std::cout << a+b << std::endl;
            }
     
             Blas blas;
     
             int x;
     
    };
     
     
     
    int main(int argc,char** argv)
    {
     
        Atlas atlas(5);
     
        atlas.print();
     
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    两个函数在Atlas类中,并且可以自由操作Atlas的数据成员。尽管是将add()系列的函数封装成函数对象传入Blas中,并且在Blas类中调用,但是它们仍然具有操作Atlas数据成员的功能,在两个类之间形成了弱的耦合作用。但是如果要在两个类之间形成弱的耦合作用,必须在使用std::bind()封装时,向其中传入this指针:

    C++中 直接调用、函数指针、std::function效率对比

    07 线程数量设置

    image-20221103212721748

  • 相关阅读:
    多队列网卡与虚拟化
    JS逆向之浏览器补环境详解
    植物大战僵尸各种僵尸攻略(四)
    ArcGis课程设计
    大数据学习笔记第1课 Hadoop基础理论与集群搭建
    mysql导入CSV乱码问题解决
    空气传导和骨传导耳机哪个好?这两种耳机有什么区别?
    Filter/过滤器基本使用
    TCP协议之《Pacing功能》
    NSS [HNCTF 2022 WEEK2]ez_SSTI
  • 原文地址:https://blog.csdn.net/qq_41945053/article/details/127721873