• 【IO多路转接】poll&epoll



    1 🍑poll🍑

    1.1 🍎poll函数接口🍎

    #include 
    int poll(struct pollfd *fds, nfds_t nfds, int timeout);
    // pollfd结构
    struct pollfd 
    {
     int fd; /* file descriptor */
     short events; /* requested events */
     short revents; /* returned events */
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    参数说明:

    • fds是一个poll函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合;
    • nfds表示fds数组的长度;
    • timeout表示poll函数的超时时间, 单位是毫秒(ms)

    eventsrevents的取值:

    事件描述是否可作为输入是否可作为输出
    POLLIN普通数据和优先数据可读
    POLLEDNORM普通数据可读
    POLLEDBAND优先级带数据可读(Linux不支持)
    POLLOUT普通数据和优先数据可写
    POLLWRNORM普通数据可写
    POLLWRBAND优先级带数据可写
    POLLERR错误
    POLLHUP挂起,比如管道的写端关闭后,读端描述符上将收到POLLHUP事件
    POLLNVAL文件描述符没有打开

    返回结果

    • 返回值小于0, 表示出错;
    • 返回值等于0, 表示poll函数等待超时;
    • 返回值大于0, 表示poll由于监听的文件描述符就绪而返回;

    1.2 🍎poll接口的使用🍎

    通过对poll接口的介绍后大家不难发现,其实使用poll接口是比用select是更简单的,因为在之前我们写select服务器时我们需要自己来维护一个fd数组帮助我们将位图结构初始化,但是使用poll就不用了,我们只需要创建一个struct pollfd*结构的指针,动态开辟空间即可。

    代码实例:

    #include "Sock.hpp"
    #include 
    #include 
    #include
    #include
    
    using namespace std;
    const int N = 1024;
    const int default_fd = -1;
    const short default_event=0;
    const uint16_t gport=8866;
    class PollServer
    {
    public:
        PollServer(const uint16_t port=gport)
        :_port(port)
        ,_ppd(nullptr)
        {}
        ~PollServer()
        {
            _listensock.Close();
            delete []_ppd;
            _ppd=nullptr;
        }
    
        void init()
        {
            _listensock.Socket();
            _listensock.Bind(_port);
            _listensock.Listen();
    
            _ppd=new pollfd[N];
            for(int i=0; i<N; ++i)
            {
                _ppd[i].fd=default_fd;
                _ppd[i].events=default_event;
                _ppd[i].revents=default_event;
            }
    
        }
    
        void run()
        {
            _ppd[0].fd=_listensock.Fd();
            _ppd[0].events=POLLIN;
            while(true)
            {
                //int timeout=-1;
                int n=poll(_ppd, N, -1); 
                if (n > 0)
                {
                    cout << "有一个就绪事件发生了" << endl;
                    // 表示已经有n个连接到来了,此时我们能够直接accept吗?
                    hand_event();
                    printf_fd();
                }
                else if (n == 0)
                {
                    cout << "time out" << endl;
                }
                else
                {
                    cout << "select errno:" << errno << ":" << strerror(errno) << endl;
                }
            }
        }
    private:
        void accepter()
        {
            string clientip;
            uint16_t clientport;
            int sock = _listensock.Accept(&clientip, &clientport);
            cout << "[ip:port]:" << clientip << ":" << clientport << endl;
            int pos = 1;
            while (pos < N)
            {
                if (_ppd[pos].fd == default_fd)
                {
                    _ppd[pos].fd = sock;
                    _ppd[pos].events=POLLIN;
                    _ppd[pos].revents=default_event;
                    break;
                }
                ++pos;
            }
            if (pos > N)
            {
                cout << "_fdarr full" << endl;
                close(sock);
            }
        }
    
        void serverio(int fd, int i)
        {
            char buffer[1024];
            ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
            if (n < 0)
            {
                cout << "read fail" << endl;
                return;
            }
            else if (n == 0)
            {
                cout << "client close,me too" << endl;
                close(fd);
                _ppd[i].fd=default_fd;
                _ppd[i].events=default_event;
                _ppd[i].revents=default_event;
            }
            else
            {
                buffer[n - 1] = 0;
                cout << "client:" << buffer << endl;
                string echo = buffer;
                echo += " [poll server echo]";
                write(fd, echo.c_str(), echo.size());
            }
        }
    
        void hand_event()
        {
            for (int i = 0; i < N; ++i)
            {
                int fd=_ppd[i].fd;
                short revent=_ppd[i].revents;
                if(fd == default_fd)
                    continue;
                if (revent & POLLIN)
                {
                    if (fd == _listensock.Fd())
                    {
                        accepter();
                    }
                    else
                    {
                        serverio(fd, i);
                    }
                }
                else if(revent & POLLOUT)
                {
                    cout<<"POLLOUT"<<endl;
                }
    
            }
        }
    
        void printf_fd()
        {
            for (int i = 0; i < N; ++i)
            {
                if (_ppd[i].fd != default_fd)
                    cout << _ppd[i].fd << " ";
            }
            cout<<endl;
        }
    private:
        Sock _listensock;
        uint16_t _port;
        pollfd* _ppd;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160

    注意:此时在进行serverio时也会有粘包问题以及write的fd没有交给poll处理(写实事件并不一定就绪)问题。

    验证:
    在这里插入图片描述

    1.3 🍎poll的优缺点🍎

    poll优点
    不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现:

    • pollfd结构包含了要监视的event和已经就绪的revent,不再使用select手动设置fd集合的方式,接口使用比select更方便;
    • poll并没有最大数量限制 (但是数量过大后性能也是会下降);

    poll缺点

    poll中监听的文件描述符数目增多时:

    • 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符;
    • 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中;
    • 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.

    从本质上来说,poll知识解决了select文件描述符个数限制问题,但是select其他缺点poll并没有解决,那么还有更好的方式来解决吗?这时就引出来了一个更为厉害的转接方式:epoll


    2 🍑epoll🍑

    2.1 🍎epoll函数接口🍎

    2.1.1 🍋epoll_create🍋

    int epoll_create(int size);
    
    • 1

    创建一个epoll的句柄:

    • 自从linux2.6.8之后,size参数是被忽略的(只要随便设置一个>0的数字就行)
    • 用完之后, 必须调用close()关闭;

    2.1.2 🍋epoll_ctl🍋

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    
    • 1

    epoll的事件注册函数:

    • 它不同于select是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型;
    • 第一个参数是epoll_create()的返回值(epoll的句柄);
    • 第二个参数表示动作,用三个宏来表示;
    • 第三个参数是需要监听的fd;
    • 第四个参数是告诉内核需要监听什么事件;

    op参数的取值:

    • EPOLL_CTL_ADD :注册新的fd到epfd中;
    • EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
    • EPOLL_CTL_DEL :从epfd中删除一个fd;

    struct epoll_event结构如下:
    在这里插入图片描述

    events可以是以下几个宏的集合:

    • EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
    • EPOLLOUT : 表示对应的文件描述符可以写;
    • EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
    • EPOLLERR : 表示对应的文件描述符发生错误;
    • EPOLLHUP : 表示对应的文件描述符被挂断;
    • EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的;
    • EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里;

    2.1.3 🍋epoll_wait🍋

    int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
    
    • 1

    收集在epoll监控的事件中已经就绪事件。

    • 参数events是分配好的epoll_event结构体数组;
    • epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存);
    • maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size;
    • timeout是超时时间 (毫秒,0会立即返回,-1是永久阻塞);
    • 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败;

    2.2 🍎epoll工作原理🍎

    • 1️⃣当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关:
    struct eventpoll
    { 
     .... 
     /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/ 
     struct rb_root rbr; 
     /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/ 
     struct list_head rdlist; 
     .... 
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 2️⃣每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件;这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)
      而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法;这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。在epoll中,对于每一个事件,都会建立一个epitem结构体。
    struct epitem
    { 
     struct rb_node rbn;//红黑树节点 
     struct list_head rdllink;//双向链表节点 
     struct epoll_filefd ffd; //事件句柄信息 
     struct eventpoll *ep; //指向其所属的eventpoll对象 
     struct epoll_event event; //期待发生的事件类型 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 3️⃣当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可,如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户. 这个操作的时间复杂度是O(1)

    在这里插入图片描述总结一下, epoll的使用过程就是三部曲:

    • 调用epoll_create创建一个epoll句柄;
    • 调用epoll_ctl, 将要监控的文件描述符进行注册;
    • 调用epoll_wait, 等待文件描述符就绪;

    2.3 🍎epoll的优点🍎

    • 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效,不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开;
    • 数据拷贝轻量: 只在合适的时候调用 epoll_ctl() 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝);
    • 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响;
    • 没有数量限制: 文件描述符数目无上限;(IO效率不随fd数目增加而线性下降)

    网上有些博客说, epoll中使用了内存映射机制(内核直接将就绪队列通过mmap的方式映射到用户态,避免了拷贝内存这样的额外性能开销);这种说法是不准确的,我们定义的struct epoll_event是我们在用户空间中分配好的内存,势必还是需要将内核的数据拷贝到这个用户空间的内存中的。

    2.4 🍎epoll接口的使用🍎

    2.4.1 🍋第一版本的epoll🍋

    为了更加方便使用 epoll接口,便封装了一个类专门处理epoll接口:

    #include "Sock.hpp"
    #include 
    #include 
    #include
    #include
    using namespace std;
    
    
    const int default_sz=132;
    const int default_epfd=-1;
    class Epoller
    {
    public:
        Epoller()
        :_epfd(default_epfd)
        {}
        ~Epoller()
        {
            if(_epfd != default_epfd)
                close(_epfd);
        }
        void create()
        {
            _epfd=epoll_create(default_sz);
            if(_epfd < 0)
            {
                cout<<"epoll_create fail"<<endl;
            }
        }
    
        bool add_event(int fd, uint32_t events)
        {
            epoll_event ev;
            ev.data.fd=fd;
            ev.events=events;
            int n=epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);
            if(n < 0)
            {
                cout<<"add_event fail"<<endl;
                return false;
            }
            return true;
        }
        bool mod_event(int fd, uint32_t events)
        {
            epoll_event ev;
            ev.data.fd=fd;
            ev.events=events;
            int n=epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &ev);
            if(n < 0)
            {
                cout<<"mod_event fail"<<endl;
                return false;
            }
            return true;
        }
        bool del_event(int fd)
        {
            int n=epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
            if(n < 0)
            {
                cout<<"del_event fail"<<endl;
                return false;
            }
            return true;
        }
    
        int ep_wait(epoll_event* revents, int max_events, int timeout)
        {
            return epoll_wait(_epfd, revents, max_events, timeout);
        }
    
        int get_epfd()
        {
            return _epfd;
        }
    private:
        int _epfd;
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    epoll服务器的编写:

    const uint16_t g_port=8899;
    const int max_epollrevent_sz=64;
    class EpollServer
    {
    public:
        EpollServer(uint16_t port=g_port)
        :_port(port)
        {}
        ~EpollServer()
        {
            _listensock.Close();
        }
    
        void init()
        {
            _listensock.Socket();
            _listensock.Bind(_port);
            _listensock.Listen();
    
            _epoller.create();
        }
        void run()
        {
            _epoller.add_event(_listensock.Fd(), EPOLLIN);
            int timeout=-1;//以阻塞方式进行等待
            while(true)
            {
                int n=_epoller.ep_wait(_rvents, max_epollrevent_sz, timeout);
                if(n < 0)
                {
                    cout<<"ep_wait fail"<<endl;
                }
                else if(n == 0)
                {
                    cout<<"timeout"<<endl;
                }
                else
                {
                    cout<<"当前已经有"<<n<<"个事件就绪"<<endl;
                    hander_event(n);
                }
            }
        }
    private:
        void hander_event(int n)//由于只有n个事件就绪,所以我们只需要遍历0~n即可
        {
            for(int i=0; i<n; ++i)
            {
                int fd=_rvents[i].data.fd;
                uint32_t revent=_rvents->events;
                if (revent & EPOLLIN)//读事件就绪时
                {
                    if (fd == _listensock.Fd())
                    {
                        accepter(); // 进行accept获取新连接
                    }
                    else
                    {
                        serverio(fd); // 用于进行数据io
                    }
                }
            }
        }
        void accepter()
        {
            string clientip;
            uint16_t clientport;
            int sock=_listensock.Accept(&clientip, &clientport);
            cout<<"【"<<clientip<<","<<clientport<<"】事件已经就绪,fd:"<<sock<<endl;
            //将新连接添加到_epoller
            _epoller.add_event(sock, EPOLLIN);
        }
        void serverio(int fd)
        {
            char buffer[1024];
            ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
            if (n < 0)
            {
                cout << "read fail" << endl;
            }
            else if (n == 0)
            {
                cout << "client close,me too" << endl;
                close(fd);
                _epoller.del_event(fd);
            }
            else
            {
                buffer[n - 1] = 0;
                cout << "client:" << buffer << endl;
                string echo = buffer;
                echo += " [epoll server echo]";
                send(fd, echo.c_str(), echo.size(), 0);
            }
        }
    
    private:
        uint16_t _port;
        Sock _listensock;
        Epoller _epoller;
        epoll_event _rvents[max_epollrevent_sz];
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102

    代码中需要注意的地方:

    • 1️⃣此时在进行serverio时也会有粘包问题以及写事件并不一定就绪的问题(这个我们在第二版本会讲解处理方式)。
    • 2️⃣在select/poll编程中,在读取消息时当对端已经把连接关闭时都会修改数组(select中是数组,而poll中是指针),目的都是让内核不要在关心该事件了,epoll也是同理,不同的是调用del_event将不在关心的事件删除而已。

    2.4.2 🍋epoll工作方式🍋

    epoll有2种工作方式:水平触发(LT)和边缘触发(ET)
    我们来举一个生活中小栗子来帮助更好的理解这两种方式:

    比如你正在打游戏,你的妈妈喊你吃饭,这时她通知你的方式可能有下面两种:

    1. 每隔一段时间通知你一次,直到你来吃饭为止;(LT)
    2. 只通知你一次,后面就不管你了;(ET)

    看一个实际例子:

    • 我们已经把一个tcp socket添加到epoll描述符 ;
    • 这个时候socket的另一端被写入了2KB的数据;
    • 调用epoll_wait,并且它会返回,说明它已经准备好读取操作;
    • 然后调用read, 只读取了1KB的数据;
    • 继续调用epoll_wait…

    水平触发Level Triggered 工作模式

    • 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理,或者只处理一部分;
    • 如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait仍然会立刻返回并通知socket读事件就绪;
    • 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回;
    • 支持阻塞读写和非阻塞读写;

    注意:epoll默认状态下就是LT工作模式.

    边缘触发Edge Triggered工作模式

    如果我们将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式。

    • 当epoll检测到socket上事件就绪时, 必须立刻处理;
    • 如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,epoll_wait 不会再返回了;
    • 也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会;
    • ET的性能比LT性能更高( epoll_wait 返回的次数少了很多),Nginx默认采用ET模式使用epoll;
    • 只支持非阻塞的读写;

    2.4.3 🍋对比LT和ET🍋

    LT是 epoll 的默认行为,使用 ET 能够减少 epoll 触发的次数,但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完。这也就表明ET的代码复杂程度更高。

    相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些,但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理,不让这个就绪被重复提示的话,其实性能也是差不多的。

    2.4.4 🍋第二个版本的epoll🍋

    在开始写代码前,我们增加一个Connection的结构体,它的主要成员如下:
    在这里插入图片描述为什么要设计这么一个结构体呢?我们知道,使用ET模式就要求程序员将缓冲区的数据一次性全部取走,所以为了简便就使用了_inbuffer_outbuffer这两个缓冲区来处理读到的数据以及要发送的数据。另外该成员中还存在3个回调函数,这样我们只需要不断的监视就绪的事件中_events,就能够回调不同的处理方式。
    除此之外,我们还得在EpollServer类中再增加一个成员变量:
    在这里插入图片描述
    使用fdConnection建立唯一映射关系,当我们监听到一个新连接到来时,就将新连接添加到_conns中管理,所以我们接下来便可以完善代码了:

    const uint16_t g_port=8899;
    const int max_epollrevent_sz=64;
    const int g_num=8848;
    
    
    class Connection;
    class EpollServer;
    using func_t=std::function<void (Connection*, const protocol_ns::Request&)>;
    using callbact_t=std::function<void(Connection*)>;
    
    class Connection
    {
    public:
        Connection(int fd, string ip, uint16_t port)
        :_fd(fd)
        ,_ip(ip)
        ,_port(port)
        {}
        void resgister(callbact_t calb_read, callbact_t calb_write, callbact_t calb_excep)
        {
            _calb_read=calb_read;
            _calb_write=calb_write;
            _calb_excep=calb_excep;
        }
        //用于进行数据IO
        int _fd;
        string _inbuffer;
        string _outbuffer;
        //用户信息
        string _ip;
        uint16_t _port;
        //用户关心的事件
        uint32_t _events;
        //IO处理函数
        callbact_t _calb_read;
        callbact_t _calb_write;
        callbact_t _calb_excep;
    };
    class EpollServer
    {
    public:
        EpollServer(func_t func, uint16_t port=g_port)
        :_func(func)
        ,_port(port)
        {}
        ~EpollServer()
        {
            _listensock.Close();
        }
    
        void init()
        {
            _listensock.Socket();
            _listensock.Bind(_port);
            _listensock.Listen();
    
            _epoller.create();
            add_connection(_listensock.Fd(), EPOLLIN | EPOLLET);//将_listensock添加到_conns中
            cout<<"EpollServer init success"<<endl;
        }
    
    
        void loop_once(int timeout)
        {
            int n=_epoller.ep_wait(_rvents, max_epollrevent_sz, timeout);
            for(int i=0; i<n; ++i)
            {
                int fd=_rvents[i].data.fd;
                uint32_t events=_rvents[i].events;
                cout<<n<<":"<<i<<endl;
                cout<<"正在处理"<<fd<<"事件上的"<<((events & EPOLLIN) ? "EPOLLIN" : "OTHER")<<endl;
    
                //将所有的异常情况,最后全部转化成为read,write的异常
                if ((events & EPOLLERR) || (events & EPOLLHUP))
                    events |= (EPOLLIN | EPOLLOUT);
    
                if((events & EPOLLIN) && conn_isexist(fd))
                    _conns[fd]->_calb_read(_conns[fd]);//如果是读事件就执行读事件的回调
                else if((events & EPOLLOUT) && conn_isexist(fd))
                    _conns[fd]->_calb_write(_conns[fd]);//如果是写事件就执行写事件的回调
            }
        }
        void run()
        {
           int timeout=-1;
           while(true)
           {
                loop_once(timeout);
           }
        }
    
    private:
        void add_connection(int fd, uint32_t events, string ip="127.0.0.1", uint16_t port=g_port)
        {
            //将fd设置为非阻塞,保证ET模式下不会一直卡在wait
            Util::SetNonBlock(fd);
            //构建Connection对象,交给_conns管理
            Connection* con=new Connection(fd, ip, port);
            if(fd == _listensock.Fd())
                con->resgister(std::bind(&EpollServer::accepter, this, std::placeholders::_1), nullptr, nullptr);
            else
                con->resgister(std::bind(&EpollServer::reader, this, std::placeholders::_1),
                std::bind(&EpollServer::writer, this, std::placeholders::_1),
                std::bind(&EpollServer::excepter, this, std::placeholders::_1));
            con->_events=events;
            _conns.insert({fd, con});
            //将事件写到内核中
            bool r=_epoller.add_event(fd, events);
            cout<<"_conns insert success,fd:"<<fd<<",ip:"<<ip<<",port"<<port<<endl;
        }
    
        void accepter(Connection* conn)
        {
        }
        void reader(Connection* conn)
        {
        }
        void writer(Connection* conn)
        {
        }
        void excepter(Connection* conn)
        {
        }
        bool conn_isexist(int fd)
        {
            return _conns.find(fd) != _conns.end();
        }
    
    private:
        uint16_t _port;
        Sock _listensock;
        Epoller _epoller;
        epoll_event _rvents[max_epollrevent_sz];//就绪的响应事件
        unordered_map<int, Connection*> _conns;//使用fd与Connection*建立映射关系
        func_t _func;//用于执行上层传入的回调
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136

    代码中需要注意的地方:

    • 1️⃣:在添加连接时,首先将fd设置为非阻塞,保证了read/write一定能够把数据读取完毕/发送完毕;
    • 2️⃣:在添加连接的时候,由于参数不匹配,所以使用了bind来调整参数个数:
      在这里插入图片描述
    • 3️⃣:在进行数据IO时,我们分成了readerwriter

    现在的重点是如何实现accepter/reader/writer:
    在实现前,我们就必须考虑协议定制的问题了,要读取或者发送一个完整的报文,我们之前实现网络版本计算器时已经实现过一次协议定制,所以此时直接拿来用即可。
    先来实现acceper:

        void accepter(Connection* conn)
        {
            do
            {
                string clientip;
                uint16_t clientport;
                int err=0;
                int sock = _listensock.Accept(&clientip, &clientport, err);
                if (sock > 0)
                {
                    cout << "【" << clientip << "," << clientport << "】事件已经就绪,fd:" << sock << endl;
                    // 将新连接添加到_conns中管理
                    add_connection(sock, EPOLLIN | EPOLLET, clientip, clientport);
                }
                else
                {
                    if(err == EAGAIN || err == EWOULDBLOCK)
                        break;
                    else if(err == EINTR)
                        continue;
                    else
                        cout<<"accept fail"<<endl;
                }
            } while (conn->_events & EPOLLET);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    代码中需要循环获取新的连接,当新连接到来时就将新连接添加到_conns中管理即可。

    再来实现reader:

        void hander_requset(Connection* conn)
        {
            bool quit = false;
            while (!quit)
            {
                string requestStr;
                // ParsePackage函数作用是将_inbuffer中取出一个完整报文并将其写入到requestStr中
                // 返回值==0表示没有一个完整报文;>0表示一个完整报文的长度
                int t = protocol_ns::ParsePackage(conn->_inbuffer, &requestStr);
                if (t > 0)
                {
                    // 去除报头
                    requestStr = protocol_ns::RemoveHeader(requestStr, t);
                    // 进行反序列化
                    protocol_ns::Request req;
                    req.Deserialize(requestStr);
                    // 执行回调进行处理
                    protocol_ns::Response resp=_func(req);
                    //序列化
                    string responseStr;
                    resp.Serialize(&responseStr);
                    //添加报头
                    responseStr=protocol_ns::AddHeader(responseStr);
                    //将数据写到发送缓冲区中
                    conn->_outbuffer+=responseStr;
                }
                else
                    quit=true;
            }
        }
        bool reader_hander(Connection* conn)
        {
            bool res=true;
            do
            {
                char buffer[g_num]={0};
                int n=read(conn->_fd, buffer, sizeof(buffer)-1);
                if(n > 0)
                {
                    buffer[n]=0;
                    conn->_inbuffer+=buffer;
                }
                else if (n == 0)
                {
                    conn->_calb_excep(conn);
                    break;
                }
                else
                {
                    if (errno == EAGAIN || errno == EWOULDBLOCK)
                        break;//表示已经将数据全部读取完毕,跳出循环
                    else if (errno == EINTR)
                        continue;
                    else
                    {
                        conn->_calb_excep(conn);
                        res=false;
                        break;
                    }
                }
            }while(conn->_events & EPOLLET);
            return res;
        }
        void reader(Connection* conn)
        {
            if(!reader_hander(conn))
                return;
            //处理request,返回response
            hander_requset(conn);
            //一般我们在面对写入的时候,直接写入,没写完才交给epoll
            if(!conn->_outbuffer.empty())
                conn->_calb_write(conn);//如果发送缓冲区不为空,就进行一次writer
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    上面代码中比较重要的地方都标有注释。
    处理reader的方式是先将所有的数据全部读到_inbuffer中,然后再根据协议进行处理,将处理好的数据放进_outbuffer中,最后判断一下_outbuffer是否为空,不为空的话就手动调用写事件的回调函数进行处理。

    在进行写事件处理时我们要明白一件事:读事件需要一直关心,因为你需要一直监听是否有新的socket到来,但是写事件其实就不需要了,一直关心反而占用CPU资源,只有当_outbuffer不为空时我们才去关心。

    writer的编写:

        bool enable_read_write(Connection *conn, bool read, bool write)
        {
            conn->_events = ((read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET);
            return _epoller.mod_event(conn->_fd, conn->_events);
        }
        void writer(Connection *conn)
        {
            {
                bool safe = true;
                do
                {
                    ssize_t n = write(conn->_fd, conn->_outbuffer.c_str(), conn->_outbuffer.size());
                    if (n > 0)
                    {
                        conn->_outbuffer.erase(0, n);
                        if (conn->_outbuffer.empty())
                            break;
                    }
                    else
                    {
                        if (errno == EAGAIN || errno == EWOULDBLOCK)
                        {
                            break;
                        }
                        else if (errno == EINTR)
                            continue;
                        else
                        {
                            safe = false;
                            conn->_calb_excep(conn);
                            break;
                        }
                    }
                } while (conn->_events & EPOLLET);
                if (!safe)
                    return;
                if (!conn->_outbuffer.empty())
                    enable_read_write(conn, true, true);
                else
                    enable_read_write(conn, true, false);
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    最后excepter的编写就很简单了:

        void excepter(Connection *conn)
        {
            // 1. 先从epoll移除fd
            _epoller.del_event(conn->_fd);
    
            // 2. 从_conns中移除fd
            _conns.erase(conn->_fd);
    
            // 3. 关闭fd
            close(conn->_fd);
    
            // 5. 释放conn对象
            delete conn;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2.4.5 🍋Reactor🍋

    首先来回答什么是Reactor?
    Reactor是基于多路转接包含事件派发器,连接管理器的半同步半异步的IO服务器。
    其实我们实现的第二个版本的epoll就是一个简易版本的Reactor(但是没有加上异步处理)。异步就是使用多进程/多线程的方式让事件处理交给另外一个进程/线程处理,防止当前业务进程/业务线程阻塞而导致整个业务无法处理。

    如果使用多线程的方式我们可以使用下面这种方式将之前的代码进行优化:
    在这里插入图片描述
    这样一个sock就对应这一个线程来处理。

    多进程方式:

    • 我们可以使用管道来处理,父进程负责获得listensock,子进程将管道的读端添加进epoll中,由于子进程继承了父进程的listensock(不将listensock添加进Reactor),所以让子进程自己accept获得sock然后添加进Reactor,父进程可以使用轮询的方式随机挑选子进程向管道写入数据;
    • 除了使用管道外,还可以多进程加锁竞争的方式来进行。
  • 相关阅读:
    抖音获得抖音商品详情 API
    Mac配置nvm包管理
    javascript转ArrayBuffer为字符串
    【SQL 语言艺术】数据库三范式
    单臂路由的详细配置步骤
    Dapr v1.9.0 版本已发布
    土巴兔上市再折戟,互联网家装没故事
    【华为OD机试真题 python】 免单统计【2022 Q4 | 100分】
    3.Node-事件循环用法
    为什么我的k8s的master节点和node节点的状态都是notready呢
  • 原文地址:https://blog.csdn.net/m0_68872612/article/details/134091597