• Linux 高级IO


    五种IO模型

    1. 阻塞IO

    在内核将数据准备好之前, 系统调用会一直等待。 所有的套接字, 默认都是阻塞方式。

    阻塞IO是最常见的IO模型

    在这里插入图片描述

    1. 非阻塞IO

    如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EWOULDBLOCK错误码。

    非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询. 这对CPU来说是较大的浪费, 一般只有特定场景下才使用。

    在这里插入图片描述

    1. 信号驱动IO

    内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作。

    在这里插入图片描述

    1. IO多路转接

    虽然从流程图上看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态

    在这里插入图片描述

    1. 异步IO

    由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据)。

    在这里插入图片描述

    小结: 任何IO过程中, 都包含两个步骤. 第一是等待, 第二是拷贝. 而且在实际的应用场景中, 等待消耗的时间往往都远远高于拷贝的时间. 让IO更高效, 最核心的办法就是让等待的时间尽量少。

    高级IO重要概念

    什么叫做高效的IO?

    IO=等待+数据拷贝,而高效的IO就是在整个周期内,等的比重特别少,一直在做拷贝。提高IO效率就是减少IO过程等待的比重。

    同步通信 vs 异步通信

    同步和异步关注的是消息通信机制

    • 所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回. 但是一旦调用返回,就得到返回值了; 换句话说,就是由调用者主动等待这个调用的结果。
    • 异步则是相反, 调用在发出之后,这个调用就直接返回了,所以没有返回结果; 换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果; 而是在调用发出后, 被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。

    另外, 我们回忆在讲多进程多线程的时候, 也提到同步和互斥. 这里的同步通信和进程之间的同步是完全不相干的概念

    • 进程/线程同步也是进程/线程之间直接的制约关系。
    • 是为完成某种任务而建立的两个或多个线程,这个线程需要在某些位置上协调他们的工作次序而等待、传递信息所产生的制约关系. 尤其是在访问临界资源的时候。

    同学们以后在看到 “同步” 这个词, 一定要先搞清楚大背景是什么. 这个同步, 是同步通信异步通信的同步, 还是同步与互斥的同步。

    阻塞 vs 非阻塞

    阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态

    • 阻塞调用是指调用结果返回之前,当前线程会被挂起. 调用线程只有在得到结果之后才会返回
    • 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程

    其他高级IO

    非阻塞IO,纪录锁,系统V流机制, I/O多路转接(也叫I/O多路复用) ,readv和writev函数以及存储映射IO(mmap),这些统称为高级IO。

    非阻塞IO

    fcntl

    一个文件描述符, 默认都是阻塞IO

    在这里插入图片描述

    函数原型:

    int fcntl(int fd,int cmd,.../* arg */ );
    
    • 1

    传入的cmd的值不同, 后面追加的参数也不相同

    fcntl函数有5种功能

    1. 复制一个现有的描述符(cmd=F_DUPFD)
    2. 获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD)
    3. 获得/设置文件状态标记(cmd=F_GETFL或F_SETFL)
    4. 获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN)
    5. 获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW)

    我们此处只是用第三种功能, 获取/设置文件状态标记, 就可以将一个文件描述符设置为非阻塞

    实现函数SetNoBlock

    基于fcntl, 我们实现一个SetNoBlock函数, 将文件描述符设置为非阻塞

    void SetNonBlock(int fd)
    {
        int f1=fcntl(fd,F_GETFL);
        if(f1<0){
            std::cerr<<"获取文件标记位失败..."<<std::endl;
            return;
        }
    
        fcntl(fd,F_SETFL,f1|O_NONBLOCK);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 使用F_GETFL将当前的文件描述符的属性取出来(这是一个位图)
    • 然后再使用F_SETFL将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK参数

    轮询方式读取标准输入

    实现代码

    #include<iostream>
    #include<string>
    #include<unistd.h> 
    #include<fcntl.h>
    #include<errno.h>
    
    void SetNonBlock(int fd)
    {
        int f1=fcntl(fd,F_GETFL);
        if(f1<0){
            std::cerr<<"获取文件标记位失败..."<<std::endl;
            return;
        }
    
        fcntl(fd,F_SETFL,f1|O_NONBLOCK);
    }
    
    
    int main()
    {
        char buffer[1024];
        SetNonBlock(0);
        while(true){
            ssize_t s=read(0,buffer,sizeof(buffer)-1);
            if(s>0){
                buffer[s]=0;
                std::cout<<"buffer: "<<buffer<<std::endl;
            }
            else{
                if(errno==EAGAIN || errno == EWOULDBLOCK)
                {
                    sleep(2);
                    std::cout << "当前没有出错,仅仅底层数据没有就绪罢了..." << std::endl;
                    continue;
    
                }
                if(errno == EINTR){
                    std::cout << "读取被信号中断" << std::endl;
                    continue;
                }
    
                std::cout<<"read error"<<std::endl;
                break;
            }
        }
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    运行结果

    在这里插入图片描述

    I/O多路转接之select

    初识select

    系统提供select函数来实现多路复用输入/输出模型

    • select系统调用是用来让我们的程序监视多个文件描述符的状态变化的
    • 程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

    select函数

    #include <sys/select.h>
    
    int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
    
    • 1
    • 2
    • 3

    参数解释

    • 参数nfds是需要监视的最大的文件描述符值+1
    • rdset 对应于需要检测的可读文件描述符的集合
    • wrset 对应于需要检测的可写文件描述符的集合
    • exset 对应于需要检测的异常文件描述符的集合
    • 参数 timeout 为结构 timeval ,用来设置select()的等待时间

    关于timeval结构

    timeval结构用于描述一段时间长度,如果在这个时间内,需要监视的描述符没有事件发生则函数返回,返回值为0。

    在这里插入图片描述

    参数timeout取值

    • NULL:则表示select()没有timeout, select将一直被阻塞,直到某个文件描述符上发生了事件
    • 0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生
    • 特定的时间值:如果在指定的时间段里没有事件发生, select将超时返回

    fd_set结构

    在这里插入图片描述
    在这里插入图片描述

    fd_set是操作系统提供的文件描述符集,其这个结构就是一个整型数组, 更严格的说, 是一个 “位图”. 使用位图中对应的位来表示要监视的文件描述符。

    在这里插入图片描述

    fd_set是一个输入输出型函数,它的功能如下

    • 所有关心读事件的文件描述符,都应该添加在这个集合中。
    • 输入:用户告诉内核,OS你要帮我检测一下在这个集合中的fd的读事件。
    • 输出:内核告诉用户,你关心的fd,有那些文件描述符已经就绪了,可以读取了!

    fd_set的接口

    void FD_CLR(int fd, fd_set *set); // 用来清除描述词组set中相关fd 的位
    int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组set中相关fd 的位是否为真
    void FD_SET(int fd, fd_set *set); // 用来设置描述词组set中相关fd的位
    void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位
    
    • 1
    • 2
    • 3
    • 4

    函数返回值

    • 执行成功则返回文件描述符状态已就绪的个数
    • 如果返回0代表在描述词状态改变前已超过timeout时间
    • 当有错误发生时则返回-1,错误原因存于errno,此时参数readfds, writefds, exceptfds和timeout的值变成不可预测

    错误值可能为:

    • EBADF 文件描述词为无效的或该文件已关闭
    • EINTR 此调用被信号所中断
    • EINVAL 参数n为负值
    • ENOMEM 核心内存不足

    常见的程序片段如下

    fs_set readset;
    FD_SET(fd,&readset);
    select(fd+1,&readset,NULL,NULL,NULL);
    if(FD_ISSET(fd,readset)){......}
    
    • 1
    • 2
    • 3
    • 4

    理解select执行过程

    理解select模型的关键在于理解fd_set,为说明方便,取fd_set长度为1字节,fd_set中的每一bit可以对应一个文件描述符fd。则1字节长的fd_set最大可以对应8个fd

    1. 执行 fd_set set; FD_ZERO(&set) ;则set用位表示是0000,0000。
    2. 若 fd=5 ,执行 FD_SET(fd,&set) ;后set变为0001,0000(第5位置为1)
    3. 若再加入fd=2,fd=1,则set变为0001,0011.
    4. 执行 select(6,&set,0,0,0) 阻塞等待
    5. 若fd=1,fd=2上都发生可读事件,则select返回,此时set变为0000,0011。注意:没有事件发生时fd=5被清空

    socket就绪条件

    读就绪

    • socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于0。
    • socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0
    • 监听的socket上有新的连接请求
    • socket上有未处理的错误

    写就绪

    • socket内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于0
    • socket的写操作被关闭(close或者shutdown). 对一个写操作被关闭的socket进行写操作, 会触发SIGPIPE信号
    • socket使用非阻塞connect连接成功或失败之后
    • socket上有未读取的错误

    select的特点

    • 可监控的文件描述符个数取决与sizeof(fd_set)的值. 我这边服务器上sizeof(fd_set)=512,每bit表示一个文件描述符,则我服务器上支持的最大文件描述符是512*8=4096
    • 将fd加入select监控集的同时,还要再使用一个数据结构array保存放到select监控集中的fd。一是用于再select 返回后,array作为源数据和fd_set进行FD_ISSET判断;二是select返回后会把以前加入的但并无事件发生的fd清空,则每次开始select前都要重新从array取得fd逐一加入(FD_ZERO最先),扫描array的同时取得fd最大值maxfd,用于select的第一个参数

    select缺点

    • 每次调用select, 都需要手动设置fd集合, 从接口使用角度来说非常不便
    • 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
    • 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
    • select支持的文件描述符数量太小

    select使用示例

    套接字封装 sock.hpp

    #pragma once
    
    #include<iostream>
    #include<string>
    #include<cstdlib>
    #include<unistd.h>
    #include<sys/socket.h>
    #include<netinet/in.h>
    #include<arpa/inet.h>
    #include<strings.h>
    
    namespace ns_sock
    {
        enum{
            SOCKET_ERR=2,
            BIND_ERR,
            LISTEN_ERR
        };
    
        const int g_backlog=5;
    
        class Sock  
        {
        public:
            static int Socket()
            {
                int sock=socket(AF_INET,SOCK_STREAM,0);
                if(sock<0){
                    std::cerr<<"socket error!"<<std::endl;
                    exit(SOCKET_ERR);
                }
                int opt = 1;
                setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));// 设置套接口选项值
                return sock;
            }
    
            static void Bind(const int &sock,const u_int16_t &port)
            {
                struct sockaddr_in local;
                bzero(&local,sizeof(local));
                local.sin_family=AF_INET;
                local.sin_port=htons(port);
                local.sin_addr.s_addr=INADDR_ANY;
    
                if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
                {
                    std::cerr<<"bind error!"<<std::endl;
                    exit(BIND_ERR);
                }
            }
            
            static void Listen(const int &sock)
            {
                if(listen(sock,g_backlog)<0)
                {
                    std::cerr<<"bind error!"<<std::endl;
                    exit(LISTEN_ERR);               
                }
            }
        };
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    select 服务端头文件 select_server.hpp

    #pragma once
    
    #include"sock.hpp"
    #include<sys/select.h>
    #include<sys/types.h>
    #include<sys/socket.h>
    
    namespace ns_select
    {
        using namespace ns_sock;
    
    #define NUM (sizeof(fd_set)*8)
    
        const int g_default = 8080;
    
        class SelectServer
        {
        private:
            u_int16_t port_;
            int listen_sock_;
    
            int fd_arrar_[NUM];
    
            // EndPoint fd_arrar_[NUM];
        public:
            SelectServer(int port=g_default):port_(port),listen_sock_(-1)
            {
                for(int i=0;i<NUM;i++)
                {
                    fd_arrar_[i]=-1;
                }
            }
    
            void InitSelectServer()
            {
                listen_sock_=Sock::Socket();
                Sock::Bind(listen_sock_,port_);
                Sock::Listen(listen_sock_);
                fd_arrar_[0]=listen_sock_;
            }
    
    	    std::ostream& PrintFd()
            {
                for(int i=0;i<NUM;i++)
                {
                    if(fd_arrar_[i]!=-1) std::cout<<fd_arrar_[i]<<' ';
                }
    
                return std::cout;
            }
    
            void HandlerEvent(fd_set &rfds)
            {
                //判断我的有效sock,是否在rfds中
                for(int i=0;i<NUM;i++)
                {
                    if(-1==fd_arrar_[i]){
                        continue;
                    }
                    //区分新链接和数据
                    if(FD_ISSET(fd_arrar_[i],&rfds))// 测试描述词组set中相关fd的位是否为真
                    {
                        if(fd_arrar_[i]==listen_sock_)
                        {
                            //新链接
                            struct sockaddr_in peer;
                            socklen_t len=sizeof(peer);
                            int sock=accept(listen_sock_,(struct sockaddr*)&peer,&len);
                            if(sock<0)
                            {
                                std::cout<<"accept error"<<std::endl;
                            }
                            else
                            {
                                //将新的sock添加到文件描述符中
                                int j=0;
                                for(;j<NUM;j++)
                                {
                                    if(fd_arrar_[j]==-1)
                                    {
                                        break;
                                    }
                                }
    
                                if(j==NUM)
                                {
                                    std::cout<<"fd_arrar 已经满了"<<std::endl;
                                    close(sock);
                                }
                                else{
                                    fd_arrar_[j]=sock;
                                    std::cout<<"获取链接成功,sock: "<<sock<<" 已经添加到数组中了,当前:"<<std::endl;
                                    PrintFd() << " [当前]" << std::endl;
                                }
                            }
                        }
                        else
                        {
                            //数据
                            // 1.网络通信,定制协议,和业务场景有关
                            // 2.是不是每一个sock,都必须有自己独立的buffer
                            char buffer[1024];
                            ssize_t s=recv(fd_arrar_[i],buffer,sizeof(buffer),0);
                            if(s>0)
                            {
                                buffer[s]='\0';
                                std::cout<<"clint say#"<<buffer<<std::endl;
                            }
                            else if(s==0)
                            {
                                std::cout<<"client quit------sock: "<<fd_arrar_[i]<<std::endl;
                                
                                
                                // 对端链接关闭
                                close(fd_arrar_[i]);
    
                                // 从rfds中,去掉sock
                                fd_arrar_[i]=-1;
                                PrintFd()<<"[当前]"<<std::endl;
                            }
                            else{
                                // 读取异常
                                std::cerr<<"recv error"<<std::endl;
                            }
                        }
                    }
                }
            }
    
            void Loop()
            {
                //在服务器最开始的时候,我们只有一个sock,listen_sock, 有读事件就绪,读文件描述符看待的!
                fd_set rfds;           
                
                //FD_SET(listen_sock_,&rfds);
                while(true)
                {
                    //struct timeval timeout={3,0};
                    // 对位图结构进行清空
                    FD_ZERO(&rfds); 
                    int max_fd=-1;
    
                    for(int i=0;i<NUM;i++)
                    {
                        if(-1==fd_arrar_[i]) continue;
                        FD_SET(fd_arrar_[i],&rfds);
                        if(max_fd<fd_arrar_[i]) max_fd=fd_arrar_[i];
                    }
    
                    int n=select(max_fd+1,&rfds,nullptr,nullptr,nullptr);
                    switch(n)
                    {
                        case 0:
                            std::cout<<"timeout ... "<<std::endl;
                            break;
                        case -1:
                            std::cout<<"select error"<<std::endl;
                            break;
                        default:
                            // select成功,至少有一个fd是就绪的
                            HandlerEvent(rfds);
                            //select成功,至少有一个fd是就绪的
                            //std::cout<<"有事件发生"<<std::endl;
                            break;
                    }
                }
            }
    
            ~SelectServer()
            {
                if(listen_sock_>=0)
                    close(listen_sock_);
            }
        };
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175

    select 服务端文件

    #include"select_server.hpp"
    
    using namespace ns_select;
    int main()
    {
        SelectServer* svr=new SelectServer();
        svr->InitSelectServer();
        svr->Loop();
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    运行结果

    在这里插入图片描述

    I/O多路转接之poll

    poll函数接口

    #include <poll.h>
    
    int poll(struct pollfd *fds, nfds_t nfds, int timeout);
    
    // pollfd结构
    struct pollfd {
    	int fd; /* file descriptor */
    	short events; /* requested events */  // 用户通知内核,你要帮我关心fd,上面的所有的events事件
    	short revents; /* returned events */  // 内核通知用户,底层fd,对应的那些事件都已经就绪了
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    参数说明

    • fds是一个poll函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合
    • nfds表示fds数组的长度
    • timeout表示poll函数的超时时间, 单位是毫秒(ms)

    events和revents的取值

    在这里插入图片描述
    返回结果

    • 返回值小于0, 表示出错
    • 返回值等于0, 表示poll函数等待超时
    • 返回值大于0, 表示poll由于监听的文件描述符就绪而返回

    socket就绪条件

    socket就绪条件同select

    poll的优点

    不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现

    • pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比select更方便
    • poll并没有最大数量限制 (但是数量过大后性能也是会下降)

    poll的缺点

    poll中监听的文件描述符数目增多时

    • 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符
    • 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中
    • 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降

    select :只负责等待(nums),就绪事件通知机制
    poll:只负责等待(nums),就绪事件通知机制

    poll :
    1.解决了:检测的文件描述符有上限的问题
    2解决了︰将输入和输出分离,解决编码的时候,必须的重新设置关心的文件描述符,poll不在需要每次都重新设置

    我们编写代码的时候,编译器是如何得知我的代码有语法报错的呢??
    因为我们在写代码的时候,编辑器(vscode) or集成环境(vs2019 vs2022),会自动在后台调动编译器,用编译的语法检测功能,来进行对你正在写的代码进行扫描。自动补齐,语法提示,也是类似的道理。
    编辑器(vscode)or集成环境(vs2019 vs2022)也会在你写代码的时候,在特定的路径下进行搜索头文件,通过头文件的内容来给我们进行语法提示

    poll使用示例

    poll 服务端头文件 poll_server.hpp

    #pragma once
    
    #include"sock.hpp"
    #include<poll.h>
    #include<sys/types.h>
    #include<sys/socket.h>
    
    namespace ns_poll
    {
        using namespace ns_sock;
    
        const int g_default = 8080;
    
        #define NUM 1024
    
        class PollServer
        {
        private:
            u_int16_t port_;
            int listen_sock_;
    
            struct pollfd pollfds_[NUM];
        public:
            PollServer(int port=g_default):port_(port),listen_sock_(-1)
            {
                for(int i=0;i<NUM;i++)
                {
                    pollfds_[i].fd=-1;
                    pollfds_[i].events=0;
                    pollfds_[i].revents=0;
                }
            }
    
            void InitPollServer()
            {
                listen_sock_=Sock::Socket();
                Sock::Bind(listen_sock_,port_);
                Sock::Listen(listen_sock_);
    
                pollfds_[0].fd=listen_sock_;
                pollfds_[0].events=POLLIN;
            }
    
            std::ostream& PrintFd()
            {
                return std::cout;
            }
    
            void HandlerEvent(struct pollfd pollfds[],int num)
            {
                for(int i=0;i<num;i++)
                {
                    if(pollfds[i].fd==-1) continue;
                    if(pollfds[i].revents & POLLIN)
                    {
                        if(pollfds[i].fd==listen_sock_)
                        {
                            //新链接
                            struct sockaddr_in peer;
                            socklen_t len=sizeof(peer);
                            int sock=accept(listen_sock_,(struct sockaddr*)&peer,&len);
                            if(sock<0)
                            {
                                std::cout<<"accept error"<<std::endl;
                            }
                            else
                            {
                                int j=0;
                                for(;j<num;j++)
                                {
                                    if(pollfds[j].fd==-1)
                                    {
                                        break;
                                    }
                                }
                                
                                if(j==num)
                                {
                                    close(sock);
                                }
                                else
                                {
                                    std::cout<<"get a new sock : "<<sock<<std::endl;
                                    pollfds[j].fd=sock;
                                    pollfds[j].events=POLLIN;
                                    pollfds[j].revents=0;
                                    close(pollfds[i].fd);
                                }
                            } 
                        } 
                        else
                        {
                            char buffer[1024];
                            ssize_t s=recv(pollfds[i].fd,buffer,1024,0);
                            if(s>0){
                                buffer[s]=0;
                                std::cout<<"client say# "<<buffer<<std::endl;
                            }
                            else if(s<=0)
                            {
                                std::cout<<"client quit ... sock : "<<pollfds[i].fd<<std::endl;
                                pollfds[i].fd=-1;
                                pollfds[i].events=0;
                                pollfds[i].revents=0;
                            }
                        }                 
                    }
                }
            }
    
            void Loop()
            {
                int timeout = 1000;// 1s
                while(true)
                {
                    int n=poll(pollfds_,NUM,timeout);
                    switch(n)
                    {
                        case 0:
                            //std::cout<<"timeout ... "<<std::endl;
                            break;
                        case -1:
                            std::cout<<"poll error"<<std::endl;
                            break;
                        default:
                            // select成功,至少有一个fd是就绪的
                            HandlerEvent(pollfds_,NUM);
                            //select成功,至少有一个fd是就绪的
                            //std::cout<<"有事件发生"<<std::endl;
                            break;
                    }
                }
            }
    
            ~PollServer()
            {
                if(listen_sock_>=0)
                    close(listen_sock_);
            }
        };
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141

    服务端文件 server.cc

    #include"poll_server.hpp"
    
    using namespace ns_poll;
    int main()
    {
        PollServer* svr=new PollServer();
        svr->InitPollServer();
        svr->Loop();
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    运行结果

    在这里插入图片描述

    I/O多路转接之epoll

    epoll的相关系统调用

    epoll 有3个相关的系统调用

    epoll_create

    函数原型

    int epoll_create(int size);
    
    • 1

    功能: 创建一个epoll的句柄

    说明:

    • 自从linux2.6.8之后,size参数是被忽略的.
    • 用完之后, 必须调用close()关闭

    epoll_ctl

    函数原型

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    
    • 1

    功能: epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型。

    参数:

    • 第一个参数是epoll_create()的返回值(epoll的句柄)
    • 第二个参数表示动作,用三个宏来表示
    • 第三个参数是需要监听的fd
    • 第四个参数是告诉内核需要监听什么事件

    第二个参数的取值

    • EPOLL_CTL_ADD :注册新的fd到epfd中
    • EPOLL_CTL_MOD :修改已经注册的fd的监听事件
    • EPOLL_CTL_DEL :从epfd中删除一个fd

    struct epoll_event结构如下

    在这里插入图片描述

    events可以是以下几个宏的集合

    • EPOLLIN:表示对应的文件描述符可以读
    • EPOLLOUT:表示对应的文件描述符可以写
    • EPOLLPRI:表示对应的文件描述符有紧急的数可读
    • EPOLLERR:表示对应的文件描述符发生错误
    • EPOLLHUP:表示对应的文件描述符被挂断
    • EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的
    • EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里

    epoll_wait

    函数原型

    int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
    
    • 1

    功能: 收集在epoll监控的事件中已经发送的事件

    参数:

    • 参数events是分配好的epoll_event结构体数组
    • epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存)
    • maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size
    • 参数timeout是超时时间 (毫秒,0会立即返回,-1是永久阻塞)

    返回值:

    如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败.

    epoll工作原理

    在这里插入图片描述

    struct eventpoll{ 
    	 .... 
    	 /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/ 
    	 struct rb_root rbr; 
    	 /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/ 
    	 struct list_head rdlist; 
    	 .... 
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件
    • 这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)
    • 而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法
    • 这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中
    • 在epoll中,对于每一个事件,都会建立一个epitem结构体
    struct epitem{ 
    	struct rb_node rbn;//红黑树节点 
    	struct list_head rdllink;//双向链表节点 
    	struct epoll_filefd ffd; //事件句柄信息 
    	struct eventpoll *ep; //指向其所属的eventpoll对象 
    	struct epoll_event event; //期待发生的事件类型 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可
    • 如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户. 这个操作的时间复杂度是O(1)

    总结一下, epoll的使用过程就是三部曲

    • 调用epoll_create创建一个epoll句柄
    • 调用epoll_ctl, 将要监控的文件描述符进行注册
    • 调用epoll_wait, 等待文件描述符就绪

    epoll的优点(和 select 的缺点对应)

    • 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
    • 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)
    • 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响
    • 没有数量限制: 文件描述符数目无上限

    注意:

    网上有这样的说法, epoll中使用了内存映射机制(内核直接将就绪队列通过mmap的方式映射到用户态. 避免了拷贝内存这样的额外性能开销)

    这种说法是不准确的. 我们定义的struct epoll_event是我们在用户空间中分配好的内存. 势必还是需要将内核的数据拷贝到这个用户空间的内存中的

    epoll工作方式

    epoll有2种工作方式-水平触发(LT)和边缘触发(ET)

    假如有这样一个例子:

    • 我们已经把一个tcp socket添加到epoll描述符
    • 这个时候socket的另一端被写入了2KB的数据
    • 调用epoll_wait,并且它会返回. 说明它已经准备好读取操作
    • 然后调用read, 只读取了1KB的数据
    • 继续调用epoll_wait…

    水平触发Level Triggered 工作模式

    epoll默认状态下就是LT工作模式

    • 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分
    • 如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait仍然会立刻返回并通知socket读事件就绪
    • 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回
    • 支持阻塞读写和非阻塞读写

    边缘触发Edge Triggered工作模式

    如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式

    • 当epoll检测到socket上事件就绪时, 必须立刻处理
    • 如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候, epoll_wait 不会再返回了
    • 也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会
    • ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epoll
    • 只支持非阻塞的读写

    select和poll其实也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET

    对比LT和ET

    • LT是 epoll 的默认行为。使用 ET 能够减少 epoll 触发的次数. 但是需要一次响应就绪过程中就把所有的数据都处理完。相当于一个文件描述符就绪之后, 不会反复被提示就绪。
    • ET看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的
    • ET 的代码复杂程度更高

    理解ET模式和非阻塞文件描述符

    使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是 “工程实践” 上的要求

    假设这样的场景: 服务器接受到一个10k的请求, 会向客户端返回一个应答数据. 如果客户端收不到应答, 不会发送第二个10k请求

    在这里插入图片描述

    如果服务端写的代码是阻塞式的read, 并且一次只 read 1k 数据的话(read不能保证一次就把所有的数据都读出来, 可能被信号打断), 剩下的9k数据就会待在缓冲区中

    在这里插入图片描述
    此时由于 epoll 是ET模式, 并不会认为文件描述符读就绪. epoll_wait 就不会再次返回. 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据. epoll_wait 才能返回

    但是这样就存在如下问题

    • 服务器只读到1k个数据, 要10k读完才会给客户端返回响应数据
    • 客户端要读到服务器的响应, 才会发送下一个请求
    • 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据

    在这里插入图片描述

    所以, 为了解决上述问题(阻塞read不一定能一下把完整的请求读完), 于是就可以使用非阻塞轮训的方式来读缓冲区,保证一定能把完整的请求都读出来。

    如果是LT没这个问题. 只要缓冲区中的数据没读完, 就能够让 epoll_wait 返回文件描述符读就绪

    epoll的使用场景

    epoll的高性能, 是有一定的特定场景的. 如果场景选择的不适宜, epoll的性能可能适得其反

    对于多连接, 且多连接中只有一部分连接比较活跃时, 比较适合使用epoll

    例如, 典型的一个需要处理上万个客户端的服务器, 例如各种互联网APP的入口服务器, 这样的服务器就很适合epoll.

    如果只是系统内部, 服务器和服务器之间进行通信, 只有少数的几个连接, 这种情况下用epoll就并不合适. 具体要根据需求和场景特点来决定使用哪种IO模型

    使用示例

    利用epoll实现一个简单的英汉互译

    Accepter.hpp 文件

    #pragma once
    
    #include<iostream>
    #include<string>
    #include<cerrno>
    #include<sys/socket.h>
    #include<netinet/in.h>
    #include<arpa/inet.h>
    #include"Reactor.hpp"
    #include"Callback.hpp"
    
    using namespace ns_reactor;
    using namespace ns_sock;
    
    void Accepter(Event &event)
    {
        std::cout << "Accepter 回调方法被调用" << std::endl;
        //连接事件到来,在同一个时间段,有很多的连续到来
        while(true)
        {
            struct sockaddr_in peer;
            socklen_t len=sizeof(peer);
            int sock=accept(event.sock_,(struct sockaddr*)&peer,&len);
            if(sock>0)
            {
                //0.设置fd为非阻塞
                Sock::SetNonBlock(sock);
                //1.构建新的与sock对应的Event对象
                Event ev;
                ev.sock_=sock;
                ev.r_=event.r_;
                ev.RegisterCallback(Recver,Sender,Errorer);
    
                //2.将新的ev拖管给Epoll,必须知道曾今的Epoll对象
                (event.r_)->AddEvent(ev,EPOLLIN | EPOLLET);
            }
            else
            {
                if(errno==EINTR)
                {
                     //当前的accept调用,被信号中断,并不代表底层没有新的链接了
                     continue;
                }
                else if(errno==EAGAIN || errno==EWOULDBLOCK)
                {
                    //当前出错返回,但是不是真正意义上出错了,而是底层没有连接了
                    break;
                }
                else
                {
                    //真正意义上面的accept读取出错
                    std::cerr<<"accept error : "<<errno<<std::endl;
                    continue;
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    Callback.hpp 文件

    #pragma once
    
    #include<iostream>
    #include<string>
    #include<sys/types.h>
    #include<sys/socket.h>
    #include<unordered_map>
    
    #include"Reactor.hpp"
    #include"Util.hpp"
    
    std::unordered_map<std::string,std::string> dict{
        {"hello" , "你好"},
        {"apple" , "苹果"},
        {"pear", "梨"},
        {"banana", "香蕉"},
        {"peach","桃"}
    };
    
    using namespace ns_reactor;
    
    /***********************************
    * return :
    * 0 本轮读取完毕
    * -1 本轮读取出错
    *
    * sock : 要读取的fd
    * out : 输出型参数
    *
    ******************************/
    static int RecvHepler(int sock,std::string *out)
    {
        while(true)
        {
            char buffer[1024];
            ssize_t s = recv(sock,buffer,sizeof(buffer)-1,0);
            if(s > 0){
                buffer[s] = 0;
                std::string str;
                for(int i=0;i<s;i++){
                    if((buffer[i]>='a'&&buffer[i]<='z')||(buffer[i]>='A'&&buffer[i]<='Z'))
                    {
                        str+=buffer[i];
                    }
                }
                (*out) = str;
                // (*out) += buffer;
            }
            else if(s < 0){
                if(errno == EINTR)// 读取时出现中断错误
                {
                    continue;
                }
                else if(errno == EAGAIN || errno == EWOULDBLOCK)// 没有数据可读
                {
                    // 本轮读取完毕
                    return 0;
                }
                else
                {
                    // 读取出错
                    return -1;
                }
            }
            else{
                return -1;
            }
        }
    }
    
    // 1.网络就绪事件与事件派发,和网络数据进行解耦
    void Recver(Event &event)
    {
        // 你怎么知道你本轮读完了?依旧属于网络通信部分(是否就绪(epoll),是否读取(recv))
        if( -1 == RecvHepler(event.sock_,&(event.inbuffer_)))// 不做业务处理,只负责进行读取
        {
            // 本轮读取出错(sock被关闭了,sock读取出错)
            if(event.error_callback_) event.error_callback_(event);
            return ; 
        }
    
        // 往后写的内容已经与Reactor无关了,都是数据分析与处理
        // 协议:你怎么知道你拿到了一个完整的报文?我们不知道,因为我们没有定制协议
        // 但是,所有的数据全部已经在inbuffer中(无论是现在,还是未来)
        // 协议:如果读取数据在协议层面,没有读完完整报文,应该如何?
    
        std::string message = "词库中没有该单词";
        std::cout<<event.inbuffer_.c_str()<<std::endl;
        auto iter = dict.find(event.inbuffer_);
        if(iter!=dict.end()){
            message=iter->second;
        }
    
        event.outbuffer_ = message+'\n';
        (event.r_)->EnableReadWrite(event.sock_,true,true);
    
        std::cout<<"call Recver..."<<std::endl;
    }
    
    int SendHelper(int sock,std::string &send_string)
    {
        int total = 0;//目前累计发送多少
        const char* start = send_string.c_str();
        int size = send_string.size();
    
        while(true)
        {
            ssize_t s = send(sock,start+total,size-total,0);
            if(s>0)
            {
                total+=s;
                // 本轮缓冲区足够大,数据已全部发送
                if(total == size){
                    return 1;
                }
            }
            else
            {
                if(errno == EINTR)
                {
                    continue;
                }
                else if(errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    // 发送缓冲区已满
    
                    return 0;// 本轮发送完毕
                }
                else
                {
                    return -1;// 发送失败
                }
            }
        }
    }
    
    void Sender(Event &event)
    {
        // 本质是发送outbuffer_中的内容
        int ret = SendHelper(event.sock_,event.outbuffer_);
    
        if(ret == -1)
        {
            if(event.error_callback_)
                event.error_callback_(event);
        }
        else if(ret == 1)
        {
            (event.r_)->EnableReadWrite(event.sock_,true,false);
        }
        else if(ret == 0)
        {
            (event.r_)->EnableReadWrite(event.sock_,true,true);
        }
        std::cout<<"call Sender..."<<std::endl;
    }
    
    void Errorer(Event &event)
    {
        std::cout<<"call Errorer..."<<std::endl;
        (event.r_)->DelEvent(event.sock_);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162

    Reactor.hpp 文件

    #pragma once
    
    #include<iostream>
    #include<string>
    #include<vector>
    #include<unordered_map>
    #include<sys/epoll.h>
    #include<unistd.h>
    
    namespace ns_reactor
    {
        class Event;
        class Reactor;
    
        typedef void(*callback_t)(Event &);
    
        class Event
        {
        public:
            int sock_;                   //特定一个文件描述符
            Reactor *r_;                   //指向该Event对应的epoll
    
            std::string inbuffer_;       //对应的sock,私有的读取缓冲区
            std::string outbuffer_;      //对应的sock,私有的发送缓冲区
    
            callback_t recv_callback_;   //对应的sock,读回调
            callback_t send_callback_;   //对应的sock,写回调
            callback_t error_callback_;  //对应的sock,异常回调
    
    
        public:
            Event():sock_(-1),r_(nullptr)
            {
                recv_callback_ = nullptr;
                send_callback_ = nullptr;
                error_callback_ = nullptr;
            }
            void RegisterCallback(callback_t _recv,callback_t _send,callback_t _error)
            {
                recv_callback_=_recv;
                send_callback_=_send;
                error_callback_=_error;
            }
            ~Event()
            {
            }
        };
    
        class Reactor
        {
        private:
            int epfd_;
            std::unordered_map<int,Event> events_; //sock : Event
        public:
            Reactor():epfd_(-1)
            {}
    
            void InitReactor()
            {
                epfd_=epoll_create(128);
                if(epfd_<0)
                {
                    std::cerr<<"epoll_create error"<<std::endl;
                    exit(1);
                }
            }
    
            void AddEvent(const Event &ev,uint32_t events)
            {
                //1.将ev中的sock添加到epoll中,默认我们认为,所有添加的事件默认全部都要关心read事件
                //2.将ev本身添加到unordered_map
                struct epoll_event epoll_ev;
                epoll_ev.events=events;
                epoll_ev.data.fd=ev.sock_;
    
                if(epoll_ctl(epfd_,EPOLL_CTL_ADD,ev.sock_,&epoll_ev)<0)
                {
                    std::cerr<<"epoll_ctl add event error: "<<ev.sock_<<std::endl;
                    return ;
                }
                else
                {
                    events_.insert({ev.sock_,ev});
                }
                std::cout<<"添加事件成功,sock : "<<ev.sock_<<std::endl;
            }
    
            void DelEvent()
            {
                auto iter = events_.find(sock);
                if(iter == events_.end())
                {
                    return ;
                }
                epoll_ctl(epfd_,EPOLL_CTL_DEL,sock,nullptr);
                events_.erase(iter);
                close(sock);
            }
    
            // 使能读写接口
            void EnableReadWrite(int sock,bool readable,bool writeable)
            {
                struct epoll_event ev;
                ev.events = (EPOLLET | (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0));
                ev.data.fd = sock;
    
                if(epoll_ctl(epfd_,EPOLL_CTL_MOD,sock,&ev) == 0){
                    std::cout<<"更改:"<<sock<<" 关心的事件成功"<<std::endl;
                }
            }
            
            // 检测指定sock是否存在
            bool IsExists(int sock)
            {
                auto iter = events_.find(sock);
                return iter == events_.end() ? false : true;
            }
    
            // 对就绪事件进行事件派发
            void Dispatcher(int timeout)
            {
                #define NUM 128
                struct epoll_event revs[NUM];
                int num=epoll_wait(epfd_,revs,NUM,timeout);
                for(int i=0;i<num;i++)
                {
                    //就绪事件派发
                    int sock=revs[i].data.fd;
                    uint32_t events=revs[i].events;
                    std::cout<<"sock: "<<sock<<" 这个fd上面有数据"<<std::endl;
    
                    // 将所有异常都交给读写处理
                    if(events & EPOLLERR) events |= (EPOLLIN|EPOLLOUT);
                    if(events & EPOLLHUP /*对端关闭链接*/) events |= (EPOLLIN|EPOLLOUT);
    
                    if(IsExists(sock) && (events&EPOLLIN))
                        if(events_[sock].recv_callback_) events_[sock].recv_callback_(events_[sock]);
    
                    if(IsExists(sock) && (events&EPOLLOUT))
                        if(events_[sock].send_callback_) events_[sock].send_callback_(events_[sock]);
                }
            }
    
            ~Reactor()
            {
                if(epfd_>=0) close(epfd_);
            }
        };
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149

    sock.hpp 文件

    #pragma once
    
    #include<iostream>
    #include<string>
    #include<cstdlib>
    #include<unistd.h>
    #include<sys/socket.h>
    #include<netinet/in.h>
    #include<arpa/inet.h>
    #include<strings.h>
    #include<fcntl.h>
    
    namespace ns_sock
    {
        enum{
            SOCKET_ERR=2,
            BIND_ERR,
            LISTEN_ERR
        };
    
        const int g_backlog=5;
    
        class Sock  
        {
        public:
            static int Socket()
            {
                int sock=socket(AF_INET,SOCK_STREAM,0);
                if(sock<0){
                    std::cerr<<"socket error!"<<std::endl;
                    exit(SOCKET_ERR);
                }
                int opt = 1;
                setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
                return sock;
            }
    
            static void Bind(const int &sock,const u_int16_t &port)
            {
                struct sockaddr_in local;
                bzero(&local,sizeof(local));
                local.sin_family=AF_INET;
                local.sin_port=htons(port);
                local.sin_addr.s_addr=INADDR_ANY;
    
                if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
                {
                    std::cerr<<"bind error!"<<std::endl;
                    exit(BIND_ERR);
                }
            }
            
            static void Listen(const int &sock)
            {
                if(listen(sock,g_backlog)<0)
                {
                    std::cerr<<"bind error!"<<std::endl;
                    exit(LISTEN_ERR);               
                }
            }
    
            static void SetNonBlock(int sock)
            {
                int fl=fcntl(sock,F_GETFL);
                fcntl(sock, F_SETFL , fl | O_NONBLOCK);
            }
        };
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    EpollServer.cc 文件

    #include"Reactor.hpp"
    #include"sock.hpp"
    #include"Accepter.hpp"
    
    using namespace ns_reactor;
    using namespace ns_sock;
    
    int main()
    {
        //1.创建Epoll对象
        Reactor* R=new Reactor();
        R->InitReactor();
    
        //2.创建网络套接字
        int listen_sock=Sock::Socket();
        Sock::SetNonBlock(listen_sock);// 为了支持ET模式,需要把文件描述符设置为非阻塞
        Sock::Bind(listen_sock,8080);
        Sock::Listen(listen_sock);
    
        //3.创建Event对象
        Event ev;
        ev.sock_=listen_sock;
        ev.r_=R;
    
        //Accepter:链接管理器
        ev.RegisterCallback(Accepter,nullptr,nullptr);// listen_sock,只需监听就绪
    
        //4.将Event ev注册进入Epoll中
        R->AddEvent(ev,EPOLLIN | EPOLLET);//让sock的工作方式是ET模式
    
        //5.进入事件派发逻辑,服务器启动
        int timeout=1000;
        while(true)
        {
            R->Dispatcher(timeout);
        }
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    运行结果
    在这里插入图片描述

  • 相关阅读:
    图片或文件Blob、File、Base64之间的相互转换
    Hive企业级调优
    LVS,Nginx,Haproxy三种负载均衡产品的对比
    从上半年智能驾驶前装量产数据看市场走势,你的机会在哪里?
    跨境干货 | 如何搭建自己的独立站?
    论文阅读_异常检测综述
    【【萌新的RiscV学习之在写代码之前对于关键路径的分析-11】】
    Python爬虫教程:从入门到实战
    ATF官方文档翻译(二):Authentication Framework & Chain of Trust(身份验证框架和信任链)(2)
    网络安全防御体系中网络安全检查设备
  • 原文地址:https://blog.csdn.net/qq_56663697/article/details/124794782