• Linux网络高级IO select poll epoll 原理及使用


    1. IO模型

    内存和外设的交互叫做IO,网络IO就是将数据在内存和网卡间拷贝。

    IO本质就是等待和拷贝,一般等待耗时往往远高于拷贝耗时。所以提高IO效率就是尽可能减少等待时间的比重。

    IO模型简单对比解释
    阻塞IO阻塞等待数据到来
    非阻塞IO轮询等待数据到来
    信号驱动信号递达时再来读取或写入数据
    多路转接让大批线程等待,自身读取数据
    异步通信让其他进程或线程进行等待和读取,自身获取结果

    1.1 阻塞IO

    执行流在某个文件描述符下读取数据时,执行流一直等待IO条件就绪后读取数据,这就是阻塞IO。

    在这里插入图片描述

    1.2 非阻塞IO

    执行流会以循环的方式反复尝试读取数据,如果IO条件未就绪,执行流会直接返回继续其他任务。

    在这里插入图片描述

    非阻塞读取方式

    可通过fcntl设置文件的状态。

    非阻塞读取时,数据未就绪是以出错的形式返回的,错误码为EAGINEWOULDBLOCK,信号导致读取未成功错误码为EINTR

    void set_nonblock(int fd)
    {
        int fl = fcntl(fd, F_GETFL);
        if (fl < 0) {
            perror("fcntl failed");
            return;
        }
        if (fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
            perror("fcntl failed");
            return;
        }
    }
    
    int main() {
        set_nonblock(0);
        char buf[64] = {0};
    
        while (true) {
            ssize_t n = read(0, buf, sizeof(buf) - 1);
            if (n > 0)
            {
                buf[n - 1] = 0;
                std::cout << buf << std::endl;
            }
            else if (n == 0)
            {
                perror("end of file");
                break;
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK) // 非阻塞数据未就绪返回
                    continue;
                else if (errno == EINTR) // IO被信号中断返回
                    continue;
                else 
                {
                    perror("read error");
                    break;
                }
            }
        }
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    较为鸡肋,一般不用。

    1.3 信号驱动

    IO事件就绪时,内核通过SIGIO信号通知进程。等待的过程是异步的,但拷贝数据是同步的,所以我们认为信号驱动也是同步IO。

    在这里插入图片描述

    但信号处理是异步的,所以数据提取可能不及时。

    1.4 多路转接

    内核提供select、poll、epoll等多路转接方案,最高可同时等待几百个文件。拷贝数据的任务仍由进程完成,等待数据的任务交给内核。

    在这里插入图片描述

    1.5 异步通信

    只要自身完全没有参与IO等待和拷贝就是异步通信,否则就是同步。

    将缓冲区提供给异步接口,接口等待并拷贝将数据至缓冲区,最后通知进程。进程不参与IO可直接处理数据,所以是异步的。

    在这里插入图片描述

    异步IO系统提供有一些对应的系统接口,但大多使用复杂,也不建议使用。异步IO也有更好的替代方案。

    IO事件就绪

    IO事件就绪可分为读事件就绪和写事件就绪。

    一般接收缓冲区设有高水位,高于该水位读事件就绪,发送缓冲区设有低水位,低于该水位写事件就绪。

    因为频繁读写内核缓冲区需要状态切换,会附带一系列的处理工作,导致效率下降。

    在这里插入图片描述

     

    2. 多路转接

    Linux下多路转接的方案常见的有三种:select、poll、epoll,select出现是最早的,使用也是最繁琐的。

    2.1 select

    select的接口

    select能够等待多个fd的IO条件是否就绪。

    #include 
    int select(int nfds, fd_set* rfds, fd_set* wfds, fd_set* efds, struct timeval* timeout);
    
    struct timeval {
        time_t       tv_sec;   /* seconds */
        suseconds_t  tv_usec;  /* microseconds */
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    参数解释
    nfdsfd的总个数,select遍历fdset结构的范围(被等待的fd的最大值+1)
    readfds调用时表示需要关注的读事件,返回时表示那些事件已经就绪
    writefds调用时表示需要关注的写事件,返回时表示那些事件已经就绪
    exceptfds调用时表示需要关注的异常事件,返回时表示那些事件已经就绪。如对端关闭,读写异常等
    timeout调用时表示本次调用阻塞等待时间,返回时表示此次返回剩余的等待时间
    返回值大于0表示就绪fd的个数,为0表示本次调用结束,–1表示出错
    fd_set的接口

    fd_set是文件描述符的位图结构,下标表示文件描述符,比特位内容表示是否需要等待。

    在这里插入图片描述

    // fd_set操作函数
    void FD_CLR  (int fd, fd_set *set); // 清除
    int  FD_ISSET(int fd, fd_set *set); // 检测
    void FD_SET  (int fd, fd_set *set); // 设置
    void FD_ZERO (        fd_set *set); // 置零
    
    • 1
    • 2
    • 3
    • 4
    • 5

    select的使用

    const int GPORT = 8080;
    const int GSIZE = 10;
    enum event_type {
        read_event   = 0x1 << 1,
        write_event  = 0x1 << 2,
        except_event = 0x1 << 3,
    };
    struct fd_collection {
        fd_collection() {}
        fd_collection(const fd_collection& fds) {
            _rfds = fds._rfds, _wfds = fds._wfds, _efds = fds._efds, _maxfd = fds._maxfd;
        }
        bool set(int event, int fd) {
            if (_fdarr.size() >= GSIZE) return false;
            if (event & read_event)   _rfds.set(fd);
            if (event & write_event)  _wfds.set(fd);
            if (event & except_event) _wfds.set(fd);
            _fdarr.push_back(fd);
            if (_maxfd < fd) _maxfd = fd;
            return true;
        }
        void clear(int fd) {
            _rfds.clear(fd);
            _wfds.clear(fd);
            _efds.clear(fd);
            for (int i = 0; i < _fdarr.size(); i++)
                if (_fdarr[i] == fd) _fdarr[i] = -1;
        }
        class file_descptrs {
        public:
            file_descptrs() { bzero(); }
            ~file_descptrs() {}
            void set  (int fd) { FD_SET(fd, &_set);          }
            void clear(int fd) { FD_CLR(fd, &_set);          }
            bool isset(int fd) { return FD_ISSET(fd, &_set); }
            void bzero()       { FD_ZERO(&_set);             }
            fd_set* get() { return &_set; }
        private:
            fd_set _set;
        };
        file_descptrs _rfds;
        file_descptrs _wfds;
        file_descptrs _efds;
        std::vector<int> _fdarr;
        int _maxfd = -1;
    };
    
    class select_server : public inet::tcp::server {
    public:
        select_server(uint16_t port) : server(port), _wouldblock(true)
        {}
        select_server(uint16_t port, int sec, int usec) : server(port), _timeout({sec, usec})
        {}
        void start() {
            _fds.set(read_event, _sock);
            while (true) {
                int n = 0;
                struct timeval timeout = _timeout;
                fd_collection fds_cp(_fds);
    
    if (_wouldblock) 
    n = select(fds_cp._maxfd+1, fds_cp._rfds.get(), fds_cp._wfds.get(), fds_cp._efds.get(),  nullptr);
    else             
    n = select(fds_cp._maxfd+1, fds_cp._rfds.get(), fds_cp._wfds.get(), fds_cp._efds.get(), &timeout);
                switch (n) {
                case 0: INFO("time out: %.2f", timeout.tv_sec + timeout.tv_usec / 1.0 / 1000);
                    break;
                case -1: ERROR("select error, %d %s", errno, strerror(errno));
                    break;
                default: handler_event(fds_cp);
                    break;
                }
            }
        }
    private:
        void handler_event(fd_collection& resfds) {
            for (auto fd : _fds._fdarr) {
                if (fd == -1) continue;
                if (resfds._rfds.isset(fd)) {
                    if (fd == _sock)  {
                        acceptor();
                    } else {
                        std::string buf;
                        recver(fd, &buf);
                    }
                }
                if (resfds._wfds.isset(fd)) {
                    std::string msg = "test";
                    sender(fd, msg);
                }
                if (resfds._efds.isset(fd)) {
                    WARN("excepton event occurred, fd: %d", fd);
                }
            }
        }
        void acceptor() {
            std::string cip;
            uint16_t cport;
            int sock = accept(&cip, &cport);
            INFO("a connect %d has been accepted [%s:%d]", sock, cip.c_str(), cport);
            // if (!_fds.set(read_event | write_event | except_event, sock))
            if (!_fds.set(read_event, sock)) {
                close(sock);
                WARN("connect close, fd array is full");
            }
        }
        void recver(int fd, std::string* buf) {
            ssize_t s = recv(fd, buf, 1024);
            if (s > 0) {
                std::cout << *buf << std::endl;
            }
            else {
                if (s == 0) INFO("client quit");
                else WARN("recv error, %d %s", errno, strerror(errno));
                _fds.clear(fd);
                close(fd);
            }
        }
        void sender(int fd, const std::string& msg) {
            size_t s = send(fd, msg);
            if (s <= 0) {
                if (s == 0) INFO("client quit");
                else WARN("send error, %d %s", errno, strerror(errno));
                _fds.clear(fd);
                close(fd);
            }
        }
    private:
        bool _wouldblock;
        struct timeval _timeout;
        fd_collection _fds;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132

    select的优缺点

    优点
    一次等待多个fd,使IO等待时间重叠,一定程度上提高IO效率
    缺点
    调用前要重新设置fd集,调用后要遍历检测就绪fd,需要额外数组
    select能够检测fd的个数上限太小
    频繁地将用户数据拷贝到内核中
    select内部遍历fd_set结构以检测就绪

     

    2.2 poll

    poll相比select在使用和实现上都有进步。不过重点是epoll。

    poll的接口

    #include 
    int poll(struct pollfd* fds, nfds_t nfds, int timeout);
    
    struct pollfd {
        int    fd;       /* file descriptor */
        short  events;   /* events to look for */
        short  revents;  /* events returned */
    };	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    参数解释
    timeout阻塞等待时间,不过采用整数单位是毫秒。
    struct pollfd* nfds_tpollfd结构体数组以及数据长度
    struct pollfd.fd:关注的文件描述符
    struct pollfd.events:关注的事件类型
    struct pollfd.revents:就绪的事件类型
    事件类型描述
    POLLIN数据(包括普通数据和优先数据)可读
    POLLRDNORM普通数据可读
    POLLRDBAND优先级带数据可读(Linux 不支持)
    POLLPRI高优先级数据可读,比如 TCP 带外数据
    POLLOUT数据(包括普通数据和优先数据)可写
    POLLWRNORM普通数据可写
    POLLWRBAND优先级带数据可写
    POLLRDHUPTCP 连接被对方关闭,或者对方关闭了写操作,它由GNU引入
    POLLERR错误
    POLLHUP挂起。比如管道的写端被关闭后,读端描述符将收到 POLLHUP 事件
    POLLNVAL文件描述符没有打开

    poll的使用

    const int   default_port    = 8080;
    const int   default_size    = 20;
    const int   default_timeout = -1;
    const int   default_fd      = -1;
    const short default_event   = 0;
    
    class poll_server : public inet::tcp::server {
    public:
        poll_server(uint16_t port) : server(port), _fds(new struct pollfd[default_size])
            , _cap(0), _timeout(default_timeout) {
            pollfd_arr_init();
        }
        void pollfd_arr_init() {
            for (int i = 0; i < default_size; i++) pollfd_init(_fds[i]);
        }
        void pollfd_init(struct pollfd& pf) {
            pf.fd = default_fd;
            pf.events = default_event;
            pf.revents = default_event;
        }
        void pollfd_clear(struct pollfd& pf) {
            pf.fd = default_fd;
            pf.events = default_event;
            pf.revents = default_event;
        }
        void start() {
            _fds[0].fd = _sock;
            _fds[0].events = POLLIN;
            ++_cap;
            while (true) {
                int timeout = _timeout;
                switch (poll(_fds.get(), _cap, timeout)) {
                case 0: INFO("time out: %d", timeout); break;
                case -1: ERROR("select error, %d %s", errno, strerror(errno)); break;
                default: event_handler(); break;
                }
            }
        }
    private:
        void event_handler() {
            for (int i = 0; i < _cap; i++) {
                auto& fd = _fds[i].fd;
                auto& revents = _fds[i].revents;
                if (revents & POLLIN) {
                    if (fd == _sock) {
                        acceptor();
                    } else {
                        std::string buf;
                        recver(i, &buf);
                    }
                }
                if (revents & POLLOUT) {
                    std::string msg = "test";
                    sender(i, msg);
                }
                if (revents & POLLERR){
                    WARN("excepton event occurred, fd: %d", fd);
                }
            }
        }
        void acceptor() {
            std::string cip;
            uint16_t cport;
            int newfd = accept(&cip, &cport);
            if (_cap >= default_size) {
                close(newfd);
                WARN("connect close, fd array is full");
                return;
            }
            for (int i = 0; i < default_size; i++) {
                if (_fds[i].fd == default_fd) {
                    _fds[i].fd = newfd;
                    _fds[i].events = POLLIN | POLLOUT;
                    _cap++;
                    break;
                }
            }
            INFO("a connect %d has been accepted [%s:%d]", newfd, cip.c_str(), cport);
        }
        void recver(int i, std::string* buf) {
            ssize_t s = recv(_fds[i].fd, buf, 1024);
            if (s > 0) {
                std::cout << *buf << std::endl;
            } else {
                if (s == 0) INFO("client quit");
                else WARN("recv error, %d %s", errno, strerror(errno));
                close(_fds[i].fd);
                pollfd_clear(_fds[i]);
                --_cap;
            }
        }
        void sender(int i, const std::string& msg) {
            size_t s = send(_fds[i].fd, msg);
            if (s <= 0) {
                if (s == 0) INFO("client quit");
                else WARN("send error, %d %s", errno, strerror(errno));
                close(_fds[i].fd);
                pollfd_clear(_fds[i]);
                --_cap;
            }
        }
    private:
        std::unique_ptr<struct pollfd[]> _fds;
        int _cap;
        int _timeout;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106

    poll的优缺点

    优点
    监视fd的个数无上限
    将事件输入输出分离,避免原始数据被修改
    缺点
    返回后仍需要遍历数组检测就绪事件
    poll内部仍需要内核自己遍历检测就绪事件
    每次调用都要将pollfd结构从内核空间拷贝到用户空间

     

    2.3 epoll

    epoll的接口

    #include 
    int epoll_create(int size);
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
    
    typedef union epoll_data {
        void*    ptr;
        int      fd;
        uint32_t u32;
        uint64_t u64;
    } epoll_data_t;
    
    struct epoll_event {
        uint32_t     events;    /* Epoll events */
        epoll_data_t data;      /* User data variable */
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    epoll_create负责创建epoll模型
    size目前size被忽略,为兼容可写128/256
    返回值epoll句柄
    epoll_ctl负责用户告诉内核那些事件需要关注
    epfdepoll句柄
    op指定相关操作
    EPOLL_CTL_ADD:添加事件
    EPOLL_CTL_MOD:修改事件
    EPOLL_CTL_DEL:删除事件
    fd事件关注的文件描述符
    epoll_event用来指定fd上关注的事件
    epoll_wait负责内核告诉用户那些事件就绪
    epfdepoll句柄
    epoll_event输出缓冲区,存放已就绪的事件
    maxevents缓冲区的长度
    timeout阻塞等待的时间
    返回值就绪事件的个数
    events宏常量取值解释
    EPOLLIN表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
    EPOLLOUT表示对应的文件描述符可以写
    EPOLLPRI表示对应的文件描述符有紧急的数据可读(带外数据)
    EPOLLERR表示对应的文件描述符发生错误
    EPOLLHUP表示对应的文件描述符被挂断
    EPOLLET将EPOLL设为边缘触发(Edge Triggered)模式
    EPOLLONESHOT只监听一次事件,本次之后自动将该fd删去

    epoll的使用

    epoll_server 封装最终版

    epoll的原理

    1. epoll模型中用红黑树保存注册的fd和事件,用就绪队列保存就绪的fd和事件。
    2. epoll_ctl的本质就是新增修改删除红黑树的节点,并对fd对应的文件中注册回调函数。
    3. 如果事件就绪,内核在将硬件数据拷贝至内核缓冲区后,还会自动执行回调将红黑树节点添加到就绪队列中。
    4. epoll_wait负责检查是否有事件就绪,本质就是检测就绪队列为空。

    在这里插入图片描述

    epoll的工作模式

    epoll有两种工作方式,分别是水平触发LT和边缘触发ET。

    LTET的概念
    • LT水平触发:只要事件一直就绪,就会一直通知。
    • ET边缘触发:只有事件就绪或再次就绪时,才会通知一次。
    LT水平触发

    事件就绪时,可以不立刻处理或只部分处理。

    只要事件处于就绪状态,每次调用epoll_wait都会通知该事件就绪,直到处理完毕处于未就绪状态。

    ET边缘触发

    设置事件为EPOLLET,表示对于该事件使用ET模式。

    事件就绪时必须一次性处理清空数据,否则下次是不会通知该事件就绪的,直到该事件再次就绪。

    LTET的读写特点

    数据剩余ET不会提醒,所以必须一次性读取所有数据,但如果读取时刚好无数据就会被阻塞。 所以ET必须采用非阻塞读写。

    LT模式事件就绪时读取一定不会被阻塞,因为一定有数据。

    LTET的效率对比

    一般ET的效率>=LT的效率。原因如下:

    1. 一般ET通知次数比LT少,也就是系统调用次数少。
    2. ET会倒逼程序员一次读取全部数据,所以底层TCP会更新出更大的滑动窗口。
    LTET的应用场景
    • ET要求程序必须一次性读取所有数据,再让上层处理,ET重IO效率。
    • LT可以只交付部分数据,尽快让上层处理,LT重处理效率。

    ET高IO,LT高响应。

    epoll的优缺点

    优点解释
    接口分离解耦每次调用不需要重新设置事件集,做到输入输出事件分离
    使用简单高效调用后用户不需要遍历,内核提供就绪事件缓冲区
    轻量数据拷贝不需要频繁的进行将数据从内核和用户之间的拷贝
    无遍历效率高底层不需要遍历,利用回调将就绪事件添加到就绪队列中
    没有数量限制文件描述符数目无上限

    epoll的写入设置

    • 只有读取缓冲区有数据,读事件才会就绪。所以读事件可以一直关注,我们称为常设置。
    • 只要写入缓冲区没有满,写事件就一直就绪。所以写事件按需设置,写入完成后立即关闭,否则会一直触发。

    一般构建响应后,直接发送数据,只有当缓冲区满的时候,再将没写完的数据交给epoll处理。

    select、poll、epoll都是如此,但epoll的ET模式可以常设置写事件。

  • 相关阅读:
    webpack5 基本配置
    商场会员营销系统 购物中心会员精细化运营体系
    【2023最新版】DataGrip使用MySQL教程
    java: 无效的源发行版: 11解决方法
    设计模式学习笔记 - 规范与重构 - 7.实践:通过一段ID生成器代码,学习如何发现代码质量问题
    万字手撕七大排序(代码+动图演示)
    026-第三代软件开发-C++&QML交互
    数据中心网络架构的问题与演进 — NFV
    Vue知识点总结
    C++ 函数模板
  • 原文地址:https://blog.csdn.net/yourfriendyo/article/details/126430918