目录
4.5.1. LT模式(水平触发Level Triggered )
4.5.2. ET模式(边缘触发Edge Triggered)
阻塞IO: 在内核将数据准备好之前, 系统调用会一直等待. 所有的套接字, 默认都是阻塞方式。

非阻塞IO: 如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EWOULDBLOCK错误码.

非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询. 这对CPU来说是较大的浪费, 一般只有特定场景下才使用.
信号驱动IO: 内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作。

IO多路转接: 虽然从流程图上看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态.

异步IO: 由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据).

任何IO过程中, 都包含两个步骤. 第一是等待, 第二是拷贝. 而且在实际的应用场景中, 等待消耗的时间往往都远远高于拷贝的时间. 让IO更高效, 最核心的办法就是让等待的时间尽量少.
前四种IO都属于同步io,同步io与异步io的区别在于:同步io在等待和拷贝的两个过程中应用进程至少做了一步,而异步io都由OS完成。
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态:
阻塞调用是指调用结果返回之前,当前线程会被挂起. 调用线程只有在得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
非阻塞io的概念我们,都知道,那么该如何实现呢?
使用系统调用函数 fcntl:
- #include
- #include
c - int fcntl(int fd, int cmd, ... /* arg */ );
-
- // fcntl的五种功能
- //复制一个现有的描述符(cmd=F_DUPFD) .
- //获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD).
- //获得/设置文件状态标记(cmd=F_GETFL或F_SETFL),可以将一个文件描述符设置为非阻塞。
- //获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN).
- //获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW).
这里我们使用fcntl将stdin设置为非阻塞
使用F_GETFL将当前的文件描述符的属性取出来(这是一个位图).
然后再使用F_SETFL将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK参数.
- #include
- #include
- #include
- #include
- #include
-
- void SetNoBlock(int fd)
- {
- int fl = fcntl(fd, F_GETFL); // 获取fd属性
- if (fl < 0)
- {
- perror("fcntl");
- return;
- }
- fcntl(fd, F_SETFL, fl | O_NONBLOCK); // 设置fd为非阻塞
- }
- int main()
- {
- SetNoBlock(0);
- while (1)
- {
- char buf[1024] = {0};
- ssize_t s = read(0, buf, sizeof(buf) - 1);
- if (s < 0)
- {
- if(errno== EAGAIN|| errno == EWOULDBLOCK)
- {
- printf("数据未准备好!\n");
- printf("read error errno: %d\n" ,errno);
- sleep(1);
- continue;
- }
- }
- else
- {
- buf[s] = 0;
- write(1, buf, strlen(buf));
- printf("read success, s: %d\n", s);
- }
- }
- return 0;
- }
测试结果:

在非阻塞的情况下,读取数据,如果数据没有就绪,系统是以出错的形式返回的(不是错误),会将errno设置为11,表示try agin

系统提供select函数来实现多路复用输入/输出模型.
select系统调用是用来让我们的程序监视多个文件描述符的状态变化的;
程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变
select定位:只负责等待,得到fd就绪,就通知上层进行读取或写入,没有读取或写入的功能。
read、write、recv、send本身也有等待功能,但只能等待一个fd;但select可以同时等待多个fd。
- #include
- int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
-
- // nfds: select等待的fd中最大的fd+1 (nfs = maxfd+1)
- // fd_set: 位图结构,每个比特位表示一个fd(输入时:用户告诉内核自己关心的fd;输出时:内核告诉用户,哪些fd上的事件已经就绪)
- // readfds:读事件fd(用户层:用户告知内核需要等待的读事件fd,内核层:告诉用户哪些fd的读事件已经就绪)(输入输出型参数)(下2个同理)
- // writefds:写事件fd
- // exceptfds:异常事件fd
- // struct timeval:该结构体中包含秒和毫秒两个成员
- // timeout:设置一个deadline,在deadline之内如果事件不就绪,就不返回;在deadline之外,立即返回
- // timeout: NULL,阻塞等待; {0,0},非阻塞;{5,0},5s超时
- // 返回值:大于0,表示有几个fd就绪;等于0,表示超时;小于0,表示出错

