
仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器:
通过实现的高并发服务器组件,可以简洁快速的完成一个高性能的服务器搭建。并且,通过组件内提供的不同应用层协议支持,也可以快速完成一个高性能应用服务器的搭建(当前为了便于项目的演示,项目中提供HTTP协议组件的支持)
在这里,要明确的是要实现的是一一个高并发服务器组件,因此当前的项目中并不包含实际的业务内容。
HTTP(Hyper Text Transfer Protocol),超⽂本传输协议是应⽤层协议,是⼀种简单的请求-响应协
议(客户端根据自己的需要向服务器发送请求,服务器针对请求提供服务,完毕后通信结束)。
协议细节在课堂上已经讲过,这⾥不再赘述。
但是需要注意的是HTTP协议是⼀个运行在TCP协议之上的应用层协议,这⼀点本质上是告诉我们,HTTP服务器其实就是个TCP服务器,只不过在应用层基于HTTP协议格式进行数据的组织和解析来明确客⼾端的请求并完成业务处理。
因此实现HTTP服务器简单理解,只需要以下几步即可
当然准确来说,因为我们要实现的服务器本身并不存在业务,咱们要实现的应该算是⼀个高性能服务器基础库,是⼀个基础组件。
Reactor模式,是指通过⼀个或多个输入同时传递给服务器进行请求处理时的事件驱动处理模式。
服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫Dispatcher模式。简单理解就是使⽤ I/O多路复用 统⼀监听事件,收到事件后分发给处理进程或线程,是编写高性能网络服务器的必备技术之⼀。
优点:所有操作均在同⼀线程中完成,思想流程较为简单,不涉及进程/线程间通信及资源争抢问题。
缺点:无法有效利用CPU多核资源,很容易达到性能瓶颈。
适用场景:适用于客户端数量较少,且处理速度较为快速的场景。(处理较慢或活跃连接较多,会导
致串行处理的情况下,后处理的连接长时间无法得到响应。
咱们要实现的是主从Reactor模型服务器,也就是主Reactor线程仅仅监控监听描述符,获取新建连
接,保证获取新连接的高效性,提高服务器的并发性能。
主Reactor获取到新连接后分发给子Reactor进行通信事件监控。而子Reactor线程监控各自的描述符的
读写事件进行数据读写以及业务处理。
One Thread One Loop的思想就是把所有的操作都放到⼀个线程中进行,⼀个线程对应⼀个事件处理
的循环。
当前实现中,因为并不确定组件使用者的使用意向,因此并不提供业务层工作线程池的实现,只实现
主从Reactor,而Worker工作线程池,可由组件库的使用者的需要自行决定是否使用和实现。
基于以上的理解,我们要实现的是⼀个带有协议支持的Reactor模型高性能服务器,因此将整个项目的
实现划分为两个大的模块:
• SERVER模块:实现Reactor模型的TCP服务器;
• 协议模块:对当前的Reactor模型服务器提供应用层协议支持;

SERVER模块就是对所有的连接以及线程进行管理,让它们各司其职,在合适的时候做合适的事,最终
完成高性能服务器组件的实现!
而具体的管理也分为三个方面:
• 监听连接管理:对监听连接进行管理。
• 通信连接管理:对通信连接进行管理。
• 超时连接管理:对超时连接进行管理。
基于以上的管理思想,将这个模块进行细致的划分又可以划分为以下多个子模块:
Buffer模块是⼀个缓冲区模块,用于实现通信中用户态的接收缓冲区和发送缓冲区功能
代码和思路具体在:1.4.C++项目:仿muduo库实现并发服务器之buffer模块的设计
Socket模块是对套接字操作封装的⼀个模块,主要实现的socket的各项操作。
代码和思路具体在:1.5.C++项目:仿muduo库实现并发服务器之socket模块的设计
Channel模块是对⼀个描述符需要进行的IO事件管理的模块,实现对描述符可读,可写,错误…事件的
管理操作,以及Poller模块对描述符进行IO事件监控就绪后,根据不同的事件,回调不同的处理函数功
能。
代码和思路具体在:1.6.C++项目:仿muduo库实现并发服务器之channel模块的设计
对epoll进行的封装,让对描述符进行事件监控的操作更加简单。
代码和思路具体在:1.7.C++项目:仿muduo库实现并发服务器之Poller模块的设计
这个模块和线程是一一对应的!
监听了一个链接,如果这个连接一旦就绪,就要进行事件处理!
但是如果这个描述符,在多个线程中都触发了了事件,进行处理,就会存在线程安全问题!
因此我们需要将一个链接的事件监控, 以及连接事件处理,以及其他操作都放在同一个线程中!
如何保证一个连接的所有操作都在eventloop对应的线程中!
给eventLOOP模块中,都添加一个任务队列!
对连接的所有操作,都进行一次封装,将对连接的操作当作任务都添加到任务队列中!
代码和思路具体在:1.8.C++项目:仿muduo库实现并发服务器之EventLoop模块的设计
Connection模块是对Buffer模块,Socket模块,Channel模块的⼀个整体封装,实现了对一个通信套
接字的整体的管理,每一个进行数据通信的套接字(也就是accept获取到的新连接)都会使用
Connection进行管理。
• Connection模块内部包含有三个由组件使用者传入的回调函数:连接建立完成回调,事件回调,
新数据回调,关闭回调。
• Connection模块内部包含有两个组件使用者提供的接口:数据发送接口,连接关闭接口
• Connection模块内部包含有两个用户态缓冲区:用户态接收缓冲区,用户态发送缓冲区
• Connection模块内部包含有⼀个Socket对象:完成描述符面向系统的IO操作
• Connection模块内部包含有⼀个Channel对象:完成描述符IO事件就绪的处理
代码和思路具体在:1.9.C++项目:仿muduo库实现并发服务器之Connection模块的设计
这是一个对于通信连接进行整体管理的一个模块,对一个连接的操作都是通过这个模块来进行!
Acceptor模块是对Socket模块,Channel模块的一个整体封装,实现了对⼀个监听套接字的整体的管理。
代码和思路具体在:1.10.C++项目:仿muduo库实现并发服务器之Acceptor模块的设计



目标:将eventloop模块和线程整合起来!
eventloop 和 线程是一一对应的!
eventloop 模块实例化的对象,在构造的时候就会初始化! _thread_id;
而后面当运行一个操作的时候判断是否运行在eventloop所对应的线程中,就是将线程ID与EventLoop模块中的thread_id 进行一个比较,相同就表示在同一个线程,不同就表示当前运行线程并不是eventloop线程!
eventloop 模块在实例化对象的时候,必须在线程内部!
eventloop 实例化对象会设置自己的 thread_id;
如果我们先创建了多个 eventloop 对象,然后创建了多个线程,将各自的线程id,重新给 eventloop 进行设置!
存在问题:在构造 eventloop对象,到设置新的 thread_id 期间将是不可控的!
因此,必须先创建线程,然后在线程的入口函数中,去实例化 eventloop 对象!
构造一个新的模块:LoopThread
代码和思路具体在:1.11.C++项目:仿muduo库实现并发服务器之LoopThread模块的设计
代码和思路具体在:1.12.C++项目:仿muduo库实现并发服务器之LoopThreadPool模块的设计
目的:TcpServer模块: 对所有模块的整合,通过 tcpserver 模块实例化的对象,可以非常简单的完成一个服务器的搭建。
对前面所有子模块的整合模块,提供给用户用于搭建一个高性能服务器的模块!
1.13.C++项目:仿muduo库实现并发服务器之TcpServer模块的设计

目的:实现一些工具接口
读取文件内容
向文件写入内容
URL编码
URL解码
通过HTTP状态码获取描述信息
通过文件后缀名获取mime
判断一个文件是不是目录
判断一个文件是否是一个普通文件
HTTP资源路径有效性判断
代码和思路具体在:1.14.C++项目:仿muduo库实现并发服务器之Util模块的设计
目的:存储HTTP请求信息
接收到一个数据,按照HTTP请求格式进行解析,得到各个关键要素放到Request中
代码和思路具体在:1.15.C++项目:仿muduo库实现并发服务器之Util模块的设计
目的:存储HTTP响应信息
意义:进行业务处理的同时,让使用者向Response中填充响应要素,完毕后,将其组织成HTTP响应格式的数据,发给客户端。
代码和思路具体在:1.15.C++项目:仿muduo库实现并发服务器之Util模块的设计
目的:要实现渐变的搭建HTTP服务器,所需要提供的要素和功能!
要素:
代码和思路具体在:1.16.C++项目:仿muduo库实现并发服务器之HttpContext以及HttpServer模块的设计
目的:记录HTTP请求的接受和处理进度。
意义:有可能出现接收的数据并不是一条完整的HTTP请求数据,也就是请求的处理需要在多次受到数据之后才能处理完成,因此在每次处理的时候,就需要将进度处理记录下来,以便于下次从当前进度继续向下处理。
代码和思路具体在:1.16.C++项目:仿muduo库实现并发服务器之HttpContext以及HttpServer模块的设计
/*长连接测试1:创建一个客户端持续给服务器发送数据,直到超过超时时间看看是否正常*/
#include "../source/server.hpp"
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8085, "127.0.0.1");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1) {
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023));
DBG_LOG("[%s]", buf);
sleep(3);
}
cli_sock.Close();
return 0;
}
客户端每三秒发送一次数据,刷新活跃度。长连接测试正常。


