Reactor 是对 I/O 多路复⽤作了⼀层封装,让使⽤者不⽤考虑底层⽹络 API 的细
节,只需要关注应⽤代码的编写。
Reactor 翻译过来的意思是「反应堆」,这⾥的反应指的是「对事件反应」,也就是来了⼀个事件,Reactor 就有相对应的反应/响应。
事实上,Reactor 模式也叫 Dispatcher 模式,即 I/O 多路复⽤监听事件,收到事件后,根据事件类型分配(Dispatch)给某个进程 / 线程。
Reactor 模式主要由 Reactor 和处理资源池这两个核⼼部分组成,它俩负责的事情如下:
Reactor 模式是灵活多变的,可以应对不同的业务场景,灵活在于:
Reactor 的数量可以只有⼀个,也可以有多个。
处理资源池可以是单个进程 / 线程,也可以是多个进程 /线程;
Reactor中具体的操作步骤如下:
读取操作:
(1)Reactor 注册读就绪事件和相关联的回调函数
(2)Reactor 等待事件的发生
(3)当发生读就需事件的时候,Reactor 调用第一步注册的回调函数
(4)Reactor 执行读取操作,将读取到的数据进行业务处理,然后注册写就绪时间
实现简易计算器的业务:客户端发来一段字符串,格式为"a+bXc-dX",X主要用来分隔表达式,防止出现“粘包”问题,计算完成之后,将结果发回去,也用X进行分隔。
Sock.hpp:
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
class Sock{
public:
static int Socket()
{
int sock=socket(AF_INET,SOCK_STREAM,0);
if(sock<0)
{
cerr<<"socket error"<<endl;
exit(2);
}
return sock;
}
static void Bind(int sock,int port)
{
struct sockaddr_in local;
bzero(&local,sizeof(local));
local.sin_port=htons(port);
local.sin_family=AF_INET;
local.sin_addr.s_addr=htonl(INADDR_ANY);
if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
{
cerr<<"bind error"<<endl;
}
}
static void Listen(int sock)
{
if(listen(sock,5)<0)
{
cerr<<"listen error"<<endl;
exit(4);
}
}
static int Accept(int sock)
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
int fd=accept(sock,(struct sockaddr*)&peer,&len);
if(fd<0)
{
cerr<<"accept error"<<endl;
}
return fd;
}
//端口复用
static void Setsockopt(int listen_sock)
{
int opt=1;
setsockopt(listen_sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
}
};
Reactor.hpp:
#pragma once
#include
#include
#include
#include
#include
#include
const int SIZE=128;
const int NUM=64;
class Event;
class Reactor;
//一般处理IO时,我们需要三种接口:读取、写入、异常
typedef int (*callback_t)(Event*ev);
//epoll管理的基本节点
class Event
{
public:
//对应的文件描述符
int _sock;
//sock的输入缓冲区
std::string _in_buffer;
//sock的输出缓冲区
std::string _out_buffer;
//设置回调
callback_t _recver;
callback_t _sender;
callback_t _errorer;
//设置Event回指Reactor的指针
Reactor *_p_reactor;
public:
Event()
{
_sock=-1;
_recver=nullptr;
_sender=nullptr;
_errorer=nullptr;
_p_reactor=nullptr;
}
void RegisterCallback(callback_t recver,callback_t sender,callback_t error)
{
_recver=recver;
_sender=sender;
_errorer=error;
}
~Event(){}
};
class Reactor
{
private:
int _epfd;
//文件描述符和Event的映射
std::unordered_map<int,Event*> _events_map;
public:
Reactor()
:_epfd(-1)
{}
void InitReactor()
{
_epfd=epoll_create(SIZE);
if(_epfd<0)
{
std::cerr<<"epoll_create 失败"<<std::endl;
exit(1);
}
std::cout<<"epoll_create 成功"<<std::endl;
}
//插入一个事件
bool InsertEvent(Event*p_ev,uint32_t event)
{
int sock=p_ev->_sock;
//将p_ev中的sock插入到epoll中
struct epoll_event ev;
ev.events=event;
ev.data.fd=sock;
if(epoll_ctl(_epfd,EPOLL_CTL_ADD,sock,&ev)<0)
{
std::cerr<<"epoll_ctl 添加事件失败"<<std::endl;
return false;
}
//将p_ev本身插入到unordered_map中
_events_map[sock]=p_ev;
std::cout<<"添加到epoll中成功:"<<sock<<std::endl;
}
//删除事件
void DeleteEvent(Event*p_ev)
{
int sock=p_ev->_sock;
epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);
_events_map.erase(sock);
}
//修改事件
void ChangeEvent(int sock,bool read,bool write)
{
struct epoll_event ev;
ev.data.fd=sock;
ev.events=(read?EPOLLIN:0)|(write?EPOLLOUT:0)|EPOLLET;
if(epoll_ctl(_epfd,EPOLL_CTL_MOD,sock,&ev)<0)
{
std::cerr<<"修改事件失败"<<std::endl;
}
}
//派发就绪事件
void Dispatcher(int time_out)
{
//获取已经就绪的事件
struct epoll_event revs[NUM];
int num=epoll_wait(_epfd,revs,NUM,time_out);
for(int i=0;i<num;i++)
{
int sock=revs[i].data.fd;
uint32_t events=revs[i].events;
//差错处理
if((events&EPOLLERR)||(events&EPOLLHUP))
{
Event* p_ev=_events_map[sock];
//调用回调
if(p_ev->_recver!=nullptr)
{
p_ev->_errorer(p_ev);
}
std::cout<<"差错处理完成"<<std::endl;
}
//读事件
if(events&EPOLLIN)
{
Event* p_ev=_events_map[sock];
//调用回调
if(p_ev->_recver!=nullptr)
{
p_ev->_recver(p_ev);
}
}
//写事件
if(events&EPOLLOUT)
{
Event* p_ev=_events_map[sock];
if(p_ev->_sender!=nullptr)
{
p_ev->_sender(p_ev);
}
}
}
}
~Reactor()
{
if(_epfd>=0)
{
close(_epfd);
}
}
};
ThreadPool.hpp
#pragma once
#include
#include
#include
#include
#include
#include
#include"Reactor.hpp"
#include"Util.hpp"
namespace hjl_thread_pool
{
class Task
{
public:
Event* p_ev;
Task(Event* p):p_ev(p)
{
}
void Run()
{
std::vector<std::string>messages;
hjl_util::StringUtil::SplitString(p_ev->_in_buffer,messages,"X");
//针对一个个报文进行计算,比如一个报文为:1+2
for(auto& str:messages)
{
//进行业务处理,形参响应报文
//业务处理可以使用线程池完成 这样就实现了半同步半异步
int ans=hjl_util::StringUtil::calculate(str);
std::string response;
response+=str;
response+="=";
response+=std::to_string(ans);
//将响应报文放入发送缓冲区
p_ev->_out_buffer+=response;
//响应报文之间添加分隔符
p_ev->_out_buffer+="X";
}
//进行写回
if(!p_ev->_out_buffer.empty())
{
p_ev->_p_reactor->ChangeEvent(p_ev->_sock,true,true);
}
}
~Task()
{
}
};
class ThreadPool
{
private:
static ThreadPool* _ptr;
static mutex _mtx;
std::queue<Task*> q;//任务队列
int max_num;//线程的总数
pthread_mutex_t lock;
pthread_cond_t cond;//只能消费者等待,因为生产者要从外部获取任务
void LockQueue(){
pthread_mutex_lock(&lock);
}
void UnlockQueue(){
pthread_mutex_unlock(&lock);
}
bool IsEmpty(){
return q.size()==0;
}
void ThreadWait(){
pthread_cond_wait(&cond,&lock);
}
void ThreadWakeup(){
//唤醒线程
pthread_cond_signal(&cond);
}
void ThreadsWakeup(){
//唤醒所有的线程
pthread_cond_broadcast(&cond);
}
ThreadPool()
{
}
public:
static ThreadPool* GetInstance()//返回指针
{
if (_ptr == nullptr)//双检查,防止多次加锁进行无关消耗
{
//初始化一次
unique_lock<mutex> lock(_mtx);//防止临界资源不安全,进行加锁
if (_ptr == nullptr)
{
_ptr = new ThreadPool;
}
}
return _ptr;
}
//必须设置为静态的,因为成员函数会有this形参
static void*Routine(void*arg)
{
ThreadPool*this_p=(ThreadPool*)arg;
while(true)
{
//从任务队列中拿任务
this_p->LockQueue();
while(this_p->IsEmpty())
{//如果任务队列为空则让线程等待
this_p->ThreadWait();
}
Task* t;
this_p->Get(t);
this_p->UnlockQueue();
t->Run();
delete t;
}
}
void ThreadPoolInit(int _max)
{
max_num=_max;
pthread_mutex_init(&lock,nullptr);
pthread_cond_init(&cond,nullptr);
pthread_t t;//创建一批线程
for(int i=0;i<max_num;i++)
{
pthread_create(&t,nullptr,Routine,this);
}
}
void Put(Task* &in)
{//往任务队列中放任务
LockQueue();
q.push(in);
UnlockQueue();
ThreadWakeup();//放完任务后唤醒一个线程
}
void Get(Task*&out)
{
Task*t=q.front();
q.pop();
out=t;
}
~ThreadPool()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
};
ThreadPool* ThreadPool::_ptr = nullptr;//初始化为空
mutex ThreadPool::_mtx;//初始化锁
}
Util.hpp:
#pragma once
#include
#include
#include
#include
//工具类
namespace hjl_util
{
//将sock设置为非阻塞
void SetNonBlock(int sock)
{
int fl=fcntl(sock,F_GETFL);
if(fl<0)
{
std::cerr<<"获取标志位失败"<<std::endl;
return;
}
fcntl(sock,F_SETFL,fl|O_NONBLOCK);
}
class StringUtil
{
public:
static void SplitString(std::string&in,std::vector<std::string>&out,std::string sep)
{
while(1)
{
int pos=in.find(sep);
if(pos==std::string::npos)
{
return;
}
std::string s=in.substr(0,pos);
out.push_back(s);
in.erase(0,pos+sep.size());
}
}
static int calculate(std::string& s)
{
int result = 0, inter_res = 0, num = 0;
char op = '+';
char ch;
//找到第一个不为空的字符
for (int pos = s.find_first_not_of(' '); pos < s.size(); pos = s.find_first_not_of(' ', pos))
{
ch = s[pos];
//判断是否是数字
if (ch >= '0' && ch <= '9')
{
int num = ch - '0';
//判断下一个字符是否也是数字
while (++pos < s.size() && s[pos] >= '0' && s[pos] <= '9')
num = num * 10 + s[pos] - '0';
switch (op)
{
case '+':
inter_res += num;
break;
case '-':
inter_res -= num;
break;
case '*':
inter_res *= num;
break;
case '/':
inter_res /= num;
break;
}
}
else
{
if (ch == '+' || ch == '-')
{
result += inter_res;
inter_res = 0;
}
op = s[pos++];
}
}
return result + inter_res;
}
};
}
Accepter.hpp:
#pragma once
#include"Reactor.hpp"
#include"Sock.hpp"
#include"Util.hpp"
#include
#include
#include
#include
#include"ThreadPool.hpp"
namespace hjl_service
{
//返回值为0表示读取成功
int _Recver(int sock,std::string&out)
{
while(1)
{
char buffer[128];
int size=recv(sock,buffer,sizeof(buffer)-1,0);
if(size<0)
{
if(errno==EAGAIN||errno==EWOULDBLOCK)
{
//循环读取完毕
return 0;
}
else if(errno==EINTR)
{
//被信号中断了,继续读
continue;
}
else
{
//读取出错
return -1;
}
}
else if(size==0)
{
std::cout<<"用户退出"<<std::endl;
return -1;
}
else
{
buffer[size]=0;
out+=buffer;
}
}
}
int Recver(Event*p_ev)
{
std::cout<<"开始读取数据,文件描述符:"<<p_ev->_sock<<std::endl;
//需要非阻塞读取
if(_Recver(p_ev->_sock,p_ev->_in_buffer)<0)
{
//读取失败了
p_ev->_errorer(p_ev);
return -1;
}
//进行包和包之间的分离,防止出现粘包问题,报文内容格式举例:1+1X1*2
std::cout<<"client#"<<p_ev->_in_buffer<<std::endl;
//读到数据,交给线程池运行
hjl_thread_pool::Task* t=new hjl_thread_pool::Task(p_ev);
hjl_thread_pool::ThreadPool::GetInstance()->Put(t);
return 0;
}
int _Sender(int sock,std::string&in)
{
int total=0;
while(1)
{
//累计一共写入多少字节
int s=send(sock,in.c_str()+total,in.size()-total,0);
if(s>0)
{
total+=s;
//已经将缓冲区写满,不能再写入
if(total==in.size())
{
in.erase(0,total);
return 0;
}
}
else if(s<0)
{
if(errno==EAGAIN||errno==EWOULDBLOCK)
{
//无论是否发送完,都需要把已经发送的数据,移除缓冲区
in.erase(0,total);
return 1;
}
else if(errno==EINTR)
{
//被信号中断
continue;
}
else
{
//写入失败
return -1;
}
}
}
}
int Sender(Event*p_ev)
{
std::cout<<"发送的数据:"<<p_ev->_out_buffer<<std::endl;
int ret=_Sender(p_ev->_sock,p_ev->_out_buffer);
//发回数据
if(ret==0)
{
p_ev->_p_reactor->ChangeEvent(p_ev->_sock,true,false);
}
else if(ret==1)
{
//还没写完,打开写接口继续写
p_ev->_p_reactor->ChangeEvent(p_ev->_sock,true,true);
}
else
{
//写出错
p_ev->_errorer(p_ev);
}
}
int Errorer(Event*p_ev)
{
close(p_ev->_sock);
p_ev->_p_reactor->DeleteEvent(p_ev);
}
int Accepter(Event*p_ev)
{
std::cout<<"有新的链接到来,listen_sock是:"<<p_ev->_sock<<std::endl;
int listen_sock=p_ev->_sock;
while(1)
{
int sock=Sock::Accept(listen_sock);
if(sock<0)
{
std::cout<<"Accept完成"<<std::endl;
break;
}
std::cout<<"Accept成功:"<<sock<<std::endl;
hjl_util::SetNonBlock(sock);
Event* new_p_ev=new Event();
new_p_ev->_sock=sock;
//回指_p_reactor
new_p_ev->_p_reactor=p_ev->_p_reactor;
//给sock注册读写错误回调方法
new_p_ev->RegisterCallback(Recver,Sender,Errorer);
p_ev->_p_reactor->InsertEvent(new_p_ev,EPOLLIN|EPOLLET);
}
}
}
epoll_server.cpp:
#include"Reactor.hpp"
#include"Sock.hpp"
#include"Accepter.hpp"
#include"ThreadPool.hpp"
void Usage(std::string proc)
{
std::cout<<"Usage:"<<proc<<" port"<<std::endl;
}
int main(int argc,char* argv[])
{
if(argc!=2)
{
Usage(argv[0]);
exit(1);
}
hjl_thread_pool::ThreadPool::GetInstance()->ThreadPoolInit(1);
//创建socket
int listen_sock=Sock::Socket();
//ET模式下,设置非阻塞
hjl_util::SetNonBlock(listen_sock);
Sock::Bind(listen_sock,(uint16_t)atoi(argv[1]));
Sock::Listen(listen_sock);
Reactor* _p_reactor=new Reactor();
_p_reactor->InitReactor();
//将listen套接字和p_reactor绑定
Event* p_ev=new Event();
p_ev->_sock=listen_sock;
p_ev->_p_reactor=_p_reactor;
p_ev->RegisterCallback(hjl_service::Accepter,nullptr,nullptr);
_p_reactor->InsertEvent(p_ev,EPOLLIN|EPOLLET);
//开始进行事件派发
int time_out=1000;
while(1)
{
_p_reactor->Dispatcher(time_out);
}
}