readfds、writefds、exceptfds三个参数虽然是位图结构,但是使用时不能自己使用位操作,只能使用OS提供的接口:
- void FD_CLR(int fd, fd_set *set);
- int FD_ISSET(int fd, fd_set *set); // 判断fd是否就绪
- void FD_SET(int fd, fd_set *set); // 将fd设置进位图
- void FD_ZERO(fd_set *set); // 将位图清0
select执行过程:
执行fd_set set; FD_ZERO(&set);则set用位表示是0000,0000
若fd= 5,执行FD_SET(fd,&set);后set变为0010,0000(第5位置为1)
若再加入fd= 2, fd=1,则set变为0010,0110 (用户告诉内核)
执行select(6,&set,0,0,0)阻塞等待
若fd=1,fd=2上都发生可读事件,则select返回,此时set变为0000,0110 (内核告诉用户)
注意:没有事件发生的fd=5被清空
话不多说,细节都在代码中:
- #include
- #include
- #include"sock.hpp" // 封装的套接字接口
-
- #define NUM (sizeof(fd_set)*8)
-
- int fd_array[NUM]; // 内容>=0,合法;如果是-1,表示该位置没有fd
-
- // ./select_server 8080
-
- static void Usage(string proc)
- {
- cout<<"Usage: "<
"port"< - }
-
- int main(int argc, char* argv[])
- {
- if(argc!=2)
- {
- Usage(argv[0]);
- exit(1);
- }
-
- uint16_t port = (uint16_t)atoi(argv[1]);
- int listen_sock = Sock::Socket();
- Sock::Bind(listen_sock, port);
- Sock::Listen(listen_sock);
-
- for (int i = 0; i < NUM; ++i)
- {
- fd_array[i] = -1;
- }
-
- // 这里不能立即accept,其本质是通过listen_sock获取新连接,前提是listen_sock上面有新连接,
- // 但是accept不知道是否有新连接,所以会阻塞等待
- // 在多路转接视角,连接到来时对于listen_sock来说,就是读事件就绪!
- // 对于所有服务器,最开始的时候,只有listen_sock
-
- fd_set rfds;
- fd_array[0] = listen_sock;
- while (true)
- {
-
- FD_ZERO(&rfds);
- int max_fd = fd_array[0];
- for (int i = 1; i < NUM; i++)
- {
- if(fd_array[i] == -1)
- continue;
- // 合法fd
- FD_SET(fd_array[i], &rfds); // 将所有要关心的事件的fd,添加到rfds中
- if(max_fd < fd_array[i])
- {
- max_fd = fd_array[i]; // 更新最大fd
- }
- }
-
- FD_SET(listen_sock, &rfds);
- struct timeval timeout = {5, 0};
- int n = select(max_fd + 1, &rfds, nullptr, nullptr, nullptr);
-
- switch (n)
- {
- case -1:
- cerr << "select error" << endl;
- break;
- case 0:
- cout << "select timeout" << endl;
- break;
- default:
- cout << "有fd对应的事件就绪了!" << endl;
- // 判断是那个fd就绪了
- for (int i = 0; i < NUM; ++i)
- {
- if(fd_array[i] == -1)
- continue;
- //下面的fd都是合法的,但不一定是就绪的
- if(FD_ISSET(fd_array[i], &rfds))
- {
- cout << "sock: " << fd_array[i] << " 有读事件可以读取了!" << endl;
- // 一定是读事件就绪了,该fd在fd_array[i]中
- // read、recv时,一定不会被阻塞
- if(fd_array[i] == listen_sock)// 判断是否是listen套接字就绪
- {
- cout << "listen_sock: " << listen_sock << " 有链接事件到来!" << endl;
- // accept
- int sock = Sock::Accept(listen_sock);
- if(sock >= 0)
- {
- cout << "sock: " << sock << " 获取新连接成功!" << endl;
- //获取成功
- // 不能立即读取连接,连接到来不意味着数据到来,需要交给select去等待
- // 无法直接将fd设置进select,但好在有fd_array
- int pos = 1;
- for (; pos < NUM; ++pos)
- {
- if(fd_array[pos] == -1)
- break;
- }
- // 1.找到了一个位置没有被使用
- // 2.所有位置都被占用
- if(pos < NUM)
- {
- cout << "新连接:" << sock << " 已经被添加到了数组[" << pos << "]的位置!" << endl;
- fd_array[pos] = sock;
- }
- else
- {
- // 服务器已经满载,无法处理新的请求
- close(sock);
- }
- }
- }
- else //普通套接字就绪
- {
- cout << "sock: " << fd_array[i] << "上面有普通读取" << endl;
- // 本次读取一定会读完吗?会产生数据包粘包问题吗?
- // 现在不解决这个问题
- char recv_buffer[1024] = {0};
- ssize_t s = recv(fd_array[i], recv_buffer, sizeof(recv_buffer)-1, 0);
- if(s > 0)
- {
- recv_buffer[s] = 0;
- cout << "client[" << fd_array[i] << "]# " << recv_buffer << endl;
- }
- else if(s == 0)
- {
- cout << "sock: " << fd_array[i] << "关闭, client quit..." << endl;
- // 对端关闭了连接
- close(fd_array[i]);
- cout << "fd_array中" << i << "位置去掉了sock:" << fd_array[i] << endl;
- fd_array[i] = -1;
-
- }
- else
- {
- // 读取失败
- close(fd_array[i]);
- cout << "fd_array中" << i << "位置去掉了sock:" << fd_array[i] << endl;
- fd_array[i] = -1;
- cerr << "recv error" << endl;
- }
- }
- }
- }
- break;
- }
- }
-
- return 0;
- }
2.3. select的优缺点
优点:
可以一次等待多个fd,使等待时间重叠,提高IO效率
缺点:
-
每次调用select, 都需要手动设置fd集合,需要遍历检测, 从接口使用角度来说也非常不便。
-
fd_set能够让select同时完成检测的fd是有上限的。
-
select底层(OS)需要遍历检测哪些fd的事件已经就绪,消耗较大。
-
select可能会较为高频率的进行用户到内核,内核到用户的拷贝问题。(设置位图)
2.4. socket就绪条件
读就绪 :
-
socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于0;
-
socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0;
-
监听的socket上有新的连接请求;
-
socket上有未处理的错误;
写就绪 :
-
socket内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于0;
-
socket的写操作被关闭(close或者shutdown). 对一个写操作被关闭的socket进行写操作, 会触发SIGPIPE信号;
-
socket使用非阻塞connect连接成功或失败之后;
-
socket上有未读取的错误;
异常就绪 :
-
socket上收到带外数据. 关于带外数据, 和TCP紧急模式相关
3.poll
poll同select类似,也是一种多路转接的方案,只负责等待文件描述符,poll是select的优化。
3.1. poll函数
- #include
- int poll(struct pollfd *fds, nfds_t nfds, int timeout);
- //fds:是一个poll函数监听的polled结构体数组. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合.
- //nfds:表示fds数组的长度.
- //timeout: 与select一样表示poll函数的超时时间(0表示非阻塞,-1表示阻塞), 单位是毫秒(ms)
-
- // pollfd结构
- struct pollfd {
- int fd; /* 文件描述符 */
- short events; /* 请求事件(需要关心哪些事件) */
- short revents; /* 返回事件(哪些事件已经就绪) */
- };
events和revents的取值: 系统宏定义了事件,通过按位或可添加事件

