Reactor反应器模式,也叫做分发者模式或通知者模式,是一种将就绪事件派发给对应服务处理程序的设计模式。
Reactor模式的五个角色构成
Reactor模式的工作流程
下面我们根据Reactor的五个角色构成以及其工作流程,实现一个Reactor模式下的epoll服务器,从而更直观地感受一下Reactor模式。
EventItem类的介绍
EventItem类的设计
所以我们可以设计一个EventItem类。,该类中有以下成员:
对于前两种成员文章上面已经介绍过了,下面介绍一下后两种成员的作用。
并且,EventItem类当中需要提供一个管理回调的成员函数,便于外部对EventItem结构当中的各种回调进行设置。
EventItem类代码如下:
class EventItem
{
public:
int _sock; // 文件描述符
Reactor *_R; // 回指指针
callback_t _recv_handler; // 读回调
callback_t _send_handler; // 写回调
callback_t _error_handler; // 异常回调
std::string _inbuffer; // 输入缓冲区
std::string _outbuffer; // 输出缓冲区
public:
EventItem()
: _sock(-1), _R(nullptr),
_recv_handler(nullptr), _send_handler(nullptr), _error_handler(nullptr)
{}
// 管理回调
void ManageCallbacks(callback_t recv_handler, callback_t send_handler, callback_t error_handler)
{
_recv_handler = recv_handler;
_send_handler = send_handler;
_error_handler = error_handler;
}
~EventItem() {}
};
Reactor类的介绍
Reactor类的设计
Reactor类的基本成员变量以及epoll模型创建的代码
#define SIZE 256
#define MAX_NUM 64
class Reactor
{
private:
int _epfd; // epoll模型
std::unordered_map<int, EventItem> _event_items; // 建立sock与EventItem结构的映射
public:
Reactor() : _epfd(-1) {}
void InitReactor()
{
// 创建epoll模型
_epfd = epoll_create(SIZE);
if (_epfd < 0)
{
std::cerr << "epoll_create error" << std::endl;
exit(5);
}
}
~Reactor()
{
if (_epfd >= 0) close(_epfd);
}
};
Reactor类当中的Dispatcher函数就是之前所说的初识分发器,这里我们可以将其更形象地称为事件分派器。
// 事件分派器
void Dispatcher(int timeout)
{
struct epoll_event revs[MAX_NUM];
int num = epoll_wait(_epfd, revs, MAX_NUM, timeout);
for (int i = 0; i < num; ++i)
{
int sock = revs[i].data.fd;
if ((revs[i].events & EPOLLIN) || (revs[i].events & EPOLLHUP))
{
// 优先处理异常事件就绪
if (_event_items[sock]._error_handler)
_event_items[sock]._error_handler(&_event_items[sock]);
}
if (revs[i].events & EPOLLIN)
{
if (_event_items[sock]._recv_handler)
_event_items[sock]._recv_handler(_event_items[sock]);
}
if (revs[i].events & EPOLLOUT)
{
if (_event_items[sock]._send_handler)
_event_items[sock]._send_handler(&_event_items[sock]);
}
}
}
Reactor类当中的AddEvent函数是用于进行事件注册的。
void AddEvent(int sock, uint32_t event, const EventItem &item)
{
struct epoll_event ev;
ev.data.fd = sock;
ev.events = event;
if (epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev) < 0)
{
std::cerr << "epoll_ctl add error, fd: " << sock << std::endl;
return;
}
// 建立sock与EventItem直接的结构映射关系
_event_items.insert({sock, item});
std::cout << "添加:" << sock << " 到epoll模型中,成功" << std::endl;
}
void DelEvent(int sock)
{
if (epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr) < 0)
{
std::cerr << "epoll_ctl del error, fd: " << sock << std::endl;
return;
}
// 取消sock与EventItem之间的映射关系
_event_items.erase(sock);
std::cout << "从epoll模型中删除:" << sock << " 成功" << std::endl;
}
void EnableReadWrite(int sock, bool read, bool write)
{
struct epoll_event ev;
ev.data.fd = sock;
// EPOLLET表示当前epoll服务器为边缘触发模式
ev.events = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;
if (epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev) < 0)
{
std::cerr << "epoll mod error, fd: " << sock << std::endl;
}
}
下面介绍并实现四个回调函数
当我们为某个文件描述符创建EventItem结构时,就可以调用EventItem类提供的ManageCallbacks函数,将这些回调函数添加到EventItem结构当中。
acceptor回调用于处理连接事件,其工作流程如下:
int acceptor(EventItem *item)
{
while (1)
{
struct sockaddr_in peer;
memset(&peer, 0, sizeof(peer));
socklen_t len = sizeof(peer);
int sock = accept(item->_sock, (struct sockaddr*)&peer, &len);
if (sock < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// 并没有读取出错,只是底层没有链接了
return 0;
}
else if (errno == EINTR)
{
continue;
}
else
{
std::cerr << " accept error" << std::endl;
return -1;
}
}
// 将该套接字设置为非阻塞
SetNonBlock(sock);
// 构建EventItem结构
EventItem sock_item;
sock_item._sock = sock;
sock_item._R = item->_R;
// 注册回调方法
sock_item.ManageCallbacks(recver, sender, errorer);
Reactor *R = item->_R;
// 将该套接字以及其对应的事件注册到epoll中
R->AddEvent(sock, EPOLLIN | EPOLLET, sock_item);
}
}
需要注意的是,因为这里实现的是ET模式下的epoll服务器,因此在获取底层连接时需要循环调用accept函数进行读取,并且监听套接字必须设置为非阻塞。
accept获取到的新的套接字也需要设置为非阻塞,就是为了避免将来循环调用recv、send等函数时被阻塞。
设置文件描述符为非阻塞
设置文件描述符为非阻塞时,需要先调用fcntl函数获取获取该文件描述符对应的文件状态标记,然后在该文件状态标记的基础上添加非阻塞标记O_NONBLOCK,最后调用fcntl函数对该文件描述符的状态标记进行设置即可。
代码如下:
// 设置文件描述符为非阻塞
bool SetNonBlock(int sock)
{
int fl = fcntl(sock, F_GETFL);
if (fl < 0)
{
std::cerr << "fcntl error" << std::endl;
return false;
}
fcntl(sock, F_SETFL, fl | O_NONBLOCK);
return true;
}
监听套接字设置为非阻塞后,当底层连接不就绪时,accept函数以出错的形式返回,因此当调用accept函数的返回值小于0时,需要继续判断错误码。
accept、recv和send等IO系统调用为什么会被信号中断?
IO系统调用函数出错返回并且将错误码设置为EINTR,表明本次在进行数据读取或数据写入之前被信号中断了,也就是说IO系统调用在陷入内核,但是却在没有返回用户态的时候内核就去处理其他信号了。
写事件是按需打开的
这里调用accept获取上来的套接字添加到epoll模型中时,只添加了EPOLLIN和EPOLLET事件,也就是说只让epoll关心套接字的读事件。
recver回调函数用于处理读事件,其工作流程如下:
下一次Dispatcher在进行事件派发的时候就会帮我们关注该套接字的写事件,当写事件就绪就会执行该套接字对应的EventItem结构中的写回调方法,进而将outbuffer中的响应数据发送给客户端。
int recver(EventItem *item)
{
if (item->_sock < 0) return -1; // 说明该文件描述符已经被关闭
// 1. 数据读取
if (recver_helper(item->_sock, &(item->_inbuffer)) < 0)
{
// 读取失败
item->_error_handler(item);
return -1;
}
// 2. 报文切割,以 X 作为分隔符
std::vector<std::string> datagrams;
StringUtil::Split(item->_inbuffer, &datagrams, "X");
for (auto s : datagrams)
{
// 3. 反序列化
struct data d;
StringUtil::Deserialize(s, &d._x, &d._y, &d._op);
// 4. 业务处理
int result = 0;
switch(d._op)
{
case '+' : result = d._x + d._y; break;
case '-' : result = d._x - d._y; break;
case '*' : result = d._x * d._y; break;
case '/' :
if (d._y == 0)
{
std::cerr << "Error: div zero!" << std::endl;
continue; // 继续处理下一个报文
}
else
{
result = d._x / d._y;
}
break;
case '%' :
if (d._y == 0)
{
std::cerr << "Error: mod zero! " << std::endl;
continue;
}
else
{
result = d._x % d._y;
}
break;
default:
std::cerr << "operation error!" << std::endl;
continue; // 继续处理下一个报文
}
// 5. 形成响应报文
std::string response;
response += std::to_string(d._x);
response += d._op;
response += std::to_string(d._y);
response += "=";
response += std::to_string(result);
response += "X"; // 报文与报文之间的分隔符
// 6. 将响应报文添加到outbuffer中
item->_outbuffer += response;
// 打开写事件
if (!item->_outbuffer.empty()) item->_R->EnableReadWrite(item->_sock, true, true);
}
}
数据读取函数recver_helper
我们可以将循环调用recv函数读取数据的过程封装成一个recv_helper函数。
int recver_helper(int sock, std::string *out)
{
while (true)
{
char buffer[128];
ssize_t size = recv(sock, buffer, sizeof(buffer) - 1, 0);
if (size < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// 数据读取完毕
return 0;
}
else if (errno == EINTR)
{
// 被信号中断,继续尝试读取
continue;
}
else
{
// 读取出错
return -1;
}
}
else if (size == 0)
{
// 对端连接关闭
return -1;
}
// 读取成功
buffer[size] = '\0';
// 这里这个out不是输出缓冲区,是输出型参数的意思
*out += buffer; // 将读取到的数据添加到该套接字对应EventItem结构对应的inbuffer中
}
}
报文切割函数Split
报文切割本质就是为了防止粘包问题,而粘包问题实际是设计到协议定制的。
static void Split(std::string &in, std::vector<std::string> *out, const std::string sep)
{
int start = 0;
size_t pos = in.find(sep, start);
while (pos != std::string::npos)
{
out->push_back(in.substr(start, pos - start));
start = pos + sep.size();
pos = in.find(sep, start);
}
in = in.substr(start);
}
反序列化函数Deserialize
在数据发送之前需要进行序列化encode,接收数据之后需要对数据进行反序列化decode。
实际反序列化也是与协议定制相关的,假设这里的epoll服务器向客户端提供的就是计算服务,客户端向服务器发送的都是需要计算的算术表达式。可以用结构体来描述这样一个算术表达式。
static void Deserialize(std::string &in, int *x, int *y, char *op)
{
size_t pos = 0;
for (pos = 0; pos < in.size(); pos++)
{
if (in[pos] == '+' || in[pos] == '-' || in[pos] == '*' || in[pos] == '/' || in[pos] == '%')
break;
}
if (pos < in.size())
{
std::string left = in.substr(0, pos);
std::string right = in.substr(pos + 1);
*x = atoi(left.c_str());
*y = atoi(right.c_str());
*op = in[pos];
}
else
{
*op = -1;
}
}
业务处理
业务处理就是服务器拿到客户端发来的数据后,对数据进行数据分析,最终拿到客户端想要的数据。
我们这里要做的业务处理非常简单,就是用反序列化后的数据计算,此时得到的计算结果就是客户端想要的。
形成响应报文
在业务处理后我们已经拿到了客户端想要的数据,现在我们要的就是形成响应报文,由于我们这里规定每个报文都以"X"作为报文结束的标志,因此在形成响应报文的时候,就需要在每一个计算结果后面加上一个"X",表示这是之前某一个请求报文的响应报文,因此协议定制后就需要双方遵守。
将响应报文添加到outbuffer中
响应报文构建完后需要将其添加到该套接字对应的outbuffer中,并打开该套接字的写事件,此后当写事件就绪时就会将outbuffer当中的数据发送出去。
sender回调函数用于处理写事件,其工作流程如下:
int sender(EventItem *item)
{
if (item->_sock < 0) return -1;
int ret = sender_helper(item->_sock, item->_outbuffer);
if (ret == 0) // 全部发送成功,不再关心写事件
item->_R->EnableReadWrite(item->_sock, true, false);
else if (ret == 1) // 没有发送完毕,还需要继续关心写事件
item->_R->EnableReadWrite(item->_sock, true, true);
else // 写入出错
item->_error_handler(item);
return 0;
}
我们可以将循环调用send函数发送数据的过程封装成一个sender_helper函数。
// in为输入型参数的意思
int sender_helper(int sock, std::string &in)
{
size_t total = 0; // 累计已经发送的字节数
while (true)
{
ssize_t size = send(sock, in.c_str() + total, in.size() - total, 0);
if (size < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// 底层缓冲区已经没有空间了
in.erase(0, total); // 将已经发送的数据移除outbuffer
return 1; // 缓冲区写满,没写完
}
else if (errno == EINTR)
{
// 被信号中断
continue;
}
else
{
// 写入出错
return -1;
}
}
total += size;
if (total >= in.size())
{
in.clear(); // 清空outbuffer
return 0; // 全部写入完毕
}
}
}
errorer回调用于处理异常事件
int errorer(EventItem *item)
{
item->_R->DelEvent(item->_sock); // 将该文件描述符从epoll模型删除
close(item->_sock); // 关闭该文件描述符
item->_sock = -1; // 防止关闭后继续执行读写回调
return 0;
}
服务器的运行步骤如下:
将sockt套接字进行封装
这里编写一个Socket类,对套接字相关的接口进行一定程度的封装,并且为了让外部能够直接调用Socket类当中封装的函数,将这些函数定义为了静态成员函数。
class Socket
{
public:
// 创建套接字
static int SocketeCreate()
{
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{
std::cerr << "socket error" << std::endl;
exit(2);
}
// 设置端口复用
int opt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
return sock;
}
// 绑定端口号
static void Bind(int sock, int port)
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_port = htons(port);
local.sin_family = AF_INET;
local.sin_addr.s_addr = INADDR_ANY;
socklen_t len = sizeof(local);
if (bind(sock, (struct sockaddr*)&local, len) < 0)
{
std::cerr << "bind error" << std::endl;
exit(3);
}
}
// 监听
static void Listen(int sock, int backlog)
{
if (listen(sock, backlog) < 0)
{
std::cerr << "listen error" << std::endl;
exit(4);
}
}
};
主函数代码
#include "Reactor.hpp"
#include "Util.hpp"
#include "Socket.hpp"
#define BACKLOG 5
static void Usage(const std::string s)
{
std::cout << "Usage: " << s << " port" << std::endl;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
exit(1);
}
int port = 8081;
int listen_sock = Socket::SocketeCreate();
Socket::Bind(listen_sock, port);
SetNonBlock(listen_sock);
Socket::Listen(listen_sock, BACKLOG);
// 创建Reactor,并初始化
Reactor R;
R.InitReactor();
// 创建套接字对应的EventItem结构
EventItem item;
item._sock = listen_sock;
item._R = &R;
item.ManageCallbacks(acceptor, nullptr, nullptr); // 套接字只关心读事件
// 将监听套接字托管给Dispatcher
R.AddEvent(listen_sock, EPOLLIN | EPOLLET, item);
// 循环进行事件派发
int timeout = 1000;
while (1) R.Dispatcher(timeout);
return 0;
}
至此,一个简单的单Reactor单线程服务器就编写完毕了。
运行这个服务器:
这就可以看到,服务器可以接收客户端发来的请求并且进行业务处理后响应给客户端。
因为当前的epoll服务器的业务处理比较简单,所以但进程的epoll服务器看起来没有什么压力,但是如果服务器的业务处理逻辑比较复杂,那么某些客户端发来的数据请求就可能长时间得不到响应,因为这时epoll服务器需要花费大量时间进行业务处理,而在这个过程中服务器无法为其他客户端提供服务。
解决方法
可以在当前服务器的基础上接入线程池,当recver回调读取完数据并完成报文的切割和反序列化之后,就可以将其构建成一个任务然后放到线程池的任务队列中,然后服务器就可以继续进行事件派发,而不需要将事件耗费到业务处理上面,而放到任务队列当中的任务,则由线程池当中的若干个线程进行处理。
接入线程池
线程池的代码如下:
#pragma once
#include
#include
#include
#include
#define NUM 5
//线程池
template<class T>
class ThreadPool
{
public:
//提供一个全局访问点
static ThreadPool* GetInstance()
{
return &_sInst;
}
private:
bool IsEmpty()
{
return _task_queue.size() == 0;
}
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void Wait()
{
pthread_cond_wait(&_cond, &_mutex);
}
void WakeUp()
{
pthread_cond_signal(&_cond);
}
public:
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
//线程池中线程的执行例程
static void* Routine(void* arg)
{
pthread_detach(pthread_self());
ThreadPool* self = (ThreadPool*)arg;
//不断从任务队列获取任务进行处理
while (true){
self->LockQueue();
while (self->IsEmpty()){
self->Wait();
}
T task;
self->Pop(task);
self->UnLockQueue();
task.Run(); //处理任务
}
}
void ThreadPoolInit()
{
pthread_t tid;
for (int i = 0; i < _thread_num; i++){
pthread_create(&tid, nullptr, Routine, this); //注意参数传入this指针
}
}
//往任务队列塞任务(主线程调用)
void Push(const T& task)
{
LockQueue();
_task_queue.push(task);
UnLockQueue();
WakeUp();
}
//从任务队列获取任务(线程池中的线程调用)
void Pop(T& task)
{
task = _task_queue.front();
_task_queue.pop();
}
private:
ThreadPool(int num = NUM) //构造函数私有
: _thread_num(num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPool(const ThreadPool&) = delete; //防拷贝
std::queue<T> _task_queue; //任务队列
int _thread_num; //线程池中线程的数量
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T> _sInst;
};
template<class T>
ThreadPool<T> ThreadPool<T>::_sInst;
设计一个任务类
我们需要定义出来一个任务类,该任务类当中需要提供一个Run方法,这个Run方法就是将线程池中的若干线程池从任务队列当中拿到任务后执行的方法。
此时recver回调函数中在读取数据、报文切割、反序列化后就可以构建出一个任务对象,然后将该任务放到任务队列当中就行了。
int recver(EventItem* item)
{
if (item->_sock < 0) //该文件描述符已经被关闭
return -1;
//1、数据读取
if (recver_helper(item->_sock, &(item->_inbuffer)) < 0){ //读取失败
item->_error_handler(item);
return -1;
}
//2、报文切割
std::vector<std::string> datagrams;
StringUtil::Split(item->_inbuffer, &datagrams, "X");
for (auto s : datagrams){
//3、反序列化
struct data d;
StringUtil::Deserialize(s, &d._x, &d._y, &d._op);
Task t(d, item); //构建任务
ThreadPool<Task>::GetInstance()->Push(t); //将任务push到线程池的任务队列中
}
return 0;
}
这样线程池就是接入完毕了,下面再次尝试运行这个服务器。需要注意的是在运行之前需要对线程池进行初始化。
//初始化线程池
ThreadPool<Task>::GetInstance()->ThreadPoolInit();
运行结果如下: