Reactor反应堆模式,也就做分发者模式也叫做通知者模式。它是一种设计模式将就绪事件派发给对应服务器处理程序:其基本理念如下
在Reactor模式当中有几个关键的参与者,下面我们来看看他们分别是什么?
下面我们来看一下Reactor模式下的Epoll服务器(ET模式).一起来感受Reactor模式设计吧。下面我们一起来看一下思路:
在Epoll ET服务器当中我们需要处理一下事件:
当Epoll服务器检测到某一个事件就绪时,会将给事件交给对应的服务程序进行处理。这也是Epoll服务器当中的五个角色:
1.任务分配器:Reactor类当中的Dispatcher函数
2.具体事件处理器:读回调,写回调、异常回调的具体实现
3.句柄:文件描述符
4.同步事件分离器:IO多路复用此次指的是epoll
5.事件处理器:主要指的是读回调、写回调、异常回调
Reactor类当中的Dispatcher函数做的就是调用epoll_wait函数等待事件就绪当有事件就绪时调用对应事件的回调函数。将事件的处理交给对应的服务器处理程序即可。下面让我们来看看这个设计当中非常牛逼的地方
在Reactor模式下Epoll服务器设计当中我个人觉得非常好的地方就是EventItem类的设计。一个Eventem对于一个文件描述符fd,并将Eventem类当中含有对于事件就绪的回调方法,并且Eventem类当中还有指向Reactor类的指针。所有的文件描述符都指向同一个Reactor,当我们服务器处理程序将数据处理完毕之后,可以通过这个指针找到Reactor类并添加写事件。
类中还包含了一个输入和输出缓冲区,至于为什么需要这个输入和输出缓冲区具体请看如下解释:
在这里说一下Eventem类当中的回指指针R就是指向Reactor对象的指针,有了这个指针我们可以快速找到到Reactor对象。当连接事件到来时我们需要通过Eventem对象当中的指针R调用类当中AddEvent方法将其添加到任务分配器当中
下面给出Eventem类的定义:
typedef int (*callback_t)(EventItem *ev); //函数指针类型用于
// 需要让epoll管理的基本节点
class EventItem
{
public:
//对应的文件描述符
int _sock;
//对应的sock,对应的输入缓冲区
std::string inbuffer;
//对应的sock,对应的输出缓冲区
std::string outbuffer;
// sock设置回调
callback_t _recver; //读回调
callback_t _sender; //写回调
callback_t _errorer; //异常回调
// 试着Event回指Reactor的回指指针
Reactor *R;
public:
EventItem()
{
_sock = -1;
_recver = nullptr;
_sender = nullptr;
_errorer = nullptr;
R = nullptr;
}
//注册回调
void RegisterCallback(callback_t _recver, callback_t _sender, callback_t _errorer)
{
_recver = _recver;
_sender = _sender;
_errorer = _errorer;
}
~EventItem()
{
}
};
下面我们来谈一下Reactor类的实现:
在Reactor类当中有一个_events.他是一个哈西表建立了文件描述符和EventItem的映射关系通过文件描述符我们能过找到对应的事件结构体,通过事件结构体我们就能找到对应的回调函数也能找到对应的Reactor类。Reactor当中还有一个成员变量_epfd用来创建epoll模型。
对应代码如下:
class Reactor
{
private:
int _epfd;
std::unordered_map<int, EventItem *> _events; //我的Epoll类管理的所有的Event的集合
//一个文件描述符对应一个EventItem用来建立映射关系
public:
Reactor() : _epfd(-1)
{
}
void InitReactor()
{
_epfd = epoll_create(SIZE);
if (_epfd < 0)
{
std::cerr << "epoll_create error" << std::endl;
exit(2);
}
std::cout << "InitReactor success" << std::endl;
}
bool InsertEvent(EventItem *evp, uint32_t evs)
{
// 1. 将sock中的sock插入到epoll中
struct epoll_event ev;
ev.events = evs;
ev.data.fd = evp->_sock;
if (epoll_ctl(_epfd, EPOLL_CTL_ADD, evp->_sock, &ev) < 0)
{
std::cerr << "epoll_ctl add event failed" << std::endl;
return false;
}
// 2. 将ev本身插入到unordered_map中
_events.insert({evp->_sock, evp});
}
void DeleteEvent(EventItem *evp)
{
int sock = evp->_sock;
auto iter = _events.find(sock);
if (iter != _events.end())
{
// 1. 将sock中的sock从epoll中删除它
epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
// 2. 将特定的ev从 unordered_map中 移除
_events.erase(iter);
// 3. close
close(sock);
// 4. 删除event节点
delete evp;
}
}
//使能读写
bool EnableRW(int sock, bool enbread, bool enbwrite)
{
struct epoll_event ev;
ev.events = EPOLLET | (enbread ? EPOLLIN : 0) | (enbwrite ? EPOLLOUT : 0);
ev.data.fd = sock;
if (epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev) < 0)
{
std::cerr << "epoll_ctl mod event failed" << std::endl;
return false;
}
}
//判断是否是合法的文件描述符
bool IsSockOk(int sock)
{
auto iter = _events.find(sock);
return iter != _events.end();
}
//就绪事件的派发器逻辑
void Dispatcher(int timeout)
{
struct epoll_event revs[NUM];
int n = epoll_wait(_epfd, revs, NUM, timeout);
for (int i = 0; i < n; i++)
{
int sock = revs[i].data.fd;
uint32_t revents = revs[i].events;
//代表差错处理, 将所有的错误问题全部转化成为让IO函数去解决
if (revents & EPOLLERR)
revents |= (EPOLLIN | EPOLLOUT);
if (revents & EPOLLHUP)
revents |= (EPOLLIN | EPOLLOUT);
//读数据就绪,可能会有bug,后面解决
if (revents & EPOLLIN)
{
//直接调用回调方法,执行对应的读取
if (IsSockOk(sock) && _events[sock]->_recver)
_events[sock]->_recver(_events[sock]);
}
if (revents & EPOLLOUT)
{
if (IsSockOk(sock) && _events[sock]->_sender)
_events[sock]->_sender(_events[sock]);
}
}
}
~Reactor() {}
};
1.Dispatcher函数的说明
在这里对Dispatcher函数说明一下也就是事件分派器:事件分派器主要是调用epoll_wait等待事件就绪。当某个文件描述上的事件就绪时我们可以通过Reactor类当中的_events通过哈西的方式找到文件描述符对应的事件结构体找到对应的回调方法并调用。
在这里需要说明的一下是此处没有对epoll_wait的返回值进行判断,本质上是通过for的循环判断条件进行判断了如果epoll_wait的返回值为-1不会进入到循环当中,如果返回值为0(超时)也不会进行到循环当中。只有当真正有事件就绪时此时才会进入到循环当中
2.DeleteEvent函数说明:
Reactor类当中的DeleteEvent函数是用于进行事件删除。删除一个事件时我们只需要将对应的文件描述符传入.调用epoll_ctl函数在epoll模型当中删除。并将将哈西表中的映射关系也给删除掉
3.AddEvent函数说明:
Reactor类当中的AddEvent函数是用来往epoll模型当中添加需要关心的事件。调用epoll_ctl函数往epoll模型当中添加需要内核关心对应文件描述符上的那些事件。并建立文件描述符和事件结构体的映射关系
4.EnableRW函数说明:
类中的EnableRW函数用于使能某个文件描述符的读写事件。调用该函数时传入对应的文件描述符。
另外还有两个bool类型的变量表示是使能读还是使能写还是读写。该函数内部调用了epoll_ctl函数修改该文件描述符的监听事件
之前我们一直提到了回调函数下面我们来看看这些回掉函数
当我们给某个文件描述符创建EventItem时,需要调用EventItem类里面的注册回调函数。在这里需要注意的是:
1.我们将监听套接字对应的EventItem结构体当中的_recver设置为accpter这是因为监听套接字对应的读事件就绪了代表的是连接就绪了。这也就意味着监听套接字只关心读事件,那么也就意味着写回调和异常回调我们可以置为空不用关心。
2.Dispatcher一旦检测到某个文件描述符的某个事件就绪了会调用EvetItem当中对应的回调函数。
3.对于客户端建立连接的套接字我们会将其对应的EvetItem中的_recver、_sender、_errorer回调函数都会设置好
下面我们一起来看看这几个回调函数首先我们来看看accepter回调
1.accepter回调
上面提到过accepter回调主要是用来处理连接事件。大致的流程如下:
当我们注册之后,OS在底层就会给我们关心该套接字对应的事件,当事件就绪时执行我们之前设置好的回调方法。
代码如下:
#pragma once
#include "Reator.hpp"
#include "sock.hpp"
#include "Service.hpp"
#include "Util.hpp"
int Accepter(EventItem *evp)
{
std::cout << "有新的链接到来了,就绪的sock是: " << evp->_sock << std::endl;
while(true)
{
int sock = Sock::Accept(evp->_sock);
if(sock < 0)
{
std::cout << "Accept Done!" << std::endl;
break;
}
std::cout << "Accept success: " << sock << std::endl;
SetNonBlock(sock);
//这里呢?获取链接成功,IO socket
EventItem *other_ev = new EventItem();
other_ev->_sock = sock;
other_ev->R = evp->R; //为什么要让所有的Event指向自己所属的Reactor??
//recver, sender, errorer,就是我们代码中的较顶层,只负责真正的读取!
other_ev->RegisterCallback(Recver, Sender, Errorer);
evp->R->InsertEvent(other_ev, EPOLLIN|EPOLLET);
}
}
可能有老铁注意到在这里获取连接是循环一次就给获取完毕。这是因为Reactor模式是基于epoll的边缘触发。事件就绪只会通知上层一次,因此我们在获取连接时需要一次性获取完毕并且需要将监听套接字设置为非阻塞。
如果我们不将监听套接字设置为非阻塞当我们循环调用accept函数获取连接当连接获取完毕之后,我们在调用accept函数时此时底层没有连接就绪accept函数被阻塞住。同样的accept获取的新的套接字也需要将其设置为非阻塞防止后面调用read,write时因为事件不就绪而被阻塞住。
下面让我们来看看如何将文件描述符设置为非阻塞,其实非常的简单使用fcntl函数即可。如果不太清楚的老铁可以看看我之前的高级IO博客在这里将链接给出:
在这里直接给出这个代码:
//设置一个文件描述符为非阻塞
void SetNonBlock(int sock)
{
int f1 = fcntl(sock, F_GETFL);
//获取对应文件描述符的标记
if (f1 < 0)
{
std::cerr << "fcntl faile" << std::endl;
return;
}
//设置文件描述符为非阻塞
fcntl(sock, F_SETFL, f1 | O_NONBLOCK);
//设置文件描述符为非阻塞
}
2.recver回调
recver回调用来读取客户端发送过来的数据,大致流程如下
对应代码如下:
// 1代表本轮读取成功,0代表对端关闭,-1代表失败
static int RecverCore(int sock, std::string &inbuffer) //输入输出型参数
{
while (true)
{
char buffer[ONCE_SIZE];
ssize_t s = recv(sock, buffer, sizeof(ONCE_SIZE) - 1, 0);
if (s > 0)
{
buffer[s] = '\0';
inbuffer += buffer;
}
else if (s < 0)
{
//底层没数据了,或者真的出错了
if (errno == ERANGE || errno == EWOULDBLOCK)
{
//读取完了成功读完
return 1;
}
else if (errno == EINTR)
{
// IO被信号打断
continue;
}
return -1; //真正的出错了
}
else //===0
{
//对端关闭链接
return 0;
}
}
}
int Recver(EventItem *ep)
{
std::cout<<"Recver been called"<<std::endl;
//通过EventItem就可以拿到事件的所有内容
//开始真正的读取,分包我们只想读取一个或者多个报文
//解决粘包问题,反序列化。针对一个报文提取有效参与计算存储的信息
//业务逻辑----->得到结果。构建响应 ,尝试直接间接进行发送----条件成熟了才能发送写事件一般都是就绪的但是就是用户不一定就就绪的
//对于写事件我们通常是按需设置不能EPOLLIN和EPOLLOUT一起设置否则写事件一直就绪被派发而用户的数据没有就绪此时会导致服务器基本不会等待导致服务器
// 压力过大
int ret = RecverCore(ep->_sock, ep->inbuffer);
if (ret <= 0)
{
if (ep->_errorer)
{
ep->_errorer(ep);
}
return -1;
}
std::vector<std::string> tokens; //一个报文放到容器里面
std::string sep = "X";
SplitSegment(ep->inbuffer, &tokens, sep);
for (auto &seg : tokens)
{
std::string data1, data2;
std::string op;
if (Deserialize(seg, &data1, &data2,&op)) //反序列化和业务强相关
{
int x=std::stoi(data1);
int y=std::stoi(data2);
char Op=op[0];
Task t(x,y,Op,ep);
ThreadPool<Task>::GetInstance()->Push(t);
//发送缓存区DFZGVzdccsd
}
}
//将所有的响应添加到outbuffer当中
return 1;
}
在这里需要解释一下的是:
我们使用一个辅助函数RecverCore来帮我们将数据读取到inbuffer当中,在这里需要注意的是当recv函数的返回值为负数时此时并不一定是读取出错有可能是底层数据不就绪但是此时错误码会被设置为
EAGIN或者EWOULDBLOCK此时说明底层事件已经读取完毕,如果此时错误码为EINTR说明读取的过程当中被信号打断此时我们继续进行读取即可
在上述过程当中涉及到了报文分隔其实就是为了防止粘包问题,粘包问题涉及到了协议的定制在这里为了简单起见我们就规定报文和报文之间以’X’进行分隔。所以我们可以将报文分隔好的数据放入到vector当中,而无法分割成一个完整报文的数据就留在缓冲区当中即可。
下面让我们来看看分割的代码:
void SplitSegment(std::string &inbuffer, std::vector<std::string> *tokens, std::string sep)
{
//分隔报文
while (true)
{
int pos = inbuffer.find(sep);
if (pos == -1)
{
break;
}
std::string sub = inbuffer.substr(0, pos);
tokens->push_back(std::move(sub));
inbuffer.erase(0, pos + sep.size()); //将分割符也删掉
}
}
其中还涉及到了反序列化,反序列化也根定制的协议强相关。在这里我们的Epoll服务器提供的是计算服务当然也可以定制其他的服务在这里简单起见。也就是发过来的数据类似于"a+bX"这种这样我们分割起来就非常的简单下面我们来看看是如何进行分割的。
bool Deserialize(std::string &seg,std::string *x,std::string *y,std::string*op)
{
//进行反序列化
std::cout<<seg<<std::endl;
int pos=0;
int N=seg.size();
while(pos<N)
{
if(seg[pos]=='*'||seg[pos]=='/'||seg[pos]=='+'||seg[pos]=='-'||seg[pos]=='%'){
break;
}
pos++;
}
if(pos<N)
{
std::string left=seg.substr(0,pos);
std::string right=seg.substr(pos+1);
*x=left;
*y=right;
*op=std::string(1,seg[pos]);
return true;
}
return false;
}
处理完毕之后我们可以将处理完的结果写入到该文件描述符对应的outbuffer当中并打开该套接字对应的写事件,当事件就绪后就会调用写回调将数据发送给客户端
3.sender回调
sender回调主要用来处理写事件,大致流程如下:
代码如下:
int SendCore(int sock, std::string &outbuffer)
{
while (true)
{
int toatl = 0; //本轮累计发送的数据量
const char *start = outbuffer.c_str();
int size = outbuffer.size();
ssize_t cur = send(sock, start + toatl, size - toatl, 0);
if (cur > 0)
{
toatl += cur;
if (toatl == size)
{
outbuffer.clear();
//全部发送完成
break;
}
}
else
{
//有可能没有发完但是不能发完
if (errno == EINTR)
{
continue;
}
else if (errno == EAGAIN || errno == EWOULDBLOCK)
{
outbuffer.erase(0, toatl);
return 0;
}
else
{
//发送失败
return -1;
}
}
}4
return 1;
}
int Sender(EventItem *ep)
{
std::cout<<"Sender been called"<<std::endl;
int ret = SendCore(ep->_sock, ep->outbuffer);
if (ret == 1)
{
ep->R->EnableRW(ep->_sock, true, false);
}
else if (ret == 0)
{
//可以什么都不做,在这里我们再次打开
ep->R->EnableRW(ep->_sock, true, true);
}
else
{
//出错了之间调用该文件描述符对应的异常回调
ep->_errorer(ep);
}
return 0;
}
3.errorer回调
errorer回调主要是用来处理异常事件:
1.对应异常事件在这里我们不做过多的处理,我们只是简单将对应的文件描述符给关闭掉
2.在调用recver和sender回调之前我们需要判断一下Evetem当中的文件描述符是否有效,如果是无效的根本就没必要进行下一步处理。
对应代码:
int Errorer(EventItem *ep)
{
std::cout<<"Error been called"<<std::endl;
ep->R->DeleteEvent(ep);
}
#pragma once
#include
#include
#include
#include
#include
#include
#include
class Sock
{
public:
static int Socket()
{
int sock = socket(AF_INET, SOCK_STREAM, 0);//创建套接字
int opt=1;
setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
//端口复用
if (sock < 0)
{
std::cerr << "socket error" << std::endl;
exit(1);
}
return sock;
}
static bool Bind(int sock,unsigned short port)
{
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_addr.s_addr=INADDR_ANY;
local.sin_family=AF_INET;
local.sin_port=htons(port);
if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
{
std::cerr<<"bind error"<<std::endl;
exit(2);
}
return true;
}
static bool Listen(int sock,int backlog)
{
if(listen(sock,backlog)<0)
{
std::cerr<<"listen error"<<std::endl;
}
return true;
}
static int Accept(int sock)
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
int fd=accept(sock,(struct sockaddr*)&peer,&len);
return fd>=0?fd:-1;
}
};
在这里我们封装一个Socket类对一些接口进行封装让外部能直接调用Socket类当中的函数
我们当前的Epoll服务器比较简单所以了单进程的epoll服务器看上去并没有什么压力,但是如果我们的业务比较复杂这时候处理业务的时间就可能比较长。此时epoll服务器无法接受其他新的连接,这样就会使得服务器的效率降低。因此我们可以在服务器当中引入线程池,当recver回调将数据反序列化之后将反序列化得到的数据封装成一个任务将其放入到线程池当中。线程池博主之前有博客已经介绍过了再这里就不再介绍
下面直接给出代码:
ThreadPool.hpp
#pragma once
#include "Task.hpp"
#include
#include
#include
static const int ThreadNUM =5;
template <class T>
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mtx);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mtx);
}
void Wait()
{
pthread_cond_wait(&_cond, &_mtx);
}
void WakeUp()
{
pthread_cond_signal(&_cond);
}
bool IsEmpty()
{
return _task_queue.size() == 0;
}
public:
static void *Routine(void *arg) //为什么必须是static方法在博客中细说
{
auto *self = (ThreadPool<T> *)arg;
pthread_detach(pthread_self()); //分离线程
while (true)
{
self->LockQueue();
while (self->IsEmpty()) //看此时的任务队列当中是否有任务
{
self->Wait();
}
T task;
self->pop(&task);
self->UnlockQueue();
task.Run(); //处理任务
}
}
void Push(const T &val)
{
LockQueue();
_task_queue.push(val);
UnlockQueue();
WakeUp(); //唤醒在条件变量下等待的一个线程
}
void pop(T *out)
{
*out = _task_queue.front();
_task_queue.pop();
}
void InitThreadPool()
{
//初始化线程池并创建线程
pthread_t tid;
for (int i = 0; i < _num; i++)
{
pthread_create(&tid, NULL, Routine, this);
}
}
//销毁信号量
~ThreadPool()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
}
//获取单例
static ThreadPool<T> *GetInstance()
{
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
if (NULL == tp)
{
pthread_mutex_lock(&mtx); //多个线程可能同时进来
if (NULL == tp)
{
tp = new ThreadPool<T>();
}
pthread_mutex_unlock(&mtx);
}
return tp;
}
private:
ThreadPool(int num = ThreadNUM) : _num(num)
{
pthread_mutex_init(&_mtx, NULL);
pthread_cond_init(&_cond, NULL);
}
//禁掉拷贝构造和赋值
ThreadPool(const ThreadPool<T> &tp) = delete;
ThreadPool &operator=(const ThreadPool<T> &tp) = delete;
static ThreadPool<T> *tp;
pthread_mutex_t _mtx;
pthread_cond_t _cond;
int _num; //线程的数量
std::queue<T> _task_queue; //任务队列
};
template <class T>
ThreadPool<T> *ThreadPool<T>::tp = NULL;
对应Task.hpp
#pragma once
#include
#include"Reator.hpp"
//任务类
class Task
{
public:
Task(int x = 0, int y = 0, char op = 0,EventItem*e=nullptr)
: _x(x), _y(y), _op(op),ep(e)
{}
~Task()
{}
//处理任务的方法
void Run()
{
int result = 0;
switch (_op)
{
case '+':
result = _x + _y;
break;
case '-':
result = _x - _y;
break;
case '*':
result = _x * _y;
break;
case '/':
if (_y == 0){
std::cerr << "Error: div zero!" << std::endl;
return;
}
else{
result = _x / _y;
}
break;
case '%':
if (_y == 0){
std::cerr << "Error: mod zero!" << std::endl;
return;
}
else{
result = _x % _y;
}
break;
default:
std::cerr << "operation error!" << std::endl;
return;
}
std::cout << "thread[" << pthread_self() << "]:" << _x << _op << _y << "=" << result << std::endl;
std::string response;
response+=std::to_string(_x);
response+=_op;
response+=std::to_string(_y);
response+=" = ";
response+=std::to_string(result);
response+="X";//分隔符
ep->outbuffer+=response;
//开启使能读写
if(!ep->outbuffer.empty()){
ep->R->EnableRW(ep->_sock,true,true);
}
}
private:
int _x;
int _y;
char _op;
EventItem*ep;
};
线程池接入完毕之后我们来测试一下我们的服务器:
我们再看看线程的个数
对应代码地址