3.2. poll的使用
使用poll来监控标准输入:
- #include
- #include
- #include
- using namespace std;
-
- int main()
- {
- struct pollfd rfds;
- rfds.fd = 0;
- rfds.events = POLLIN;
- rfds.revents = 0;
-
- while(true)
- {
- int n = poll(&rfds, 1, 1000);
- switch(n)
- {
- case 0:
- cout << "time out ...." << endl;
- break;
- case -1:
- cerr << "poll error..." << endl;
- break;
- default:
- if(rfds.revents & POLLIN)
- {
- cout << rfds.fd << ": 读事件已经就绪" << endl;
- char buffer[128];
- ssize_t s = read(0, buffer, sizeof(buffer));
- if(s>0)
- {
- buffer[s] = 0;
- cout << buffer << endl;
- }
- else
- {
- cout << "read error..." << endl;
- }
-
- }
- break;
- }
- }
- return 0;
- }
tcp部分的代码,将select数组与接口简单的改了以后也可以正常使用,因为篇幅问题,这里就不贴代码了。
poll的就绪条件同select一样。
3.3. poll的优缺点
优点:
-
poll不要求开发者计算最大文件描述符加一的大小。
-
poll在应付大数目的文件描述符的时候速度更快,相比于select。
-
poll没有最大连接数的限制,原因是它是基于链表来存储的。
-
poll调用函数时,只需要对参数进行一次设置就好了。(不会覆盖用户传的fd)
缺点:
-
和select函数一样, poll返回后,需要轮询pollfd来获取就绪的描述符.
-
每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.
-
同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.
4.epoll
epoll是为了处理大批量句柄而做了改进的poll,它几乎具备了select和poll的所有优点,被公认为Linux2.6下性能最好的多路转接方案。
4.1. epoll相关系统调用
epoll_create: 创建一个epoll的句柄(fd)
- #include
- int epoll_create(int size);
- // 自从linux2.6.8之后, size参数被忽略了(但是必须大于0)
- // 使用完毕后,需要调用close()关闭
epoll_ctl:将用户关心的fd对应的事件告诉内核
- #include
- int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
- // epfd: epoll_create创建的fd
- // op: 新增或删除或修改fd对应的事件(EPOLL_CTL_ADD :注册新的fd到epfd中;
- // EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
- // EPOLL_CTL_DEL :从epfd中删除一个fd;)
- // fd: 用户关心的fd
- // struct epoll_event:
- struct epoll_envent{
- uint32_t envents; // 用户关心的fd上的事件
- epoll_data_t data; // 由于该结构只返回了就绪的事件,但不知道是哪个fd上就绪的事件
- } // 所以该联合体是用来描述该sh
events可以是以下几个宏的集合:
-
EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
-
EPOLLOUT : 表示对应的文件描述符可以写;
-
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
-
EPOLLERR : 表示对应的文件描述符发生错误;
-
EPOLLHUP : 表示对应的文件描述符被挂断;
-
EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
-
EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里 。
epoll_wait:内核告诉用户哪些fd的对应事件已经就绪
- #include
- int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
- // epfd: epoll_create创建的fd
- // envents: 输出型参数,把发生的事件赋值到events数组中
- // maxenvents: 告之内核这个events数组有多大,这个 maxevents的值不能大于创建epoll_create()时的size
- // timeout: 超时时间(单位毫秒;0为非阻塞;-1为阻塞)
- // 返回值: 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败
4.2. epoll的使用
epoll使用三板斧:
-
调用epoll_create创建一个epoll句柄;
-
调用epoll_ctl, 将要监控的文件描述符进行注册;
-
调用epoll_wait, 等待文件描述符就绪;
代码:
- #include
- #include
- #include
- #include
- #include"sock.hpp"
- using namespace std;
-
- #define NUM 64
- #define SIZE 128
-
- static void Usage(string proc)
- {
- cerr << "Usage: " << proc << " port" << endl;
- }
-
- int main(int argc, char*argv[])
- {
- if(argc != 2)
- {
- Usage(argv[0]);
- exit(1);
- }
-
- // 1.建立tcp监听socket
- uint16_t port = atoi(argv[1]);
- int listen_sock = Sock::Socket();
- Sock::Bind(listen_sock, port);
- Sock::Listen(listen_sock);
-
- // 2.建立epoll模型,获得epfd
- int epfd = epoll_create(SIZE);
-
- // 3.添加listen_sock和所关心的事件添加到内核
- struct epoll_event ev;
- ev.events = EPOLLIN;
- ev.data.fd = listen_sock;
- epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &ev);
-
- // 4.事件循环
- volatile bool quit = false;
- struct epoll_event revs[NUM]; //该数组仅仅是尝试从内核中拿到已经就绪的事件
- while (!quit)
- {
- int timeout = 1000;
- int n = epoll_wait(epfd, revs, NUM, -1);
- switch(n)
- {
- case 0:
- cout << "timeout!" << endl;
- break;
- case -1:
- cerr << "epoll error!" << endl;
- break;
- default:
- cout << "有事件已经就绪!" << endl;
-
- // 5. 处理就绪事件
- for (int i = 0; i < n; ++i)
- {
- int sock = revs[i].data.fd; //暂时方案
- cout << "fd: " << sock << "事件已经就绪..." << endl;
- if (revs[i].events & EPOLLIN)
- {
- // 处理读事件
- cout << "fd: " << sock << " 读事件就绪..." << endl;
- if (sock == listen_sock)
- {
- // 5.1.处理连接事件
- int fd = Sock::Accept(listen_sock);
- cout << "fd: " << sock << " 连接事件就绪..." << endl;
- if(fd >= 0)
- {
- cout << "获取连接成功..." << endl;
- // 不能立即读取,不能确定该fd上的事件是否就绪
- struct epoll_event _ev;
- _ev.events = EPOLLIN;
- _ev.data.fd = fd;
- epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &_ev); // 将新的fd托管给epoll
- cout << "已经将fd: " << fd << " 托管给epoll" << endl;
- }
- else
- {
- cout << "连接获取失败..." << endl;
- }
- }
- else
- {
- // 处理正常读取
- cout << "fd: " << sock << " 正常读取事件就绪..." << endl;
- char buffer[1024];
- ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0);
- if(s > 0)
- {
- buffer[s] = 0;
- cout << "client [" << sock << "] #" << buffer << endl;
- //将关心的事件改成写事件EPOOLOUT
- // struct epoll_event _ev;
- // _ev.events = EPOLLOUT;
- // _ev.data.fd = sock;
- // epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &_ev);
- }
- else if(s == 0)
- {
- // 对端关闭连接
- cout << "client quit " << sock << endl;
- close(sock); // 关闭连接
- epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr); // 删除epoll中的fd
- cout << "sock: " << sock << "delete from success!" << endl;
- }
- else
- {
- //读取失败
- cout << "recv error" << endl;
- close(sock); // 关闭连接
- epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr); // 删除epoll中的fd
- cout << "sock: " << sock << "delete from success!" << endl;
- }
- }
- }
- else
- {
- // 处理写事件
- }
- }
- break;
- }
- }
-
- close(epfd);
- close(listen_sock);
- return 0;
- }
4.3. epoll原理

