I/O处理单元和任务类
前面写了线程池,那么现在要考虑如何去使用该线程池了
注意,到目前为止,我们还是在解决web服务器的I/O处理单元
即负责处理客户连接,读写网络数据的部分
线程池属于 Web 服务器中的工作线程部分,Web 服务器通常使用线程池来管理并复用一组预先创建的工作线程,这些工作线程负责处理客户端的请求。
服务器通常要处理:I/O 事件、信号事件以及定时事件,对应着有两种高效的事件处理模式:Reactor 和 Proactor
同步 I/O 模型通常用于实现 Reactor 模式;
异步 I/O 模型通常用于实现 Proactor 模式;
下面来分别说明一下两种模式
Reactor 模式
Reactor模式是一种基于事件驱动的软件设计模式,用于处理高并发的IO操作。
其核心思想是将I/O操作转换为事件,并使用事件处理器来处理这些事件。
在Reactor模式中,有一个称为Reactor的组件,它负责监听I/O事件,例如连接到达、数据就绪等等。当一个事件发生时,Reactor会将其派发给一个或多个事件处理器来处理。
事件处理器通过回调函数来处理事件,并且通常会使用非阻塞I/O来执行特定的操作。这样,事件处理器可以同时处理多个事件,从而实现高并发性能。Reactor模式被广泛应用于网络编程中,例如UNIX平台上的select/poll/epoll机制,Java NIO框架中的Selector类等等。
工作流程
使用同步 I/O(以 epoll_wait 为例)实现的 Reactor 模式的工作流程是:
- 主线程往 epoll 内核事件表中注册 socket 上的读就绪事件。
- 主线程调用 epoll_wait 等待 socket 上有数据可读。
- 当 socket 上有数据可读时, epoll_wait 通知主线程。主线程则将 socket 可读事件放入请求队列。
- 睡眠在请求队列上的某个工作线程被唤醒,它从 socket 读取数据,并处理客户请求,然后往 epoll
内核事件表中注册该 socket 上的写就绪事件。 - 当主线程调用 epoll_wait 等待 socket 可写。
- 当 socket 可写时,epoll_wait 通知主线程。主线程将 socket 可写事件放入请求队列。
- 睡眠在请求队列上的某个工作线程被唤醒,它往 socket 上写入服务器处理客户请求的结果。
特征
Reactor组件通常是一个由事件驱动的循环,它不断监听I/O事件并将其派发给相应的事件处理器来处理。Reactor模式在C++中的实现通常基于操作系统提供的select、poll或epoll等系统调用来实现。
在具体实现上,Reactor组件可以是一个类,它封装了底层的系统调用,并提供了注册、注销事件处理器、以及等待I/O事件到来等方法。当有一个或多个I/O事件就绪时,Reactor对象会返回这些事件的相关信息,例如事件类型、套接字等等。
同时,Reactor组件需要维护一个或多个事件处理器,它们负责处理不同类型的事件。事件处理器通常也是一个类,它封装了特定类型的I/O操作,并实现了相应的回调函数,例如处理连接请求、读取数据、写入数据等等。当Reactor对象收到I/O事件后,它会根据事件类型选择相应的事件处理器,并将事件信息传递给它。
综上, Reactor组件和事件处理器共同实现了Reactor模式
Proactor 模式
Proactor模式是一种并发编程模式,它的目的是实现高效的I/O操作。Proactor 模式将所有 I/O 操作都交给主线程和内核来处理(进行读、写),工作线程仅仅负责业务逻辑。
在Proactor模式中,所有的I/O操作都由操作系统异步执行,并且当I/O操作完成时,操作系统会通知应用程序。这样,应用程序可以继续执行其他任务而不必阻塞在I/O操作上。
在C++中,Proactor模式通常与异步IO(Asynchronous IO)一起使用。在Windows平台上,Proactor模式可以通过使用IOCP(Input/Output Completion Port)来实现。在Linux平台上,Proactor模式可以通过使用epoll或者kqueue等机制来实现。
Proactor模式相对于Reactor模式来说,更加适合处理高并发的网络应用,因为它能够提供更高的吞吐量和更低的延迟。但是,相比于Reactor模式,Proactor模式的实现难度更大,需要更多的代码和调试工作。
工作流程
使用异步 I/O 模型(以 aio_read 和 aio_write 为例)实现的 Proactor 模式的工作流程是:
- 主线程调用 aio_read 函数向内核注册 socket 上的读完成事件,并告诉内核用户读缓冲区的位置,
以及读操作完成时如何通知应用程序(这里以信号为例)。 - 主线程继续处理其他逻辑。
- 当 socket 上的数据被读入用户缓冲区后,内核将向应用程序发送一个信号,以通知应用程序数据
已经可用。 - 应用程序预先定义好的信号处理函数选择一个工作线程来处理客户请求。工作线程处理完客户请求
后,调用 aio_write 函数向内核注册 socket 上的写完成事件,并告诉内核用户写缓冲区的位置,以
及写操作完成时如何通知应用程序。 - 主线程继续处理其他逻辑。
- 当用户缓冲区的数据被写入 socket 之后,内核将向应用程序发送一个信号,以通知应用程序数据
已经发送完毕。 - 应用程序预先定义好的信号处理函数选择一个工作线程来做善后处理,比如决定是否关闭 socket。
特征
在 Linux 中,Proactor 模式可以使用 epoll 或者 libuv 等第三方库来实现。下面分别介绍一下这两种方式的实现方法。
使用 epoll 实现 Proactor 模式
Linux 提供了一个高效的异步 I/O 接口 epoll,可以用于实现 Proactor 模式。使用 epoll 需要遵循以下几个步骤:
- 创建 epoll 句柄:使用
epoll_create
函数创建一个 epoll 句柄; - 添加文件描述符到 epoll 句柄中:使用
epoll_ctl
函数将需要进行异步 I/O 的文件描述符添加到 epoll 句柄中,同时设置好事件类型; - 调用
epoll_wait
函数等待事件:主线程调用epoll_wait
函数等待事件发生,并将发生事件的文件描述符返回给主线程; - 处理事件:对于每一个返回的文件描述符,主线程可以根据其对应的事件类型进行相应的处理操作。
#include
#include
#include
#include
void handleEvent(int fd, uint32_t events)
{
// 处理事件
if (events & EPOLLIN)
{
char buffer[1024];
ssize_t bytesRead = read(fd, buffer, sizeof(buffer));
std::cout << "Read " << bytesRead << " bytes from file" << std::endl;
}
}
int main()
{
int fd = open("myfile.txt", O_RDONLY | O_NONBLOCK);
if (fd < 0)
{
std::cerr << "Failed to open file" << std::endl;
return 1;
}
// 创建 epoll 句柄
int epollFd = epoll_create(1);
if (epollFd < 0)
{
std::cerr << "Failed to create epoll" << std::endl;
return 1;
}
struct epoll_event event = {0};
event.events = EPOLLIN | EPOLLET; // 监听可读事件,并设置边缘触发模式
event.data.fd = fd;
// 添加文件描述符到 epoll 句柄中
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &event) < 0)
{
std::cerr << "Failed to add fd to epoll" << std::endl;
close(fd);
return 1;
}
// 调用 epoll_wait 函数等待事件
while (true)
{
struct epoll_event events[1];
int numEvents = epoll_wait(epollFd, events, sizeof(events) / sizeof(struct epoll_event), -1);
if (numEvents < 0)
{
std::cerr << "Failed to wait for events" << std::endl;
break;
}
for (int i = 0; i < numEvents; ++i)
{
handleEvent(events[i].data.fd, events[i].events);
}
}
close(epollFd);
close(fd);
return 0;
}
在上面的例子中,我们使用 epoll_create
函数创建了一个 epoll 句柄,然后使用 epoll_ctl
函数将文件描述符添加到了 epoll 句柄中,这里监听了可读事件,并设置了边缘触发模式。主线程调用 epoll_wait
函数等待事件的发生,当事件发生时会返回对应的文件描述符,并在回调函数中处理事件。
使用 libuv 实现 Proactor 模式
另外一种常用的第三方库是 libuv,它提供了一系列跨平台的异步 I/O 接口,可以用于实现 Proactor 模式。使用 libuv 需要遵循以下几个步骤:
- 初始化 libuv:使用
uv_loop_init
函数初始化一个 event loop; - 创建一个异步操作对象:例如使用
uv_fs_open
函数创建一个打开文件的异步操作对象; - 注册完成回调函数和错误回调
Q&A
1、从项目的数据流角度比较Reactor模式和Proactor模式的优缺点以及应用场景
Reactor模式:
- 优点:
- 能够处理大量的并发连接。
- 在单线程情况下能够有效地处理多个事件。
- 响应快速,适合处理简单的请求。
- 缺点:
- 可靠性较低,因为一个错误的事件处理程序可能会破坏整个系统。
- 在处理复杂请求时效率相对较低。
Reactor模式适用于需要高并发处理的应用场景,例如Web服务器、消息队列等。
Proactor模式:
- 优点:
- 可以处理复杂的操作,例如文件读写、数据库查询等。
- 在处理大量I/O 事件时效率更高。
- 高可靠性,因为每个事件都由一个独立的线程处理。
- 缺点:
- 处理简单请求时的响应时间相对较慢。
- 需要占用更多的系统资源。
Proactor模式适用于需要处理复杂请求的应用场景,例如数据库、文件存储等。
在数据流处理方面,Reactor模式和Proactor模式的实现方式不同:
Reactor模式:
- 通过一个事件循环(Event Loop)来监听所有I/O事件和定时器事件。
- 当某个I/O事件或定时器事件发生时,事件循环会调用相应的事件处理程序进行处理。
Proactor模式:
- 通过操作系统提供的异步I/O机制来完成I/O操作。
- 在发起一个异步I/O操作后,线程可以继续执行其他任务,等I/O操作完成后,操作系统会通知相应的线程,并执行回调函数进行数据处理。
从实现方式上来说,Proactor模式比Reactor模式更加高效,因为它利用了操作系统的异步I/O机制,可以让线程在I/O操作等待期间执行其他任务,从而提高CPU利用率。但是,在处理简单请求时,Proactor模式会存在一些额外的开销,例如创建多个线程、使用回调函数等。
此外,Reactor模式和Proactor模式的区别还可以从以下几个方面进行考虑:
1、调用方式不同
在Reactor模式中,应用程序需要显式地调用I/O操作,而在Proactor模式中,应用程序只需要将I/O请求发送给操作系统,并在操作完成后收到通知。这使得Proactor模式更加透明,因为它隐藏了底层实现细节。(即Proactor模式隐式调用I/O操作)
2、处理方式不同
在Reactor模式中,事件发生后需要通过事件处理器对事件进行处理,而在Proactor模式中,操作系统会自动处理事件,并将操作结果传递给应用程序。这使得Proactor模式更加高效,因为它避免了事件处理器的开销。
3、数据复制方式不同
在Reactor模式中,当数据准备好时,它需要被复制到应用程序的缓冲区中。
而在Proactor模式中,数据可以直接从操作系统的缓冲区中读取,从而避免了数据复制的开销。
4、错误处理方式不同
在Reactor模式中,当出现错误时,应用程序需要及时处理并进行适当的回退操作。
而在Proactor模式中,操作系统会自动处理错误,并提供相应的错误码,使应用程序能够更好地处理异常情况。
5、实现细节不同
在Reactor模式中,事件处理器需要使用复杂的状态机来管理事件的生命周期。
而在Proactor模式中,操作系统提供了一套完整的异步I/O框架,使得应用程序能够更轻松地实现高效的异步I/O。
2、Reactor模式和Proactor模式与阻塞/非阻塞、同步/异步I/O
在阻塞I/O情况下,使用Reactor模式可以让程序发生阻塞,等待数据准备好后再进行处理。
在此期间,程序不能执行其他任务。
相反,使用Proactor模式,程序将无需阻塞等待数据准备好,而是通过回调函数等方式来通知应用程序数据已经准备好了,这样应用程序就可以继续执行其他任务。
在非阻塞I/O情况下,使用Reactor模式需要轮询I/O操作是否完成,如果完成则进行相应的处理;
而Proactor模式则利用系统提供的通知机制来异步地处理I/O操作完成事件,从而避免了轮询的开销。
在同步I/O情况下,Reactor模式适合于处理少量连接、并发度低的场景,因为每个连接都需要消耗一个线程,对于大量连接的情况,会导致资源浪费。Proactor模式则更适合高并发的情况,因为它可以异步地处理多个I/O事件,而不会创建过多线程。
在异步I/O情况下,Reactor模式仍然需要开启线程进行异步处理,但这种线程池的方式相对于同步I/O情况下的线程池规模要小得多。
而Proactor模式则利用系统提供的异步I/O机制,在完成I/O操作之后,通过回调函数等方式来通知应用程序进行处理,从而避免了大量线程的开销。
综上所述,使用Reactor和Proactor模式都可以实现高效的I/O事件处理,但在不同的场景中需要选择合适的模式来提高性能。
3、为什么主流服务器多采用Reactor模式?
主要是因为Reactor模式在处理高并发情况下的性能更加出色。
在Reactor模式中,事件的响应和处理是由应用程序来完成的,当有大量请求时,这种方式能够更好地利用CPU资源,提高系统的效率。
另外,Reactor模式更容易实现,并且可以适用于各种不同类型的应用程序。相比之下,Proactor模式需要操作系统完成事件的响应和处理,这会增加系统的开销,特别是在高负载情况下。同时,Proactor模式需要对应用程序进行大量的重构,以适应异步I/O的编程模型,这也增加了开发成本和难度。
基于Reactor模式实现I/O处理单元
在写I/O处理单元之前我觉得有必要弄清楚工程中有哪些文件,以及每个文件是干什么的
目前,项目的结构如下(后续会更新)
root@ubuntu:/home/ag/webserver1.5# tree
.
├── a.out //你应该知道的
├── http_conn.cpp //存放着用于处理http响应的功能函数
├── http_conn.h //存放着一堆http_conn.cpp中函数的声明以及一些静态变量和宏定义
├── locker.h //有三个类互斥锁类locker、条件变量类cond、信号量类sem,库pthread和semaphore的封装
├── main.cpp //主文件,实现http服务器的核心功能(基于Reactor模式设计)
├── noactive
│ ├── a.out
│ ├── lst_timer.h
│ └── nonactive_conn.cpp
├── resources //存放静态页面资源,供客户端请求调用
│ ├── images
│ │ └── image1.jpg
│ └── index.html
└── threadpool.h //线程池,用于处理和响应http请求报文
注:
1、这里只是先对每个文件的作用进行大致说明
2、请先忽略noactive文件夹
上一篇我们讨论了什么是线程池以及如何实现一个线程池,即完成了对threadpool.h和locker.h文件的编写
根据本篇前面对于Reactor模式和Proactor模式的讨论,至少现在有一个共识:线程池是实现Reactor模式的一个组件
从目录树中可知,main.cpp是我们这个服务器工程的核心文件(使用socket实现服务器功能),我们通过组织和设计来调用socketAPI进而实现了服务器的基本功能。具体的设计模式就是Reactor模式
使用Reactor模式就意味着我们的代码中需要实现Reactor组件和事件处理器,一个一个来讲
Reactor组件
使用同步I/O时,所谓的"Reactor组件"就是一个死循环配合阻塞函数,不断地检测文件描述符中是否有新的事件发生,一旦有则进行相关处理
实际上,这也是最常规的基于socketAPI编写服务器的形式
初始化线程池
先把线程池初始化一下,之后有用
int main(int argc, char* argv[]){
//判断参数个数,至少要传递一个端口号
if(argc <= 1){
printf("按照如下格式运行: %s port_number\n", basename(argv[0]));
exit(-1);
}
//创建线程池,并初始化
//来一个任务之后,要封装成一个任务对象,交给线程池去处理
threadpool* pool = NULL;
try{
pool = new threadpool;
}catch(...){
exit(-1);
}
//创建一个数组用于保存所有的客户端信息
//每当有新连接进来时,都会在 users 数组中找到一个未使用的 http_conn 对象,进行初始化并保存该连接对应的信息
http_conn* users = new http_conn[MAX_FD];
}
在初始化线程池时,输入的参数是http_conn对象,这个就是用于处理任务的任务类,或者我们称为事件处理器,其实现后面会介绍
socket通信
在main.cpp中,我们需要写一个基于Reactor模式的socket通信代码
Socket 是一种用于在网络上进行通信的编程接口(API)。它允许服务器应用程序通过与客户端建立连接来接收和发送数据。当一个客户端尝试连接到服务器时,服务器应用程序会创建一个 Socket 对象,该对象可用于在服务器和客户端之间传输数据。服务器应用程序可以通过读取和写入 Socket 对象来监听来自客户端的请求并向客户端返回响应。
int main(int argc, char* argv[]){
//判断参数个数,至少要传递一个端口号
if(argc <= 1){
printf("按照如下格式运行: %s port_number\n", basename(argv[0]));
exit(-1);
}
//获取端口号,转换成整数
int port = atoi(argv[1]);
//使用socketAPI编写Reactor组件,通过监听socket文件描述符获取连接请求
int listenfd = socket(PF_INET, SOCK_STREAM, 0);//创建用于监听的socket文件描述符
int reuse = 1;//设置端口复用
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));//让多个进程绑定同一个端口,从而实现负载均衡或者高可用等功能
//存放服务器的地址信息
struct sockaddr_in address;
address.sin_family = AF_INET;//使用IPv4协议
address.sin_addr.s_addr = INADDR_ANY;//监听所有网卡的连接请求
address.sin_port = htons(port);//将端口号(大端小端)转换为网络字节序,并保存到address结构体中
bind(listenfd, (struct sockaddr*)&address, sizeof(address));//绑定服务器的地址信息
listen(listenfd, 5);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);//创建epoll对象,通过该文件描述符对 epoll 进行控制和管理(监听)
//将监听的文件描述符添加到epoll对象中
addfd(epollfd, listenfd, false);
http_conn::m_epollfd = epollfd;//赋值
}
这里我们需要把监听的文件描述符 listenfd 添加到 epoll 对象中,即将它加入到内核事件表中。
需要使用一个自定义函数addfd,由于还没有实现,先将其声明出来
//添加文件描述符到epoll中
extern void addfd(int epollfd, int fd, bool one_shot);
int main(int argc, char* argv[]){
//判断参数个数,至少要传递一个端口号
if(argc <= 1){
printf("按照如下格式运行: %s port_number\n", basename(argv[0]));
exit(-1);
}
//获取端口号,转换成整数
int port = atoi(argv[1]);
//使用socketAPI编写Reactor组件,通过监听socket文件描述符获取连接请求
int listenfd = socket(PF_INET, SOCK_STREAM, 0);//创建用于监听的socket文件描述符
int reuse = 1;//设置端口复用
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));//让多个进程绑定同一个端口,从而实现负载均衡或者高可用等功能
//存放服务器的地址信息
struct sockaddr_in address;
address.sin_family = AF_INET;//使用IPv4协议
address.sin_addr.s_addr = INADDR_ANY;//监听所有网卡的连接请求
address.sin_port = htons(port);//将端口号(大端小端)转换为网络字节序,并保存到address结构体中
bind(listenfd, (struct sockaddr*)&address, sizeof(address));//绑定服务器的地址信息
listen(listenfd, 5);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);//创建epoll对象,通过该文件描述符对 epoll 进行控制和管理(监听)
//将监听的文件描述符添加到epoll对象中
addfd(epollfd, listenfd, false);
http_conn::m_epollfd = epollfd;//赋值
}
下面就要开始编写Reactor组件,前面我们提到过,该组件实际上就是一个while死循环
//添加文件描述符到epoll中
extern void addfd(int epollfd, int fd, bool one_shot);
int main(int argc, char* argv[]){
//判断参数个数,至少要传递一个端口号
...
//获取端口号,转换成整数
...
//使用socketAPI编写Reactor组件,通过监听socket文件描述符获取连接请求
...
//存放服务器的地址信息
...
//将监听的文件描述符添加到epoll对象中
...
//Reactor组件
while(true){
/*
1、阻塞等待文件描述符监听到的事件
2、遍历事件数组,判断事件类型,进行对应处理
*/
}
}
在该循环中,使用epoll_wait获取监听socket的文件描述符所返回的事件数量
这里需要获取文件描述符监听到的所有事件,然后处理所有事件
这是符合Reactor模式的要求的
下面是具体实现
//添加文件描述符到epoll中
extern void addfd(int epollfd, int fd, bool one_shot);
int main(int argc, char* argv[]){
//判断参数个数,至少要传递一个端口号
...
//获取端口号,转换成整数
...
//使用socketAPI编写Reactor组件,通过监听socket文件描述符获取连接请求
...
//存放服务器的地址信息
...
//将监听的文件描述符添加到epoll对象中
...
//Reactor组件
while(true){//死循环不断检测有无事件发生
//具体来说就是使用epoll_wait获取监听socket的文件描述符所返回的事件数量
int num = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if(num < 0 && errno != EINTR){
printf("epoll failure\n");
break;
}
//循环遍历事件数组
for(int i = 0; i < num; i++){
int sockfd = events[i].data.fd;
if(sockfd == listenfd){
//有客户端连接进来
struct sockaddr_in client_address;
socklen_t client_addrlen = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlen);
if(http_conn::m_user_count >= MAX_FD){
//目前连接满了
printf("服务器正忙...\n");
close(connfd);
continue;
}
//将新的客户的数据初始化,放到数组中
users[connfd].init(connfd, client_address);
}else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){
//对方异常断开或错误异常
users[sockfd].close_conn();
}else if(events[i].events & EPOLLIN){
//判断是否有读事件发生
if(users[sockfd].read()){//一次性读出数据, read()
//成功读完后要交给工作线程处理
//调用线程池,追加任务
//线程池执行run函数,不断从队列去取
//取到就做业务处理,解析、生成响应数据
pool->append(users + sockfd);
}else{//读失败,关闭
users[sockfd].close_conn();
}
}else if(events[i].events & EPOLLOUT){
if(!users[sockfd].write()){
users[sockfd].close_conn();
}
}
}
}
}
在死循环中(也就是Reactor组件),epoll_wait() 函数不断地检测文件描述符epollfd上是否有 I/O 事件发生。当有可读或可写事件发生时,epoll_wait() 函数会返回一个 events 数组,并将其中的事件信息填充到数组中,遍历数组中的所有事件,根据事件类型进行相应的处理。
- epoll_wait是一个系统调用函数,用于等待文件描述符上的I/O事件;
- epollfd是通过epoll_create函数创建的epoll实例的文件描述符,它用于管理需要监视的文件描述符集合;
- listenfd是服务器应用程序使用的套接字文件描述符,它与epollfd关联,并使用epoll_ctl函数将其添加到epollfd所管理的文件描述符集合中。
epollfd代表了一个epoll实例,负责管理需要监视的文件描述符集合,而listenfd则是需要被监视的文件描述符之一,它被添加到epollfd所管理的文件描述符集合中,以便在有新的客户端连接请求时能够及时通知服务器程序。当epoll_wait函数返回时,它会将事件列表填入events数组中,告诉服务器哪些文件描述符发生了I/O事件,然后服务器应用程序根据这些事件来执行相应的操作。
遍历事件数组时,我们需要处理以下几种事件(情况):
1、listenfd有读事件发生
表示有新的客户端连接请求,需要通过accept函数接受客户端连接,并将新的客户端数据初始化并存储到http_conn数组中。
if(sockfd == listenfd){
//有客户端连接进来
struct sockaddr_in client_address;
socklen_t client_addrlen = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlen);
if(http_conn::m_user_count >= MAX_FD){
//目前连接满了
printf("服务器正忙...\n");
close(connfd);
continue;
}
//将新的客户的数据初始化,放到数组中
users[connfd].init(connfd, client_address);
}
在服务器端监听到新的客户端连接时,使用accept函数接受客户端的连接请求,并获取客户端的IP地址和端口号等信息。其中,client_address是一个sockaddr_in类型的结构体变量,用于存储客户端的IP地址和端口号等信息。在accept函数中,通过传递参数(struct sockaddr*)&client_address及其长度,接收客户端的信息,并将其保存到client_address结构体中。这里的目的就是为了获取客户端的IP地址和端口号等信息,方便后续与客户端进行通信。
2、sockfd的EPOLLRDHUP、EPOLLHUP或EPOLLERR事件
表示对方异常断开或出现错误异常,需要关闭该链接,并将http_conn数组中该客户端的状态设置为关闭。
else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){
//对方异常断开或错误异常
users[sockfd].close_conn();
}
3、sockfd有读事件发生
调用http_conn类的read()函数一次性读取完所有请求数据,如果读取成功则将该任务对象添加到线程池的任务队列中,否则关闭该链接。
else if(events[i].events & EPOLLIN){
//判断是否有读事件发生
if(users[sockfd].read()){//一次性读出数据, read()
//成功读完后要交给工作线程处理
//调用线程池,追加任务
//线程池执行run函数,不断从队列去取
//取到就做业务处理,解析、生成响应数据
pool->append(users + sockfd);
}else{//读失败,关闭
users[sockfd].close_conn();
}
}
4、sockfd有写事件发生
调用http_conn类的write()函数将响应数据发送给客户端,如果发送成功则继续等待下一个写事件发生,否则关闭该链接。
else if(events[i].events & EPOLLOUT){
if(!users[sockfd].write()){
users[sockfd].close_conn();
}
完整代码
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "locker.h"
#include "threadpool.h"
#include
#include "http_conn.h"
#define MAX_FD 65535 //最大的文件描述符个数
#define MAX_EVENT_NUMBER 10000 //一次监听的最大事件数
//添加信号捕捉
void addsig(int sig, void(handler)(int)){//信号处理函数
struct sigaction sa;//创建信号量
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = handler;
sigfillset(&sa.sa_mask);//设置信号临时阻塞等级
sigaction(sig, &sa, NULL);//注册信号
}
//模拟proactor模式,主线程监听事件
//当有读事件产生,在主线程中一次性读出来,封装成一个任务对象(用任务类)
//然后交给子线程(线程池队列中的工作线程),线程池再去取任务做任务
//添加文件描述符到epoll中
extern void addfd(int epollfd, int fd, bool one_shot);
//从epoll删除文件描述符
extern void removefd(int epollfd, int fd);
//修改文件描述符
extern void modfd(int epollfd, int fd, int ev);
int main(int argc, char* argv[]){
//判断参数个数,至少要传递一个端口号
if(argc <= 1){
printf("按照如下格式运行: %s port_number\n", basename(argv[0]));
exit(-1);
}
//获取端口号,转换成整数
int port = atoi(argv[1]);
//对SIGPIPE信号进行处理
addsig(SIGPIPE, SIG_IGN);
//创建线程池,并初始化
//任务类:http_conn
//来一个任务之后,要封装成一个任务对象,交给线程池去处理
threadpool* pool = NULL;
try{
pool = new threadpool;
}catch(...){
exit(-1);
}
//创建一个数组用于保存所有的客户端信息
//users 数组是一个存储 http_conn 对象的数组,每个 http_conn 对象代表一个客户端连接。
http_conn* users = new http_conn[MAX_FD];
//写网路通信的代码
//创建监听的套接字
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
//tcp服务端代码
//设置端口复用(一定要在绑定之前设置)
int reuse = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
//绑定
//绑定的作用是将服务器的端口号和 IP 地址与一个套接字绑定,
//使得客户端可以通过相应的 IP 地址和端口号访问到服务器。
struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(port);//大端小端转换为网络字节序
bind(listenfd, (struct sockaddr*)&address, sizeof(address));
//监听
//将listenfd这个socket的状态设置为监听状态,等待客户端的连接请求。
listen(listenfd, 5);
//listenfd会触发一个可读事件,从而被加入到epoll对象中,并由events数组保存。
// 创建epoll对象,事件数组,添加监听的文件描述符
// events 数组的作用是存储 epoll_wait() 函数返回的事件
epoll_event events[MAX_EVENT_NUMBER];
// 用于事件管理的epoll对象的文件描述符
int epollfd = epoll_create(5);//创建epoll对象
//将监听的文件描述符添加到epoll对象中
addfd(epollfd, listenfd, false);
http_conn::m_epollfd = epollfd;//赋值
while(true){//主线程不断循环检测有无事件发生
/*epoll_wait()会等待事件的发生,一旦事件发生,
会将该事件的相关信息存储到events数组中,
主线程会遍历该数组并处理所有发生的事件。*/
//num代表检测到几个事件
//调用 epoll_wait() 函数时,我们需要将一个用于存储事件的数组传递给该函数,
//函数会将检测到的事件存储到该数组中。
//遍历该数组可以获取到每个事件对应的文件描述符以及该事件所对应的事件类型,根据不同的事件类型,我们可以采取不同的处理方式。
int num = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if(num < 0 && errno != EINTR){
printf("epoll failure\n");
break;
}
//循环遍历事件数组
//注意,此时num>0,意味着事件数组events中肯定有元素在,即检测到有事件发生
for(int i = 0; i < num; i++){
//从事件数组中取出epoll_wait()检测到的事件,即客户端的连接请求
/*当events中存储的事件为sockfd可读事件时,表示该socket有数据可读,
此时应该将读事件交由工作线程去处理。
当events中存储的事件为sockfd可写事件时,表示该socket可以写入数据,
此时应该将写事件交由工作线程去处理*/
int sockfd = events[i].data.fd;//sockfd只是一个名称,表示由epoll_wait()等待并检测到的事件
/*在服务器中,通常会使用一个监听socket(listenfd)来接受客户端的连接请求,
当有新的客户端连接到来时,服务器会使用accept函数创建一个新的连接socket(connfd),
这个新的socket会与客户端的socket建立起通信连接。*/
if(sockfd == listenfd){
//有新的客户端连接进来
struct sockaddr_in client_address;
socklen_t client_addrlen = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlen);
if(http_conn::m_user_count >= MAX_FD){
//目前连接满了
printf("服务器正忙...\n");
close(connfd);
continue;
}
//将新的客户的数据初始化,放到数组中
/*每当有一个新的客户端连接请求到来时,
服务器会创建一个新的 http_conn 对象,并将该对象添加到 users 数组中,
以管理这个客户端连接。*/
users[connfd].init(connfd, client_address);
}else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){
//对方异常断开或错误异常
users[sockfd].close_conn();
}else if(events[i].events & EPOLLIN){
//判断是否有读事件发生
if(users[sockfd].read()){//一次性读出数据, read()
//成功读完后要交给工作线程处理
//调用线程池,追加任务
//线程池执行run函数,不断从队列去取
//取到就做业务处理,解析、生成响应数据
pool->append(users + sockfd);//将 users + sockfd 所指向的 http_conn 对象追加到线程池的任务队列中。
/*users 数组中的每个元素都代表一个客户端连接,
数组的下标是该客户端的文件描述符 fd。
users + sockfd 就是获取到该客户端连接的 http_conn 对象的指针。
然后将该指针作为参数,调用线程池对象的 append 函数,
将该指针所指向的 http_conn 对象添加到线程池的任务队列中,
等待线程池的工作线程来处理。*/
}else{//读失败,关闭
users[sockfd].close_conn();
}
}else if(events[i].events & EPOLLOUT){
if(!users[sockfd].write()){
users[sockfd].close_conn();
}
}
}
}
close(epollfd);
close(listenfd);
delete[] users;
delete pool;
return 0;
}
事件处理器
在讨论遍历事件数组并对事件进行相应处理前,需要先明确一下epoll_wait()是如何利用listenfd的,这有助于理解事件处理机制
epoll_wait() 函数没有直接使用 listenfd 进行监听,而是使用了 events 数组来保存事件。
epoll_ctl() 函数(位于addfd函数中)将 listenfd 加入到 epoll 实例的监听队列之后,它会自动返回 EPOLLIN 事件,表示这个套接字已经准备好可以进行读操作,因此在 events 数组中添加了一个 epoll_event 结构体,其中包含了 EPOLLIN 事件和 listenfd 的文件描述符。
当有新的客户端连接请求时,内核会检测到 listenfd 文件描述符上的 EPOLLIN 事件,并将其加入到 events 数组中。随后,在调用 epoll_wait() 函数时,会检测到 events 数组中的 EPOLLIN 事件,并返回对应的文件描述符 connfd,即新建立的客户端连接套接字。
所以说,虽然 epoll_wait() 函数没有直接利用 listenfd 进行监听,但是它通过事件机制实现了对 listenfd 的间接监听,并且可以准确地处理新的客户端连接请求。
现在开始实现事件处理器,也就是工程中的http_conn.cpp和http_conn.h
本质上讲,http_conn.cpp中就是定义一堆在主函数main中会使用到的函数和参数变量
在进入Reactor组件的循环之前我们对线程池进行了初始化,将事件处理器对象(http_conn*)作为参数输入到线程池中
并且创建了一个users数组(http_conn* users)用于保存所有客户端信息,其中每个元素就是一个http_conn*,代表一个客户端连接
鉴于此,我们应该先关注事件处理器的初始化部分
初始化
// 初始化连接,外部调用初始化套接字地址
void http_conn::init(int sockfd, const sockaddr_in& addr){
m_sockfd = sockfd;
m_address = addr;
// 端口复用
int reuse = 1;
setsockopt(m_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
addfd(m_epollfd, sockfd, true);//添加到epoll对象中
m_user_count++;//http_conn对象初始化后,总用户数加1
init();//为什么要分开?因为下面的init中的信息后面还要单独初始化使用
}
void http_conn::init(){
bytes_to_send = 0;
bytes_have_send = 0;
m_check_state = CHECK_STATE_REQUESTLINE; // 初始状态为检查请求行
m_linger = false; // 默认不保持链接 Connection : keep-alive保持连接
m_method = GET; // 默认请求方式为GET
m_url = 0;
m_version = 0;
m_content_length = 0;
m_host = 0;
m_start_line = 0;
m_checked_idx = 0;
m_read_idx = 0;
m_write_idx = 0;
bzero(m_read_buf, READ_BUFFER_SIZE);//把m_read_buf全置为0
bzero(m_write_buf, READ_BUFFER_SIZE);
bzero(m_real_file, FILENAME_LEN);
}
初始化函数可以提供两个,一个接收初始化连接时的文件描述符作为参数,另一个函数是无参数的重载版本,仅用于初始化参数
事件处理
在介绍 Reactor组件 的编写时,主循环中主要负责处理4种事件,具体落实处理操作的函数均定义在事件处理器中
下面就按照4种事件对事件处理器进行拆解分析,可以对照之前的来看
处理新客户端的连接请求
也就是listenfd有读事件发生呗
if(sockfd == listenfd){
//有新的客户端连接进来
}
在网络编程中,每个连接都会被分配一个唯一的标识符,这个标识符通常是一个整数,也就是套接字描述符(socket descriptor)。当服务器收到客户端的连接请求后,会创建一个新的套接字,生成一个新的套接字描述符,该描述符作为客户端连接的标识符。
//有新的客户端连接进来
struct sockaddr_in client_address;
socklen_t client_addrlen = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlen);
此时我们已经通过accept函数接受客户端的连接请求,并返回一个文件描述符connfd。connfd
是一个整数类型的变量,它表示新创建的套接字描述符。该描述符用于建立与客户端之间的数据通信连接。
具体来说,当服务器程序调用accept函数时,它会从等待队列中取出一个已经完成三次握手协议的客户端连接请求,并创建一个新的套接字描述符以用于与该客户端进行通信。这个新的套接字描述符(也就是connfd
)可以视为服务器与客户端之间的一条虚拟电缆,通过它可以进行双向数据传输。服务器程序可以使用该描述符向客户端发送数据,也可以从该描述符读取客户端发送来的数据。
在程序开始运行时,主线程首先创建了一个 epoll 对象,并将监听套接字(
listenfd
)添加到该对象中。当有新的客户端连接请求到达服务器时,内核会将连接请求加入到未完成连接队列中,然后向客户端发送 SYN-ACK 响应包并等待客户端的 ACK 确认包。当客户端发送 ACK 确认包到达服务器时,内核将连接从未完成连接队列中移动到已完成连接队列中,此时主线程通过调用epoll_wait()
函数检测到了该事件,并调用accept()
函数从已完成连接队列中获取到了连接请求。因此,在我们的代码中,accept()
函数是通过监听套接字所在的 epoll 对象来获取到已完成连接队列中的客户端连接请求的。需要注意的是,未完成连接队列和已完成连接队列都是内核维护的队列,程序无法直接访问它们。而我们可以通过设置监听套接字和调用
accept()
函数来与这些队列进行交互。
users[connfd].init(connfd, client_address);
users
数组存储了所有已经连接的客户端信息。users[i]
表示第 i
个客户端的信息。其中,connfd
是当前发生事件的客户端套接字描述符,我们可以通过遍历 users
数组找到对应的客户端信息,然后进行相应的处理。具体来说,我们可以将 connfd
和 users
中的所有元素的 fd
进行比较,找到连接对应的客户端信息,并调用事件处理器中的初始化函数对该http_conn对象进行初始化。
处理断开连接等错误异常
else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){
//对方异常断开或错误异常
users[sockfd].close_conn();
}
events[i]
是一个epoll_event
结构体,它的events
成员变量存储着当前的事件类型,例如EPOLLIN、EPOLLOUT、 EPOLLRDHUP等
events[i].events
表示获取第i
个事件对象的事件类型集合
该情况中,如果发生了EPOLLRDHUP、EPOLLHUP或EPOLLERR事件,就执行对应的操作
EPOLLRDHUP表示TCP连接的远程端关闭或半关闭连接,即对方关闭了socket连接或者shutdown写操作。
EPOLLHUP表示挂起的连接或监听套接字已经关闭。它可能是一个错误,也可能是一个正常情况,因为它只代表文件描述符不再可用,而不是一定有错误。
EPOLLERR表示错误事件。例如:socket被对端重置(rst);对于udp的epoll来说,他可以支持多个端口绑定,当然你不能bind两次同一个端口,那么第二次就会返回-1并且errno会被设置为EADDRINUSE;还有就是当读取时没有数据则返回-1并且errno被设置为EAGAIN 。
读事件
读事件是指读取客户端发送到服务器的数据,也就是尝试对监听的文件描述符进行读取
else if(events[i].events & EPOLLIN){
//判断是否有读事件发生
if(users[sockfd].read()){//一次性读出数据, read()
//成功读完后要交给工作线程处理
//调用线程池,追加任务
//线程池执行run函数,不断从队列去取
//取到就做业务处理,解析、生成响应数据
pool->append(users + sockfd);//将 users + sockfd 所指向的 http_conn 对象追加到线程池的任务队列中。
/*users 数组中的每个元素都代表一个客户端连接,
数组的下标是该客户端的文件描述符 fd。
users + sockfd 就是获取到该客户端连接的 http_conn 对象的指针。
然后将该指针作为参数,调用线程池对象的 append 函数,
将该指针所指向的 http_conn 对象添加到线程池的任务队列中,
等待线程池的工作线程来处理。*/
}else{//读失败,关闭
users[sockfd].close_conn();
}
}
事件为EPOLLIN代表文件描述符可用,尝试对其进行读取
此时需要调用事件处理器中定义的bool http_conn::read()
http_conn::read(),读取数据到缓冲区
// 循环读取客户数据,直到无数据可读或者对方关闭连接
bool http_conn::read(){
// printf("一次性读完数据\n");
if(m_read_idx >= READ_BUFFER_SIZE){
return false;
}
int bytes_read = 0;//读取到的字节
while(true){//开始保存数据
// 数组起始位置(0)+已经读的数据位置(100)=下次开始写的位置(101)
// 从m_read_buf+ m_read_idx索引出开始保存数据,大小是READ_BUFFER_SIZE - m_read_idx
bytes_read = recv(m_sockfd, m_read_buf+ m_read_idx,
READ_BUFFER_SIZE - m_read_idx, 0);
if(bytes_read == -1){
if(errno == EAGAIN || errno == EWOULDBLOCK){
// 没有数据
break;
}
return false;
} else if(bytes_read == 0){ // 对方关闭连接
return false;
}
m_read_idx += bytes_read;//读到数据,更新索引
}
// printf("读到了数据:%s\n", m_read_buf);
return true;
}
该函数会从对应文件描述符(即m_sockfd)上读取数据,一次性读取尽可能多的数据,并将读取到的数据保存到m_read_buf中,返回值表示是否成功读取数据。
首先判断m_read_idx是否越界,若已经超过了缓冲区大小,则返回false;然后使用while循环不断读取数据,每次最多读取READ_BUFFER_SIZE - m_read_idx个字节,并将读取到的数据保存到m_read_buf+ m_read_idx位置处;如果读取失败,则判断错误码是否为EAGAIN或EWOULDBLOCK,如果是则表示暂时没有更多的数据可读,直接跳出循环,并返回true;如果是其他错误码或者读取到的数据长度为0,则表示连接出错或对方关闭了连接,直接返回false。最后如果成功读取到数据,则更新m_read_idx的值。
1、数据保存到m_read_buf + m_read_idx位置处是因为该位置代表了当前读取缓冲区的未被使用的空间。每次从套接字接收数据时,将数据存储到该位置,并将已读取数据的长度增加(bytes_read),更新m_read_idx的值,以便下一次接收数据时可以从新的位置开始存储数据。
这种方法的优点是,可以避免在读取缓冲区中重复使用已经读取过的数据,从而保证数据的完整性和正确性。
2、recv()是一个系统调用,用于从指定的套接字上接收数据。它的作用是从网络中读取一定长度的数据,并将其放入缓冲区中以供进一步处理。
其中,参数意义如下:
- sockfd:需要接收数据的套接字描述符。
- buf:接收数据缓冲区的地址。
- len:缓冲区的长度。
- flags:接收数据时的可选参数。
3、注意,read()函数是在事件处理器http_conn中定义的,因此读取到的数据保存在http_conn对象的缓冲区中
将数据读取到缓冲区之后,此时的http_conn对象就具有了完整的信息,主线程会将这个封装好的http_conn对象作为参数,调用线程池的append()函数,往任务队列中追加任务。
pool->append(users + sockfd);
线程池中的工作线程会不断从任务队列中取出任务,取到后就执行run()函数(没错,线程池在这里被用上了)
这里的run()函数会调用http_conn对象的process()函数来处理业务逻辑,例如解析HTTP请求、生成HTTP响应等操作。
http_conn::process(),解析请求
http_conn::process()
函数的定义很简单
// 由线程池中的工作线程调用,这是处理HTTP请求的入口函数
void http_conn::process(){
// 解析HTTP请求
HTTP_CODE read_ret = process_read();
if(read_ret == NO_REQUEST){
modfd(m_epollfd, m_sockfd, EPOLLIN);
return;
}
// 生成响应
bool write_ret = process_write(read_ret);
if(!write_ret){
close_conn();
}
modfd(m_epollfd, m_sockfd, EPOLLOUT);
}
这里会去解析具体的HTTP请求并生成响应
HTTP_CODE是在http_conn.h中定义的一个枚举类型,代表了服务器在解析HTTP请求后应该提供的返回值
enum HTTP_CODE { NO_REQUEST, GET_REQUEST, BAD_REQUEST, NO_RESOURCE, FORBIDDEN_REQUEST, FILE_REQUEST, INTERNAL_ERROR, CLOSED_CONNECTION };
下面来单独说一下解析请求的函数process_read()
主从状态机
http_conn::process_read()
是一个HTTP请求处理函数,用于解析HTTP请求报文。主要功能如下:
- 解析一行数据,得到不同状态(包括请求行、请求头和请求体)。
- 根据不同的状态,解析请求行、请求头或请求体,并进行相应的处理。
- 对于GET请求,根据具体的请求信息进行预处理,分析目标文件属性,并将其映射到内存地址m_file_address处。
- 对于POST请求,解析请求数据并进行相应的处理。
- 返回处理结果,包括成功、失败或请求不完整等状态码。
http_conn::HTTP_CODE http_conn::process_read(){
//定义初始状态
LINE_STATUS line_status = LINE_OK;
HTTP_CODE ret = NO_REQUEST;
char* text = 0;
//解析一行数据,得到不同状态
//OK表示正常
//主状态机 && 从状态机 || 解析到一行完整数据(或者请求体)
//这里的主状态机指的是process_read()函数,从状态机指的是parse_line()
while (((m_check_state == CHECK_STATE_CONTENT) && (line_status == LINE_OK))
|| ((line_status = parse_line()) == LINE_OK)){
// 获取一行数据
text = get_line();
m_start_line = m_checked_idx;
printf("获取到一行http数据: %s\n", text);
switch (m_check_state){
case CHECK_STATE_REQUESTLINE: {
//解析请求行,也就是GET中的GET
/*通过请求行的解析我们可以判断该HTTP请求的类型(GET/POST),
而请求行中最重要的部分就是URL部分,
我们会将这部分保存下来用于后面的生成HTTP响应*/
ret = parse_request_line(text);
if(ret == BAD_REQUEST){
return BAD_REQUEST;
}
break;//正常解析就break
}
case CHECK_STATE_HEADER: {
ret = parse_headers(text);//解析请求头,GET和POST中空行以上,请求行以下的部分
if(ret == BAD_REQUEST){
return BAD_REQUEST;
} else if(ret == GET_REQUEST){//遇到换行符就默认你解析完请求头,不管后面还有没有内容
return do_request();//解析具体的请求信息
}
break;
}
case CHECK_STATE_CONTENT: {
/*解析请求数据,对于GET来说这部分是空的,
因为这部分内容已经以明文的方式包含在了请求行中的URL部分了;
只有POST的这部分是有数据的*/
ret = parse_content(text);
if(ret == GET_REQUEST){
/*do_request()需要做:
需要首先对GET请求和不同POST请求
(登录,注册,请求图片,视频等等)做不同的预处理,
然后分析目标文件的属性,若目标文件存在、对所有用户可读且不是目录时,
则使用mmap将其映射到内存地址m_file_address处,并告诉调用者获取文件成功。*/
return do_request();
}
line_status = LINE_OPEN;
break;
}
default: {
return INTERNAL_ERROR;
}
}
}
return NO_REQUEST;//主状态机请求不完整
}
这里要单独说一下主从状态机模式,http_conn::process_read()
就是根据这个模式设计的,用以应对不同的状态处理
在主从状态机模式中,一个状态机作为主要的控制器,而其他状态机则被设计为从状态机。主状态机负责协调和管理所有子状态机,并将它们之间的通信和事件处理进行协调。通常,主状态机是在系统启动时创建的,而从状态机则可以在系统运行时动态创建、注册或注销。
主状态机 process_read()
中进行循环,直到解析完整个 HTTP 请求数据。
具体地说,(line_status = parse_line())
表示调用从状态机 parse_line()
,并将返回值赋给变量 line_status
。如果解析成功,则 line_status
的值为 LINE_OK
,否则为其他值(如 LINE_OPEN
、LINE_BAD
等)。
而 m_check_state
表示当前 HTTP 请求的解析状态,有三种可能:CHECK_STATE_REQUESTLINE
表示正在解析请求行,CHECK_STATE_HEADER
表示正在解析请求头,CHECK_STATE_CONTENT
表示正在解析请求体。因此 m_check_state == CHECK_STATE_CONTENT
表示当前正在解析请求体。
当且仅当解析到请求体时,才需要进一步判断是否已经解析完整个 HTTP 请求数据。
因此 (m_check_state == CHECK_STATE_CONTENT) && (line_status == LINE_OK)
表示如果当前正在解析请求体,并且从状态机已经成功解析了一行请求数据,则需要继续解析下一行请求数据。
而 while (((m_check_state == CHECK_STATE_CONTENT) && (line_status == LINE_OK)) || ...)
表示只要解析状态还处于请求体状态且从状态机已经成功解析出一行请求数据,或者从状态机还未成功解析出一行完整的请求数据,就需要不断循环调用从状态机、解析请求数据,直到解析完成整个 HTTP 请求数据。
之后的工作就是根据对应状态,编写处理函数,这部分都类似就没什么好说的了,主要就是一些文本解析的工作
写事件
写事件是指服务器向客户端“写”数据,也就是服务器向客户端发送数据(即 HTTP 响应报文)
这主要依靠http_conn::write()
来实现
// 写HTTP响应
bool http_conn::write(){
int temp = 0;
if(bytes_to_send == 0){
// 将要发送的字节为0,这一次响应结束。
modfd(m_epollfd, m_sockfd, EPOLLIN);
init();
return true;
}
while(1){
// 分散写
temp = writev(m_sockfd, m_iv, m_iv_count);
if(temp <= -1){
// 如果TCP写缓冲没有空间,则等待下一轮EPOLLOUT事件,虽然在此期间,
// 服务器无法立即接收到同一客户的下一个请求,但可以保证连接的完整性。
if(errno == EAGAIN){
modfd(m_epollfd, m_sockfd, EPOLLOUT);
return true;
}
unmap();
return false;
}
bytes_have_send += temp;//已发送的字节数
bytes_to_send -= temp;//剩余需要发送的字节数
if(bytes_have_send >= m_iv[0].iov_len){
m_iv[0].iov_len = 0;
m_iv[1].iov_base = m_file_address + (bytes_have_send - m_write_idx);
m_iv[1].iov_len = bytes_to_send;
}
else{
m_iv[0].iov_base = m_write_buf+ bytes_have_send;
m_iv[0].iov_len = m_iv[0].iov_len - temp;
}
if(bytes_to_send <= 0){
// 没有数据要发送了
unmap();
modfd(m_epollfd, m_sockfd, EPOLLIN);
if(m_linger){
init();
return true;
}
else{
return false;
}
}
}
}
该函数实现了分散写的功能,也就是把要发送的数据分为多个部分进行传输
首先,在函数开始时,会判断是否存在字节需要发送,如果没有,则结束此次响应。
接着,通过 writev() 函数进行分散写操作,将 m_iv 数组中保存的多个缓冲区的数据写入到套接字中。其中,temp 表示本次写入的字节数量,bytes_have_send 记录已经发送的字节数,bytes_to_send 则表示剩余需要发送的字节数。
writev() 函数是 Linux 系统中的一个系统调用,其作用是将多个缓冲区的数据一起写入到文件描述符中(各个缓冲区的数据可以是不连续的)。它使用了分散写操作,即将要写入的数据分散在多个缓冲区中,通过一次系统调用实现写入操作。
在每次写入操作完成后,都会更新 m_iv 数组及 bytes_have_send、bytes_to_send 变量的值,以便继续下一轮写入操作。
如果在写入过程中出现错误,例如 TCP 写缓冲区满,会返回 false 并关闭连接;
如果写入成功,但还有数据未发送完,会继续进行分散写操作,直到所有数据都发送完毕。
最后,当所有数据都发送完毕时,会判断是否需要保持连接(即 Linger),如果需要则初始化连接并返回 true,否则返回 false 结束连接。
至此,事件处理器的主要流程介绍完毕,当然还有很多细枝末节的点没有说明,当然那个不是本文的重点,也应该交给更专业的书籍去介绍
总结
以上是webserver中I/O处理单元和任务类的主要流程的代码介绍,主要介绍了服务器核心代码编写时的两种模式(如何去高效处理客户端发送过来的事件)
此外,我们重点讨论了在Reactor模式下如何设计一个服务器的I/O处理单元,如何编写用于处理请求事件的任务类(事件处理器)
服务器的基本功能已经完成了,但是这个服务器有很多点需要改进
比如,客户端如果不主动关闭,服务器是不会主动把连接关闭的,即使当前该连接上没有任何的数据传输行为
显然这不太合理,因此下一步我们需要对此进行改进