/*超时连接测试1:创建一个客户端,给服务器发送一次数据后,不动了,查看服务器是否会正常的超时关闭连接*/
#include "../source/server.hpp"
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8085, "127.0.0.1");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1) {
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023));
DBG_LOG("[%s]", buf);
sleep(15);
}
cli_sock.Close();
return 0;
}

/*业务处理超时,查看服务器的处理情况
; 当服务器达到了一个性能瓶颈,在一次业务处理中花费了太长的时间(超过了服务器设置的非活跃超时时间)
; 1. 在一次业务处理中耗费太长时间,导致其他的连接也被连累超时,其他的连接有可能会被拖累超时释放
; 假设现在 12345描述符就绪了, 在处理1的时候花费了30s处理完,超时了,导致2345描述符因为长时间没有刷新活跃度
; 1. 如果接下来的2345描述符都是通信连接描述符,如果都就绪了,则并不影响,因为接下来就会进行处理并刷新活跃度
; 2. 如果接下来的2号描述符是定时器事件描述符,定时器触发超时,执行定时任务,就会将345描述符给释放掉
; 这时候一旦345描述符对应的连接被释放,接下来在处理345事件的时候就会导致程序崩溃(内存访问错误)
; 因此这时候,在本次事件处理中,并不能直接对连接进行释放,而应该将释放操作压入到任务池中,
; 等到事件处理完了执行任务池中的任务的时候,再去释放
; */
#include "../source/server.hpp"
int main()
{
signal(SIGCHLD, SIG_IGN);
for (int i = 0; i < 10; i++) {
pid_t pid = fork();
if (pid < 0) {
DBG_LOG("FORK ERROR");
return -1;
}else if (pid == 0) {
Socket cli_sock;
cli_sock.CreateClient(8085, "127.0.0.1");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1) {
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023));
DBG_LOG("[%s]", buf);
}
cli_sock.Close();
exit(0);
}
}
while(1) sleep(1);
return 0;
}

