• 高级IO---五种IO模型&多路转接之Select


    五种IO模型

    1、阻塞IO

    在内核将数据准备好之前,系统调用会一直等待。所有的套接字,默认都是阻塞方式。

    也就是说,在数据准备好之前,系统调用只会静静的等待着数据的到来并不会去干其他的事情

    就好比去钓鱼,将鱼饵丢进水里后,啥也不干就静静的看着鱼饵随时准备鱼上钩

    2、非阻塞IO

    如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回EWOULDBLOCK错误码

    非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符,这个过程称为轮询。这对CPU来说是较大的浪费,一般只有特定场景下才使用。

    就好比去钓鱼,将鱼饵丢进水里后,不会一直去盯着鱼饵,一边干着其他事看看书啥的偶尔看看鱼饵

    3、信号驱动IO

    内核将数据准备好的时候,使用SIGIO信号通知应用程序进行IO操作。

    这就好比在鱼竿上系上一个铃铛,然后去干别的事,当鱼上钩时拉动鱼线就会使铃铛摇晃发出声音提醒鱼上钩了

    4、多路转接IO

    最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态。

    这就好比拿着多条鱼竿去钓鱼,全部丢进水里然后巡视所有鱼竿,一有鱼上钩就拉动对应的鱼竿

    5、异步IO

    由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据)。

    这就好比老板想吃鱼,不用自己去钓,让手下去钓鱼调到了之后交给他

    总结IO

    任何IO过程中都包含两个步骤:第一是等待,第二是拷贝。而且在实际的应用场景中,等待消耗的时间往往都远远高于拷贝的时间。所以让IO更高效最核心的办法就是让等待的时间尽量减少

    所以综上而言,多把钓竿同时等待,鱼上钩的概率就越大上钩的时间也就越快,所以多路转接IO效率高

    同步与异步

    同步和异步关注的是消息通信机制

    1. 同步:就是在发出一个调用时,在没有得到结果之前,该调用不返回。但是一旦调用返回,就得到返回值了。换句话说,就是由调用者主动等待这个调用的结果;
    2. 异步:正好相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果; 换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果; 而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。

    注意这里的同步通信和进程之间的同步是完全不想干的概念

    阻塞与非阻塞

    1. 阻塞:调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回
    2. 非阻塞:在不能立刻得到结果之前,该调用不会阻塞当前线程。

    设置非阻塞

    需要利用系统调用 ---- fcntl

    #include 
    #include 
    
    int fcntl(int fd, int cmd, ... /* arg */ );
    
    • 1
    • 2
    • 3
    • 4

    参数一为:需要设置的文件描述符

    参数二为:想要让fcntl实现的功能

    1. 复制一个现有的描述符(cmd = F_DUPFD) .
    2. 获得/设置文件描述符标记(cmd = F_GETFD或F_SETFD).
    3. 获得/设置文件状态标记(cmd = F_GETFL或F_SETFL).
    4. 获得/设置异步I/O所有权(cmd = F_GETOWN或F_SETOWN).
    5. 获得/设置记录锁(cmd = F_GETLK,F_SETLK或F_SETLKW)

    其中设置非阻塞为第三个功能。

    后面的为追加参数

    利用fcntl接口实现一个设置非阻塞的函数

    void SetNoBlock(int fd) {
    	// 使用F_GETFL将当前的文件描述符的属性取出来(这是一个位图)
        int fl = fcntl(fd, F_GETFL);
        
        if (fl < 0) {
            perror("fcntl");
            return;
        }
        // 再使用F_SETFL将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK参数
        fcntl(fd, F_SETFL, fl | O_NONBLOCK);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    多路转接之Select

    系统提供select函数来实现多路复用输入/输出模型

    select系统调用是用来让程序监视多个文件描述符的状态变化的;
    程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

    select函数原型

     #include 
    
    /* According to earlier standards */
    #include 
    #include 
    #include 
    
    int select(int nfds, fd_set *readfds, fd_set *writefds,
              fd_set *exceptfds, struct timeval *timeout);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    1. 参数nfds是需要监视的最大的文件描述符值+1
    2. rdset、wrset、exset分别对应于需要检测的可读文件描述符的集合,可写文件描述符的集合及异常文件描述符的集合
    3. 参数timeout为结构timeval,用来设置select()的等待时间,如果在指定的时间段里没有事件发生, select将超时返回

    fd_set结构

    typedef struct
      {
        /* XPG4.2 requires this member name.  Otherwise avoid the name
           from the global namespace.  */
    #ifdef __USE_XOPEN
        __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
    # define __FDS_BITS(set) ((set)->fds_bits)
    #else
        __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
    # define __FDS_BITS(set) ((set)->__fds_bits)
    #endif
      } fd_set;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    其实这个结构就是一个整数数组, 更严格的说,是一个 “位图”. 使用位图中对应的位来表示要监视的文件描述符.

    提供了一组操作fd_set的接口, 来比较方便的操作位图

    void FD_CLR(int fd, fd_set *set); 	// 用来清除描述词组set中相关fd 的位
    int FD_ISSET(int fd, fd_set *set); 	// 用来测试描述词组set中相关fd 的位是否为真
    void FD_SET(int fd, fd_set *set); 	// 用来设置描述词组set中相关fd的位
    void FD_ZERO(fd_set *set); 			// 用来清除描述词组set的全部位
    
    • 1
    • 2
    • 3
    • 4

    例如取fd_set为1个字节,为1字节, fd_set中的每一bit可以对应一个文件描述符fd。则1字节长的fd_set最大可以对应8个fd

    返回值

    1. 执行成功则返回文件描述词状态已改变的个数
    2. 如果返回0代表在描述词状态改变前已超过timeout时间,没有返回
    3. 当有错误发生时则返回-1,错误原因存于errno,此时参数readfds, writefds, exceptfds和timeout的值变成不可预测。

    socket就绪条件

    读就绪

    1. socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于0;
    2. socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0;
    3. 监听的socket上有新的连接请求;
    4. socket上有未处理的错误

    写就绪

    1. socket内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于0;
    2. socket的写操作被关闭(close或者shutdown). 对一个写操作被关闭的socket进行写操作, 会触发SIGPIPE信号;
    3. socket使用非阻塞connect连接成功或失败之后;
    4. socket上有未读取的错误;

    select的特点

    1. 可监控的文件描述符个数取决与sizeof(fd_set)的值。每bit表示一个文件描述符
    2. 将fd加入select监控集的同时,还要再使用一个数据结构array保存放到select监控集中的fd,
      1. 一是用于再select 返回后, array作为源数据和fd_set进行FD_ISSET判断。
      2. 二是select返回后会把以前加入的但并无事件发生的fd清空,则每次开始select前都要重新从array取得fd逐一加入(FD_ZERO最先),扫描array的同时取得fd最大值maxfd,用于select的第一个参数

    select的缺点:

    1. 每次调用select, 都需要手动设置fd集合, 从接口使用角度来说也非常不便.
    2. 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
    3. 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
    4. select支持的文件描述符数量太小

    select使用示例

    这里以只关心读事件为例,写事件同理

    Util.hpp(工具类,将用到的函数放在该类中)

    #pragma once
    
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include "log.hpp"
    #include 
    #include 
    
    using namespace std;
    
    #define INITPORT 8000
    #define FDNUM 1024
    #define DEFAULTFD -1
    
    // 打印函数调试
    void Print(const vector<int> &fdv)
    {
        cout << "fd list: ";
        for (int i = 0; i < FDNUM; i++)
        {
            if (fdv[i] != DEFAULTFD)
                cout << fdv[i] << " ";
        }
        cout << endl;
    }
    
    class Util
    {
    public:
        static void Recv(vector<int> &fdv, int sock, int i)
        {
            // 读取
            // 读取失败就关闭sock并且修改集合组里的数据
            char buff[1024];
            ssize_t s = recv(sock, buff, sizeof(buff) - 1, 0);
            if (s > 0)
            {
                buff[s] = 0;
                cout << "client: " << buff << endl;
                LogMessage(NORMAL, "client: %s", buff);
            }
            else if (s == 0)
            {
                close(sock);
                fdv[i] = DEFAULTFD;
                LogMessage(NORMAL, "client quit");
                return;
            }
            else
            {
                close(sock);
                fdv[i] = DEFAULTFD;
                LogMessage(ERROR, "client quit: %s", strerror(errno));
                return;
            }
    
            // 写回数据
            // 这里不考虑写事件
            string response = buff;
    
            write(sock, response.c_str(), response.size());
            LogMessage(DEBUG, "Recver end");
        }
    
        // 将通信sock添加进集合组
        static void AddSock(vector<int> &fdv, int listensock)
        {
            // listensock读事件就绪
            string clientip;
            uint16_t clientport;
            int sock = Util::GetSock(listensock, &clientip, &clientport);
            if (sock < 0)
                return;
            else
            {
                LogMessage(NORMAL, "accept success [%s:%d]", clientip.c_str(), clientport);
                // 遍历数组,要考虑满的情况
                // 遇到为-1的位置插入新的sock
                int i = 0;
                for (; i < FDNUM; ++i)
                    if (fdv[i] == DEFAULTFD)
                        break;
                if (i == FDNUM)
                {
                    LogMessage(WARNING, "server if full, please wait");
                    close(sock);
                }
                else
                    fdv[i] = sock;
            }
    
            Print(fdv);
            LogMessage(DEBUG, "Accepter out");
        }
    
        // 获取新连接创建通信sock
        static int GetSock(int listensock, string *clientip, uint16_t *clientport)
        {
            struct sockaddr_in peer;
            memset(&peer, 0, sizeof(peer));
            socklen_t len = sizeof(peer);
            int sock = accept(listensock, (struct sockaddr *)&peer, &len);
            if (sock < 0)
                LogMessage(ERROR, "accept socket error, next");
            else
            {
                LogMessage(NORMAL, "accept socket %d success", sock);
                cout << "sock: " << sock << endl;
                *clientip = inet_ntoa(peer.sin_addr);
                *clientport = ntohs(peer.sin_port);
            }
    
            return sock;
        }
    
        // 设置监听套接字为监听状态
        static void setListen(int listensock)
        {
            if (listen(listensock, 5) < 0)
            {
                LogMessage(FATAL, "listen socket error!");
                exit(3);
            }
            LogMessage(NORMAL, "listen socket success");
        }
    
        // 绑定网络信息
        static void bindSock(int port, int listensock)
        {
            struct sockaddr_in local;
            memset(&local, 0, sizeof(local));
            local.sin_family = AF_INET;
            local.sin_port = htons(port);
            local.sin_addr.s_addr = INADDR_ANY;
            if (bind(listensock, (struct sockaddr *)&local, sizeof(local)) < 0)
            {
                LogMessage(FATAL, "bind socket error!");
                exit(2);
            }
            LogMessage(NORMAL, "bind sock success");
        }
    
        // 创建监听套接字
        static void createSock(int *listensock)
        {
            *listensock = socket(AF_INET, SOCK_STREAM, 0);
            if (listensock < 0)
            {
                LogMessage(FATAL, "create socket error!");
                exit(1);
            }
            LogMessage(NORMAL, "create socket success");
    
            // 设置进程可以立即重启
            int opt = 1;
            setsockopt(*listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        }
    
        // 设置非阻塞
        static void SetNonBlock(int fd)
        {
            int f = fcntl(fd, F_GETFL);
            if (f < 0)
            {
                cerr << "fcntl" << endl;
                return;
            }
            fcntl(fd, F_SETFL, f | O_NONBLOCK);
        }
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177

    Server.hpp(实现服务器)

    #pragma once
    
    #include 
    #include "Util.hpp"
    #include 
    #include 
    #include 
    #include 
    
    using namespace std;
    
    class Server
    {
    public:
        Server(const uint16_t port = INITPORT)
            : _port(port), _listensock(-1)
        {
        }
    
        void HandlerEvent(fd_set &rfds)
        {
            int i = 0;
            for (auto e : fdv)
            {
                // 过滤掉非法的fd
                if (e == DEFAULTFD)
                    continue;
                if (FD_ISSET(e, &rfds) && e == _listensock) // 判断listensock在不在就绪的集合中
                    Util::AddSock(fdv, _listensock);
                else if(FD_ISSET(e, &rfds)) // 如果为其他的文件描述符则读取数据
                    Util::Recv(fdv, e, i);
                else
                {}
    
                ++i;
            }
        }
    
        void Init()
        {
            // 创建监听套接字
            Util::createSock(&_listensock);
    
            // 绑定网络信息
            Util::bindSock(_port, _listensock);
    
            // 设置监听套接字为监听状态
            Util::setListen(_listensock);
    
            fdv.resize(FDNUM, DEFAULTFD);
            fdv[0] = _listensock;
        }
    
        void start()
        {
            while (1)
            {
                fd_set rfds;
                // 清除描述词组set的全部位
                FD_ZERO(&rfds);
                // 记录下最大的文件描述符
                int max = fdv[0];
                // 遍历数组,将合法的fd插入到事件集中
                // 并记录最大的fd为调用select接口做准备
                for (auto e : fdv)
                {
                    if (e == DEFAULTFD)
                        continue;
                    // 设置描述词组set中相关fd的位
                    FD_SET(e, &rfds);
                    if (e > max)
                        max = e;
                }
                // 设置等待时间结构
                struct timeval timeout = {1, 0};
                int n = select(max + 1, &rfds, nullptr, nullptr, &timeout);
                switch (n)
                {
                case 0:
                    cout << "timeout...." << endl;
                    LogMessage(NORMAL, "timeout....");
                    break;
                case -1:
                    printf("select error, code: %d, err string: %s", errno, strerror(errno));
                    LogMessage(WARNING, "select error, code: %d, err string: %s", errno, strerror(errno));
                    break;
                default:
                    // 有事件就绪
                    cout << "event readly" << endl;
                    LogMessage(NORMAL, "event readly");
                    // 处理事件
                    HandlerEvent(rfds);
                    break;
                }
            }
        }
    
        ~Server()
        {
            if (_listensock < 0)
                close(_listensock);
        }
    
    private:
        int _listensock;
        uint16_t _port;
        vector<int> fdv;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108

    log.hpp(日志类)

    #pragma once
    
    #include 
    #include 
    #include 
    #include 
    #include 
    
    using namespace std;
    
    #define DEBUG 0
    #define NORMAL 1
    #define WARNING 2
    #define ERROR 3
    #define FATAL 4
    
    const char *to_levelstr(int level)
    {
        switch (level)
        {
        case DEBUG:
            return "DEBUG";
        case NORMAL:
            return "NORMAL";
        case WARNING:
            return "WARNING";
        case ERROR:
            return "ERROR";
        case FATAL:
            return "FATAL";
        default:
            return nullptr;
        }
    }
    
    void LogMessage(int level, const char *format, ...)
    {
    #define NUM 1024
        char logpre[NUM];
        snprintf(logpre, sizeof(logpre), "[%s][%ld][%d]", to_levelstr(level), (long int)time(nullptr), getpid());
    
        char line[NUM];
        // 可变参数
        va_list arg;
        va_start(arg, format);
    
        vsnprintf(line, sizeof(line), format, arg);
    
        // 保存至文件
        FILE* log = fopen("log.txt", "a");
        FILE* err = fopen("log.error", "a");
    
        if(log && err)
        {
            FILE *curr = nullptr;
            if(level == DEBUG || level == NORMAL || level == WARNING) 
                curr = log;
            if(level == ERROR || level == FATAL) 
                curr = err;
            if(curr) fprintf(curr, "%s%s\n", logpre, line);
    
            fclose(log);
            fclose(err);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    Server.cc

    #include "Server.hpp"
    #include 
    
    // 输出命令错误函数
    void Usage(string proc)
    {
        cout << "Usage:\n\t" << proc << " local_port\n\n";
    }
    
    int main(int argc, char *argv[])
    {
        // 启动服务端不需要指定IP
        if (argc != 2)
        {
            Usage(argv[0]);
            exit(1);
        }
    
        uint16_t port = atoi(argv[1]);
        unique_ptr<Server> sptr(new Server(port));
    
        sptr->Init();
        sptr->start();
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    效果演示

    image-20230907205551697

    首先由客户端连接,listen套接字就绪,建立连接

    然后客户端发送数据,负责通信的套接字就绪,读取数据后再发回去

  • 相关阅读:
    货架穿梭车控制方案
    小白跟做江科大32单片机之按键控制LED
    海康威视嵌入式软件一面(技术面)
    莞中集训8.1
    C/C++编译器配置——MinGW下载安装
    manim边学边做--Matrix
    肖sir__设计测试用例方法之经验测试方法09_(黑盒测试)
    JavaScript防抖和节流(从认识到理解到手写)
    机器学习教程
    services.Jenkins Additional property tags is not allowed
  • 原文地址:https://blog.csdn.net/CHJBL/article/details/132746096