-
当某一进程调用epoll_create方法时, Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关。
-
每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。
-
这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)。
-
而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法.这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表(即就绪队列)中。
-
在epoll中,对于每一个事件,都会建立一个epitem结构体(即图中的rb_node)。
-
当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。
-
如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户. 这个操作的时间复杂度是O(1)
4.4. epoll的优点
-
接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开。
-
数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)。
-
事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中,epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响。
-
没有数量限制: 文件描述符数目无上限。
4.5. epoll的工作模式
4.5.1. LT模式(水平触发Level Triggered )
epoll默认状态下就是LT工作模式.
-
当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分。
-
如果只处理了部分数据, 缓冲区中还剩数据, 在第二次调用epoll_wait 时,epoll_wait仍然会立刻返回并通知socket读事件就绪。
-
直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回。
-
支持阻塞读写和非阻塞读写。
4.5.2. ET模式(边缘触发Edge Triggered)
如果在listen_sock添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式。
-
当epoll检测到socket上事件就绪时, 必须立刻处理。
-
如果只处理了部分数据, 缓冲区中还剩数据, 在第二次调用 epoll_wait 的时候,epoll_wait 不会再返回了。
-
也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会。
-
ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epoll。
-
只支持非阻塞的读写。
简单来说一个fd如果被设置为了ET模式,不管这个fd上面的事件发生了多少次,epoll都只会通知你一次。
ET模式的通知策略其实是倒逼程序猿一旦开始读取数据时,就必须要一直读完,否则可能会导致数据丢失。
select、poll采用的都是LT模式,epoll默认是LT模式,也可以设置为ET模式。
4.6. 实现ET模式下的epoll服务器
前面我们已经了解了什么是ET模式,但是现在有一个问题,当我们使用ET模式时,由于底层只会通知一次,如果缓冲区中数据的大小超过了一次读取的大小,那么怎么才能将缓冲区中的数据读取完呢?
循环读取!但是recv、accept、read等接口在没有读取到数据时会阻塞,在循环读取的情况下可能最后一次读取肯定是没有读取到数据的(例如一次读取100个字节,而缓冲区中有200个字节的数据,当读了两次之后不能确定缓冲区中是否还有数据,所以还会再读一次),所以服务器会一直阻塞住,所以在ET模式下,所有的fd必须要设置为非阻塞!
该epoll服务器提供计算服务,目前只实现了加法功能。
epoll_server.cc:
- #include"Reactor.hpp"
- #include"sock.hpp"
- #include"Accepter.hpp"
- #include"Util.hpp"
-
- static void Usage(string proc)
- {
- cout << "Usage: " << proc << " port" << endl;
- }
-
- int main(int argc, char* argv[])
- {
- if(argc != 2)
- {
- Usage(argv[0]);
- exit(1);
- }
-
- // 1. 创建socket监听
- uint16_t port = (uint16_t)atoi(argv[1]);
- int listen_sock = Sock::Socket();
- SetNonBlock(listen_sock); //套接字设为非阻塞
- Sock::Bind(listen_sock, port);
- Sock::Listen(listen_sock);
-
- // 2.创建Reactor对象
- Reactor *R = new Reactor();
- R->InitReactor();
-
- // 3. 给Reactor中反应堆中添加套接字
- Event* evp = new Event();
- evp->sock = listen_sock;
- evp->R = R;
-
- //Accepter:连接管理器
- evp->RegisterCallback(Accepter, nullptr, nullptr);
- R->InsertEvent(evp, EPOLLIN | EPOLLET); // 设置为ET模式
-
- // 4. 开始事件派发
- int timeout = 1000;
- while (true)
- {
- R->Dispatcher(timeout);
- }
- return 0;
- }
Reactor.hpp: 反应堆模式,通过多路转接方案,被动采用事件派发方式,去反向的调用对应的回调函数。
检测事件:epoll
派发事件:Dispatcher
连接事件:accepter
IO事件:recver、sender
- #pragma once
-
- #include
- #include
- #include
- #include
- #include
- #include
- using namespace std;
-
- class Event;
- class Reactor;
-
- typedef int (*callback_t)(Event *ev); // 函数指针类型
- #define SIZE 128
- #define NUM 64
-
- class Event
- {
- public:
- // 对应的文件描述符
- int sock;
- // 对应的sock的输入缓冲区
- string inbuffer;
- // 对应sock的输出缓冲区
- string outbuffer;
-
- //给sock设置回调
- callback_t recver;
- callback_t sender;
- callback_t errorer;
-
- // 设置Event回指Reactor的指针
- Reactor *R;
-
- public:
- Event()
- {
- sock = -1;
- recver = nullptr;
- sender = nullptr;
- errorer = nullptr;
- R = nullptr;
- }
-
- void RegisterCallback(callback_t _recver, callback_t _sender, callback_t _errorer)
- {
- recver = _recver;
- sender = _sender;
- errorer = _errorer;
- }
-
- ~Event(){}
- };
-
- // 不需要关心任何sock的类型
- // 只关心如何使用该类,对Event管理
- class Reactor
- {
- private:
- int epfd;
- unordered_map<int, Event *> events; // Epoll类管理的所有Event的集合
- public:
- Reactor():epfd(-1){}
- void InitReactor()
- {
- epfd = epoll_create(SIZE);
- if(epfd < 0)
- {
- cerr << "epoll_create error! " << endl;
- exit(2);
- }
-
- cout << "InitReactor success! " << endl;
- }
- bool InsertEvent(Event *evp, uint32_t evs)
- {
- // 1. 将sock插入到epoll中
- struct epoll_event ev;
- ev.events = evs;
- ev.data.fd = evp->sock;
- if(epoll_ctl(epfd, EPOLL_CTL_ADD, evp->sock, &ev) < 0)
- {
- cerr << "epoll_ctr add event failed!" << endl;
- return false;
- }
- // 2. 将ev本身插入到unordered_map中
- events.insert({evp->sock, evp});
- cout << "insert listen_sock success!" << endl;
- }
- void DeleteEvent(Event *evp)
- {
-
- int sock = evp->sock;
-
- auto iter = events.find(sock);
- if(iter != events.end())
- {
- // 1. 将sock从epoll中删除
- epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);
-
- // 2. 将sock对应的ev从map中删除
- events.erase(iter);
-
- close(sock);
-
- delete evp;
- }
-
- }
-
- // 修改,使该sock能读写
- bool EnableReadWrite(int sock, bool enable_read, bool enable_write)
- {
- struct epoll_event ev;
- ev.events = EPOLLET | (enable_read ? EPOLLIN : 0) | (enable_write ? EPOLLOUT : 0);
- ev.data.fd = sock;
- if(epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev)< 0)
- {
- cerr << "epoll_ctr mod event failed!" << endl;
- return false;
- }
- return true;
- }
-
- bool IsSockOk(int sock)
- {
- auto iter = events.find(sock);
- return iter != events.end();
- }
-
- // 就绪事件的派发器
- void Dispatcher(int timeout)
- {
-
- struct epoll_event revs[NUM];
- int n = epoll_wait(epfd, revs, NUM, timeout);
-
- for (int i = 0; i < n; ++i)
- {
- int sock = revs[i].data.fd;
-
- uint32_t revents = revs[i].events;
- // 差错处理,将所有的错误问题全部转化为让IO函数处理
- if(revents & EPOLLERR)
- revents |= (EPOLLIN | EPOLLOUT);
- if(revents & EPOLLHUP)
- revents |= (EPOLLIN | EPOLLOUT);
-
- // 读数据就绪
- if(revents & EPOLLIN)
- {
-
- if (IsSockOk(sock) && events[sock]->recver) //如果该回调函数被设置
- events[sock]->recver(events[sock]); // 直接调用回调方法,执行对应的读取
-
- }
- if(revents & EPOLLOUT)
- {
- if(IsSockOk(sock) && events[sock]->sender)
- events[sock]->sender(events[sock]);
- }
- }
- }
-
- ~Reactor(){}
- };
Accepter.hpp: 获取连接
- #pragma once
- #include"Reactor.hpp"
- #include"sock.hpp"
- #include"Service.hpp"
- #include"Util.hpp"
- int Accepter (Event *evp)
- {
- cout << "有新的连接到来,sock:" << evp->sock << endl;
- while(true)
- {
- int sock = Sock::Accept(evp->sock);
- if(sock < 0)
- {
- cout << "Accept Done! " << endl;
- break;
- }
-
- cout << "Accept success, sock: " << sock << endl;
- //获取连接成功
- SetNonBlock(sock);
- Event *other_evp = new Event();
- other_evp->sock = sock;
- other_evp->R = evp->R;
- // recver,sender,errorer 只负责读写
- other_evp->RegisterCallback(Recver, Sender, Errorer);
- evp->R->InsertEvent(other_evp, EPOLLIN | EPOLLET);
- }
- }
Service.hpp: 提供读、写服务
- #pragma once
-
- #include
- #include
- #include"Reactor.hpp"
- #include"Util.hpp"
- #define ONCE_SIZE 128
-
-
- // 返回值
- // 1:本轮读取完成
- // 0: 对端关闭连接
- // -1: 读取出错
-
-
- static int RecverCore(int sock, string & inbuffer)
- {
- while(true)
- {
- char buffer[ONCE_SIZE];
- ssize_t s = recv(sock, buffer, ONCE_SIZE -1, 0);
- if(s > 0)
- {
- buffer[s] = 0;
- // 读取成功
- inbuffer += buffer;
- }
- else if(s ==0) //对端关闭连接
- {
- return 0;
- }
- else
- {
- // s<0;
- // 1.读完,底层没数据了;
- if(errno == EINTR)
- {
- // IO被信号中断
- continue;
- }
- if(errno == EAGAIN||errno ==EWOULDBLOCK)
- {
- return 1;
- }
- // 2.出错了
- return -1;
- }
- }
- }
-
- int Recver(Event *evp)
- {
- cout << "Recver been called!" << endl;
- // 1.读取数据
- int result = RecverCore(evp->sock, evp->inbuffer);
- if(result <= 0)
- {
- // 差错处理
- if(evp->errorer)
- {
- evp->errorer(evp);
- }
- return -1;
- }
-
- // 2.分包(解决粘包问题)
- vector
tokens; - string sep = "X";
- SplitSegment(evp->inbuffer, &tokens, sep);
-
-
- // 3.反序列化,针对一个报文,提取有效参与计算或者存储的信息
- for(auto seg:tokens)
- {
- string data1, data2;
- if(Deserialize(seg, &data1, &data2))// 反序列化
- {
- // 4.业务逻辑,得到结果
- int x = atoi(data1.c_str());
- int y = atoi(data2.c_str());
- int z = x + y;
-
- // 5.构建响应, 再添加到evp->outbuffer
- string res = data1;
- res += "+";
- res += data2;
- res += "=";
- res = to_string(z);
- res += sep;
-
- evp->outbuffer += res; //发送数据
- }
- }
-
- // 6.尝试直接/间接进行发送
- // 写事件一般都是就绪的,但用户不一定就绪
- // 对于写事件,通常是按需设置
- if(!evp->outbuffer.empty())
- {
- // 写事件打开的时候,默认就是就绪的,即便是发送缓冲区满了
- // epoll只要用户重新设置了out事件,EPOLLOUT至少会触发一次
- evp->R->EnableReadWrite(evp->sock, true, true);
- }
- }
-
-
- // 返回值
- // 1: 数据全部发送完成
- // 0: 数据没发完,不能再发
- // -1: 发送出错
- int SenderCore(int sock, string &outbuffer)
- {
-
- while(true)
- {
- int total = 0; // 本轮累计发送的数据量
- const char *start = outbuffer.c_str();
- int size = outbuffer.size(); // 缓冲区数据总大小
- ssize_t curr = send(sock, start + total, size - total, 0);
- if(curr > 0)
- {
- total += curr;
- if(total == size)
- {
- // 将数据全部发送完成
- outbuffer.clear();
- return 1;
- }
- }
- else
- {
- // 数据没发完,但是不能再发了
- if(errno == EINTR) continue;
- if(errno == EAGAIN || errno == EWOULDBLOCK)
- {
- outbuffer.erase(0, total);
- return 0;
- }
- return -1;
- }
- }
- }
- int Sender(Event *evp)
- {
- cout << "Sender been called!" << endl;
- int res = SenderCore(evp->sock, evp->outbuffer);
- if(res == 1)
- {
- evp->R->EnableReadWrite(evp->sock, true, false);
- }
- else if(res == 0)
- {
- evp->R->EnableReadWrite(evp->sock, true, true);
- }
- else
- {
- if(evp->errorer)
- evp->errorer(evp);
- }
- }
-
- int Errorer(Event *evp)
- {
- cout << "errorer been called!" << endl;
- evp->R->DeleteEvent(evp);
- }
Sock.hpp: TCP相关接口的实现
- #pragma once
-
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- using namespace std;
- class Sock
- {
- public:
- static int Socket()
- {
- int sock = socket(AF_INET, SOCK_STREAM, 0);
- if(sock < 0)
- {
- cerr << "socket error" << endl;
- exit(2);
- }
- return sock;
- }
- static void Bind(int sock, uint16_t port)
- {
- struct sockaddr_in local;
- memset(&local, 0, sizeof(local));
- local.sin_family = AF_INET;
- local.sin_port = htons(port);
- local.sin_addr.s_addr = INADDR_ANY;
- if(bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0)
- {
- cerr << "bind error" << endl;
- exit(3);
- }
- }
- static void Listen(int sock)
- {
- if(listen(sock, 5) < 0)
- {
- cerr << "listen error" << endl;
- exit(4);
- }
- }
- static int Accept(int sock)
- {
- struct sockaddr_in peer;
- socklen_t len = sizeof(peer);
- int fd = accept(sock, (struct sockaddr *)&peer, &len);
- if(fd >= 0)
- {
- return fd;
- }
- return -1;
- }
- static void Connect(int sock, string ip , uint16_t port)
- {
- struct sockaddr_in server;
- memset(&server, 0, sizeof(server));
- server.sin_family = AF_INET;
- server.sin_port = htons(port);
- server.sin_addr.s_addr = inet_addr(ip.c_str());
- if(connect(sock, (struct sockaddr*)&server, sizeof(server)==0))
- {
- cout << "connect success!" << endl;
- }
- else
- {
- cout << "connect failed" << endl;
- exit(5);
- }
- }
- };
Util.hpp: 工具文件,设置文件描述符为非阻塞、分包、反序列化
- #pragma once
-
-
-
- #include
- #include
- #include
- #include
- #include
- #include"Service.hpp"
- using namespace std;
-
- // 工具类
- //设置文件描述符为非阻塞
- void SetNonBlock(int sock)
- {
- int fl = fcntl(sock, F_GETFL);
- if(fl < 0)
- {
- std::cerr << "fcntl failed!" << std::endl;
- return;
- }
- fcntl(sock, F_SETFL, fl | O_NONBLOCK);
- }
-
-
- void SplitSegment(string& inbuffer, vector
*tokens, string sep) - {
- while(true)
- {
- cout << "inbuffer: " << inbuffer << endl;
- auto pos = inbuffer.find(sep);
- if(pos == string::npos)
- {
- break;
- }
- string sub = inbuffer.substr(0, pos);
- tokens->push_back(sub);
- inbuffer.erase(0, pos + sep.size());
- }
- }
-
- bool Deserialize(const string& seg, string *out1, string *out2)
- {
- string op = "+";
- auto pos = seg.find(op);
- if(pos == std::string::npos)
- {
- return false;
- }
-
- *out1 = seg.substr(0, pos);
- *out2 = seg.substr(pos + op.size());
- cout << *out2 << endl;
- return true;
- }