阻塞IO: 当用户发出IO请求之后,内核会去查看数据是否就绪,如果没有就绪系统调用就会等待数据就绪,此时操作系统会将进程从运行队列拿到等待队列,当数据就绪时,操作系统又会将进程从等待队列拿到运行队列,进行数据读取, 所有的套接字, 默认都是阻塞方式.
阻塞IO是最常见的IO模型。
非阻塞IO: 当用户发出IO请求之后,内核会去查看数据是否就绪,如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EWOULDBLOCK错误码.
非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询. 这对CPU来说是较大的浪费, 一般只有特定场景下才使用.
信号驱动IO: 当用户发出IO请求之后,内核会去查看数据是否就绪,内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作
IO多路转接: 虽然从流程图上看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态.
异步IO: 当用户发出IO请求之后,进程立刻就可以开始去做其它的事,内核会将由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据).,该也就说该进程完全不需要知道实际的整个IO操作是如何进行的,只需要先发起一个请求,当接收内核返回的成功信号时表示IO操作已经完成,可以直接去使用数据了。
小结
在这里, 我们要强调几个概念
同步和异步关注的是消息通信机制.
另外, 我们回忆在讲多进程多线程的时候, 也提到同步和互斥. 这里的同步通信和进程之间的同步是完全不想干的概念.
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态
非阻塞IO,纪录锁,系统V流机制, I/O多路转接(也叫I/O多路复用) ,readv和writev函数以及存储映射IO(mmap),这些统称为高级IO.
我们此处重点讨论的是I/O多路转接
fcntl
一个文件描述符, 默认都是阻塞IO.
传入的cmd的值不同, 后面追加的参数也不相同
fcntl函数有5种功能:
我们此处只是用第三种功能, 获取/设置文件状态标记, 就可以将一个文件描述符设置为非阻塞.
实现函数SetNoBlock
#include
#include
#include
#include
#include
using namespace std;
void SetNonBlock(int fd)
{
int fl = fcntl(fd, F_GETFL);
if (fl < 0)
{
perror("tcntl");
return;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
int main()
{
SetNonBlock(0); // 非阻式等待
char buffer[100];
while (1)
{
ssize_t s = read(0, buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = '\0';
std::cout << "read success!" << std::endl;
write(1, buffer, strlen(buffer));
}
else
{
// 区分是否是
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
std::cerr << "read failed:数据没有准备好: " << errno << std::endl;
sleep(1);
continue;
}
}
}
return 0;
}
初识select
系统提供select函数来实现多路复用输入/输出模型.
select函数原型
参数解释:
参数timeout取值:
关于fd_set结构
其实这个结构就是一个整数数组, 更严格的说, 是一个 “位图”. 使用位图中对应的位来表示要监视的文件描述符
提供了一组操作fd_set的接口, 来比较方便的操作位图.
void FD_CLR(int fd, fd_set *set); // 用来清除描述词组set中相关fd 的位
int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组set中相关fd 的位是否为真
void FD_SET(int fd, fd_set *set); // 用来设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位
函数返回值:
错误值可能为:
常见的程序片段如下:
fs_set readset;
FD_SET(fd,&readset);
select(fd+1,&readset,NULL,NULL,NULL);
if(FD_ISSET(fd,readset)){……}
理解select执行过程
理解select模型的关键在于理解fd_set,为说明方便,取fd_set长度为1字节, fd_set中的每一bit可以对应一个文件描述符fd。则1字节长的fd_set最大可以对应8个fd
socket就绪条件
读就绪
写就绪
异常就绪(选学)
select的特点
备注: fd_set的大小可以调整,可能涉及到重新编译内核. 感兴趣的同学可以自己去收集相关资料.
select缺点
样例:
#include
#include
#include
#include
#include
#include "Sock.hpp"
#define NUM (sizeof(fd_set) * 8)
int fd_array[NUM]; // 内容>=0, 合法的fd,如果是-1,该位置没有fd
void Usage(std::string proc)
{
std::cout << "Usage\n\t" << proc << " port" << std::endl;
}
// ./SelectServer 8080
int main(int argc, char *argv[])
{
// std::cout << sizeof(fd_set) * 8 << std::endl;// fd_set的大小
if (argc != 2)
{
Usage(argv[0]);
exit(1);
}
int listen_sock = Sock::Socket();
uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
Sock::Bind(listen_sock, port);
Sock::Listen(listen_sock);
// accept: 不应该,accept的本质叫做通过listen_sock获取新链接
// 前提是listen_sock上面有新链接,accept怎么知道有新链接呢??
// 不知道!!!accept阻塞式等待
// 站在多路转接的视角,我们认为,链接到来,对于listen_sock,就是读事件就绪!!!
// 对于所有的服务器,最开始的时候,只有listen_sock
// 事件循环
fd_set fds;
for (int i = 1; i < NUM; ++i)
{
fd_array[i] = -1;
}
fd_array[0] = listen_sock;
int max_fd = listen_sock;
for (;;)
{
FD_ZERO(&fds); //将位图全部清零
for (int i = 0; i < NUM; ++i)
{
// 用户告诉内核,你要帮我关心哪些fd上的读事件已经就绪
if (fd_array[i] == -1)
continue;
FD_SET(fd_array[i], &fds);
if (fd_array[i] > max_fd)
{
max_fd = fd_array[i];
}
}
struct timeval timeout = {5, 0};
// int n = select(max_fd + 1, &fds, nullptr, nullptr, &timeout);
int n = select(max_fd + 1, &fds, nullptr, nullptr, nullptr); // 阻塞式等待多个文件描述符,等待链接
switch (n)
{
case -1:
std::cerr << "select failed!" << std::endl;
break;
// 时间超出
case 0:
std::cout << "select timeout" << std::endl;
break;
default:
std::cout << "select success, 有对应的fd已经就绪了" << std::endl;
// 查找哪些fd就绪了
for (int i = 0; i < NUM; ++i)
{ // for 1
if (fd_array[i] == -1)
continue;
// 判断是否是合法的fd
else if (FD_ISSET(fd_array[i], &fds))
{
if (fd_array[i] == listen_sock)
{
std::cout << "listen_sock:" << listen_sock << " 正在获取新链接" << std::endl;
int new_sock = Sock::Accept(listen_sock);
// 获取成功
// recv,read了呢?绝对不能!
// 新链接到来,不意味着有数据到来!!什么时候数据到来呢?不知道
// 可是,谁可以最清楚的知道那些fd,上面可以读取了?select!
// 无法直接将fd设置进select,但是,好在我们有fd_array[]!
if (new_sock >= 0)
{
std::cout << "listen_sock:" << listen_sock << " 获取新链接成功 new_sock:"
<< new_sock << std::endl;
int pos = 1;
// 将合法的fd设置进fd_array数组
for (; pos < NUM; pos++)
{
if (fd_array[pos] == -1)
break;
}
if (pos < NUM)
{
// 成功找到了一个位置
std::cout << "新链接new_sock:" << new_sock << "已经被添加到fd_array[" << pos << "]中" << std::endl;
fd_array[pos] = new_sock;
}
else
{
std::cout << "服务器已满,关闭套接字" << std::endl;
close(new_sock);
}
}
}
else
{
//正常读取
// 普通的sock,读事件就绪啦!
// 可以进行读取啦,recv,read
// 可是,本次读取就一定能读完吗?读完,就一定没有所谓的数据包粘包问题吗?
// 但是,我们今天没法解决!我们今天没有场景!仅仅用来测试
std::cout << "new_sock: " << fd_array[i] << " 上面有普通读取" << std::endl;
char recvbuffer[1024];
ssize_t s = read(fd_array[i], recvbuffer, sizeof(recvbuffer));
if (s > 0)
{
recvbuffer[s] = '\0';
std::cout << "client " << fd_array[i] << " " << recvbuffer << std::endl;
}
else if (s == 0)
{
std::cout << "client已经关闭,服务端关闭new_sock" << std::endl;
close(fd_array[i]);
fd_array[i] = -1;
}
else
{
std::cout << "读取失败,服务端关闭new_sock" << std::endl;
close(fd_array[i]);
fd_array[i] = -1;
}
}
}
} // end of for 1
break;
}
}
return 0;
}
poll函数接口
参数说明
events和revents的取值:
返回结果
poll的优点
不同与select使用三个位图来表示三个fdset的方式, poll使用一个pollfd的指针实现.
poll的缺点
poll中监听的文件描述符数目增多时
poll示例:
#include
#include
#include
#include
int main()
{
struct pollfd fds;
fds.fd = 0;
fds.events = POLLIN;
fds.revents = 0;//标准输入
int timeout = 1000; // timeout -1 阻塞式等待,0非阻塞式等待
for (;;)
{
int n = poll(&fds, 1, -1);
switch (n)
{
case -1:
std::cerr << "poll error" << std::endl;
break;
case 0:
std::cout << "poll timeout..." << std::endl;
break;
default:
std::cout << "有事件发生" << std::endl;
if(fds.revents & POLLIN)
{
std::cout << fds.fd << "上面的读事件发生了" << std::endl;
char buffer[128];
ssize_t s = read(fds.fd, buffer, sizeof(buffer));
if(s > 0)
{
buffer[s] = '\0';
write(1, buffer, strlen(buffer));
}
}
break;
}
}
return 0;
}
epoll初识
按照man手册的说法: 是为处理大批量句柄而作了改进的poll.
它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)
它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法
epoll的相关系统调用
epoll 有3个相关的系统调用
epoll_create
创建一个epoll的句柄
epoll_ctl
epoll的事件注册函数
第二个参数的取值:
struct epoll_event结构如下:
events可以是以下几个宏的集合:
epoll_wait
收集在epoll监控的事件中已经发送的事件
epoll工作原理
struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}
总结一下, epoll的使用过程就是三部曲:
epoll的优点(和 select 的缺点对应)
注意!!
网上有些博客说, epoll中使用了内存映射机制
这种说法是不准确的. 我们定义的struct epoll_event是我们在用户空间中分配好的内存. 势必还是需要将内核的数据拷贝到这个用户空间的内存中的.
请同学们对比总结select, poll, epoll之间的优点和缺点(重要, 面试中常见).
样例:
#include
#include
#include
#include
#include
#include
#include "Sock.hpp"
#define NUM 128
#define SIZE 64
// epoll_server.cc 8080
void Usage(std::string proc)
{
std::cout << "Usage:\n\t" << proc << " port" << std::endl;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
return 1;
}
// 创建套接字,等待链接
uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
int listen_sock = Sock::Socket();
Sock::Bind(listen_sock, port);
Sock::Listen(listen_sock);
// 创建epoll的句柄
int epfd = epoll_create(NUM);
// epoll的事件注册
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &ev);
// 事件循环
struct epoll_event revs[SIZE];
while (true)
{
// 收集在epoll监控的事件中已经发送的事件
int timeout = 1000;
int n = epoll_wait(epfd, revs, SIZE, -1);
switch (n)
{
case -1:
std::cerr << "epoll error" << std::endl;
break;
case 0:
std::cerr << "epoll timeout..." << std::endl;
break;
default:
std::cout << "事件就绪了" << std::endl;
for (int i = 0; i < n; ++i)
{
// 获取已经准备好的 sock
int sock = revs[i].data.fd;
std::cout << "文件描述符: " << sock << " 就绪啦" << std::endl;
if (revs[i].events & EPOLLIN)
{
std::cout << "文件描述符" << sock << " 上面有事件就绪了" << std::endl;
if (sock == listen_sock)
{
int newSock = Sock::Accept(sock);
if (newSock >= 0)
{
std::cout << "获取信新链接成功了" << newSock << std::endl;
// epoll的事件注册,将new_sock添加到epoll模型当中
struct epoll_event _ev;
_ev.events = EPOLLIN;
_ev.data.fd = newSock;
epoll_ctl(epfd, EPOLL_CTL_ADD, newSock, &_ev);
}
// 普通读取
else
{
//do nothing
}
}
else
{
// 进行普通读取
char buffer[1024];
ssize_t s = read(sock, buffer, sizeof(buffer)-1);
if(s < 0)
{
std::cerr << "read failed!" << std::endl;
close(sock);
epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);
std::cout << "sock: " << sock << "delete from epoll success" << std::endl;
}
else if(s == 0)
{
std::cout << "对端关闭了链接" << std::endl;
close(sock);
epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);
std::cout << "sock: " << sock << "delete from epoll success" << std::endl;
}
else
{
buffer[s] = '\0';
std::cout << "client #" << buffer << std::endl;
}
}
}
else if (revs[i].events & EPOLLOUT)
{
}
}
break;
}
}
close(listen_sock);
close(epfd);
return 0;
}
epoll工作方式
epoll有2种工作方式-水平触发(LT)和边缘触发(ET)
假如有这样一个例子:
水平触发Level Triggered 工作模式
epoll默认状态下就是LT工作模式.
边缘触发Edge Triggered工作模式
如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式
select和poll其实也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET
对比LT和ET
LT是 epoll 的默认行为. 使用 ET 能够减少 epoll 触发的次数. 但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完.
相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的.
另一方面, ET 的代码复杂程度更高了
理解ET模式和非阻塞文件描述符
使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是 “工程实践” 上的要求.
假设这样的场景: 服务器接受到一个10k的请求, 会向客户端返回一个应答数据. 如果客户端收不到应答, 不会发送第二个10k请求.
如果服务端写的代码是阻塞式的read, 并且一次只 read 1k 数据的话(read不能保证一次就把所有的数据都读出来,参考 man 手册的说明, 可能被信号打断), 剩下的9k数据就会待在缓冲区中
此时由于 epoll 是ET模式, 并不会认为文件描述符读就绪. epoll_wait 就不会再次返回. 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据. epoll_wait 才能返回
但是问题来了.
而如果是LT没这个问题. 只要缓冲区中的数据没读完, 就能够让 epoll_wait 返回文件描述符读就绪.
epoll的高性能, 是有一定的特定场景的. 如果场景选择的不适宜, epoll的性能可能适得其反