/*一次性给服务器发送多条数据,然后查看服务器的处理结果*/
/*每一条请求都应该得到正常处理*/
#include "../source/server.hpp"
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8085, "127.0.0.1");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1) {
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023));
DBG_LOG("[%s]", buf);
sleep(3);
}
cli_sock.Close();
return 0;
}

/*大文件传输测试,给服务器上传一个大文件,服务器将文件保存下来,观察处理结果*/
/*
上传的文件,和服务器保存的文件一致
*/
#include "../source/http/http.hpp"
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8085, "127.0.0.1");
std::string req = "PUT /1234.txt HTTP/1.1\r\nConnection: keep-alive\r\n";
std::string body;
Util::ReadFile("./hello.txt", &body);
req += "Content-Length: " + std::to_string(body.size()) + "\r\n\r\n";
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
assert(cli_sock.Send(body.c_str(), body.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023));
DBG_LOG("[%s]", buf);
sleep(3);
cli_sock.Close();
return 0;
}


性能压力测试:
并发量:可以同时处理多少客户端的请求而不会出现连接失败
QPS:每秒处理的包的数量
借助:webbench工具
原理:创建大量的进程,在进程中,创建客户端连接服务器,发送请求,收到响应后关闭连接,开始下一个连接的建立!
测试环境:服务器是1核2G带宽1M的云服务器
客户端:
模拟5000个客户端同时向服务器发送请求,没有出现连接失败。

测试结论:
服务器并发量:可以同时处理5000-6000个客户端的请求而不会出现连接失败。
QPS:(Query Per Second)每秒查询率1817左右。
2023.10.17
善始善终
莫问西东