今天来针对上一节课讲的多路转接知识再进一步进行设计,Reactor是基于epoll的ET模式设计的,在现在的高校和企业中是广泛应用的,今天我们来实现一个简洁版,完整版博主可没那个实力~
目录
Reactor负责接收事件并关心对应文件描述符读写事件,当epoll中就绪队列有数据时,把事件派发给连接管理器,Accepter调用对应的读写模块。
- #include "Reactor.hpp"
- #include"Sock.hpp"
- #include
- #include
- #include"Accepter.hpp"
- #include"Util.hpp"
-
- static void Usage(std::string proc)
- {
- std::cout << "Usage: " << "\n\t" << proc << " port" << std::endl;
- }
- int main(int argc, char* argv[])
- {
- if(argc != 2)
- {
- Usage(argv[0]);
- exit(1);
- }
- //1、创建socket, 监听
- int listen_sock = Sock::Socket();
- SetNonBlock(listen_sock);//listen_sock设置为非阻塞
- Sock::Bind(listen_sock, (uint16_t)atoi(argv[1]));
- Sock::Listen(listen_sock);
-
- //2、创建Reactor对象
- //Reactor 反应堆模式: 通过多路转接方案, 被动的采用事件派发的方式, 去反向的调用对应的回调函数
- //1. 检测到事件 -- epoll
- //2. 派发事件 - Dispatcher (事件派发 + IO) + 业务处理: 半同步半异步的处理方式
- //3. 链接 -- accepter
- //4. IO -- recver, sender
- Reactor* R = new Reactor();
- R->InitReactor();
-
- //3、 给Reactor反应堆中加柴火
- //3.1 有柴火
- Event* evp = new Event;
- evp->sock = listen_sock;
- evp->R = R; //指向柴火加的反应堆
-
-
- //Accepter链接管理器
- evp->RegisterCallBack(Accepter, nullptr, nullptr);
-
- //3.2 将准备好的柴火放入反应堆Reactor中
- R->InsertEvent(evp, EPOLLIN | EPOLLET); //设置ET模式
-
- //4. 开始进行事件派发
- for(; ;)
- {
- R->Dispatcher(1000); //事件派发
- }
- return 0;
- }
- #pragma once
-
- #include
- #include"Reactor.hpp"
- #include"Sock.hpp"
- #include"Service.hpp"
- #include"Util.hpp"
-
- int Accepter(Event* evp)
- {
- std::cout << "有新的链接到来了, 就绪的sock是: " << evp->sock << std::endl;
- //经过这样的死循环,会不断的把链接构建成event结构添加到反应堆中
- while(true)
- {
- int sock = Sock::Accept(evp->sock);
- if(sock < 0)
- {
- std::cout << "Accept Done!" << std::endl;
- break;
- }
- std::cout << "Accept success: " << sock << std::endl;
- SetNonBlock(sock);
- //获取链接成功, IO socket
- Event* other_ev = new Event();
- other_ev->sock = sock;
- other_ev->R = evp->R; //为什么要让所有的Event指向自己所属的Reactor??
-
- //recver sender error 就是我们代码中的较顶层, 只负责读取!
- other_ev->RegisterCallBack(Recver, Sender, Error);
-
- //不能同时设置EPOLLOUT和EPOLLIN, 这样服务器会一直死循环,服务器性能大大降低
- evp->R->InsertEvent(other_ev, EPOLLIN|EPOLLET);
- }
- return 0;
- }
- #pragma once
-
- #include
- #include
- #include
- #include
- #include
- #include
-
-
- //一般处理IO的时候, 我们只有三种接口需要处理
- //处理写入
- //处理读取
- //处理异常
-
- class Event; //声明
- class Reactor;
- #define SIZE 128
- #define NUM 64
-
- typedef int (*callback_t)(Event* ev); //函数指针类型
-
- //需要让epoll管理的基本结点
- class Event
- {
- public:
- int sock; //对应的文件描述符
- std::string inbuffer;//对应的sock, 对应的输入缓冲区->粘包问题解决
- std::string outbuffer; //对应的sock, 对应的输入缓冲区->epoll发送出去
-
- //sock设置回调
- callback_t recver;
- callback_t sender;
- callback_t errorer;
-
- //设置Event回指Reactor的指针
- Reactor* R;
- public:
- Event()
- {
- sock = -1;
- recver = nullptr;
- sender = nullptr;
- errorer = nullptr;
- R = nullptr;
- }
- ~Event()
- {}
- //设置回调
- void RegisterCallBack(callback_t _recver, callback_t _sender, callback_t _errorer)
- {
- recver = _recver;
- sender = _sender;
- errorer = _errorer;
- }
- };
-
- //不需要关心任何sock的类型(listen, 读, 写)
- //如何进行使用该类, 对Event进行管理
- class Reactor
- {
- private:
- int epfd;
- std::unordered_map<int, Event*> events; //我的Epoll类管理的所有的Event的集合
- public:
- Reactor()
- :epfd(-1)
- {}
- ~Reactor()
- {}
- void InitReactor()
- {
- epfd = epoll_create(SIZE);
- if(epfd < 0)
- {
- std::cerr << "epoll_create error" << std::endl;
- exit(2);
- }
- std::cout << "InitReactor success" << std::endl;
- }
- //增加
- bool InsertEvent(Event* evp, uint32_t evs)
- {
- //1、将sock中的sock插入到epoll中
- struct epoll_event ev;
- ev.events = evs;
- ev.data.fd = evp->sock;
- if(epoll_ctl(epfd, EPOLL_CTL_ADD, evp->sock, &ev) < 0) //-1失败, 0成功
- {
- std::cerr << "epoll_ctl add event failed" << std::endl;
- return false;
- }
- //2、将ev本身插入到unordered_map中
- events.insert({evp->sock, evp});//evp是维护的一个Event结点
- return true;
- }
- void DeleteEvent(Event* evp)
- {
- int sock = evp->sock;
- auto iter = events.find(sock);
- if(iter != events.end())//找到了
- {
- //1、将sock中的sock从epoll中删除
- epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);//删除不关心events事件,所以设置成nullptr
-
- //2. 把特定的ev 从unordered_map中移除
- events.erase(iter);
-
- //3、close
- close(sock);
-
- //4、删除event结点 -->它是new出来的, 要手动释放
- delete evp;
- }
- }
-
- //关于修改, 也是最后看, 使能读写
- bool EnableRW(int sock, bool enbread, bool enbwrite)
- {
- struct epoll_event ev;
- ev.events = EPOLLET | (enbread ? EPOLLIN : 0) | (enbwrite ? EPOLLOUT : 0);
- ev.data.fd = sock;
-
- if(epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev) < 0)
- {
- std::cerr << "epoll_ctl mod event failed" << std::endl;
- return false;
- }
- return true;
- }
-
- //判断sock是否是合法的
- bool IsSockOK(int sock)
- {
- auto iter = events.find(sock);
- return iter != events.end(); //找到就合法
- }
-
- //就绪事件的派发逻辑
- void Dispatcher(int timeout) //将就绪事件派发给sock
- {
- struct epoll_event revs[NUM];
- int n = epoll_wait(epfd, revs, NUM, timeout);
- for(int i = 0; i < n; i++) //n 表示就绪的fd个数, 维护在了数组里面
- {
- int sock = revs[i].data.fd;
- uint32_t revents = revs[i].events; //本次文件描述符就绪的事件
- //代表差错处理, 将所有的错误问题全部转化成为让IO函数去解决
- if(revents & EPOLLERR)
- {
- revents |= (EPOLLIN | EPOLLOUT); //设置读写
- }
- if(revents & EPOLLHUP) //对端链接关闭
- {
- revents |= (EPOLLIN | EPOLLOUT); //设置读写
- }
- //读数据就绪, 可能有bug,后面解决
- if(revents & EPOLLIN)
- {
- //直接调用回调方法, 执行对应的读取
- if(IsSockOK(sock) && events[sock]->recver) //recever是回调
- {
- events[sock]->recver(events[sock]);
- }
- }
- if(revents & EPOLLOUT)
- {
- //直接调用回调方法, 执行对应的读取
- if(IsSockOK(sock) && events[sock]->sender)
- {
- events[sock]->sender(events[sock]);
- }
- }
- }
- }
- };
- #pragma once
- #include "Reactor.hpp"
- #include
- #include
- #include"Util.hpp"
-
- #define ONCE_SIZE 128
-
- //1: 本轮读取全部完成
- //-1: 读取出错
- //0: 对端关闭链接
- static int RecverCore(int sock, std::string &inbuffer)
- {
- while (true)
- {
- char buffer[ONCE_SIZE];
- ssize_t s = recv(sock, buffer, ONCE_SIZE - 1, 0);
- if (s > 0)
- {
- //读取成功
- buffer[s] = 0;
- inbuffer += buffer;
- }
- else if (s < 0)
- {
- if(errno == EINTR)
- {
- //IO被信号打断, 概率特别低
- continue;
- }
- //errno会被自动设置
- if (errno == EAGAIN || errno == EWOULDBLOCK)
- {
- // 1、读完, 底层没数据了
- return 1; //success
- }
- // 2、真的出错了
- return -1;
- }
- else // s == 0
- {
- return 0;
- }
- }
- }
-
- int Recver(Event *evp)
- {
- std::cout << "Recver been called" << std::endl;
- // 1、真正的读取
- int result = RecverCore(evp->sock, evp->inbuffer);
- if(result <= 0)
- {
- //差错处理
- if(evp->errorer)
- {
- evp->errorer(evp);
- }
- return -1;
- }
- //1+2X2+3X5+6X
- // 2、分包 -- 一个或者多个报文 -- 解决粘包问题
- std::vector
tokens; - std::string sep = "X";
- SplitSegment(evp->inbuffer, &tokens, sep); //从缓冲区读取字节到tokens
-
- // 3、反序列化 -- 针对一个报文 提取有效参与计算或者存储的信息
- for (auto &seg : tokens)
- {
- std::string data1, data2;
- if(Deserialize(seg, &data1, &data2)) //就是和业务强相关了
- {
- // 4、业务逻辑 -- 得到结果
- int x = atoi(data1.c_str());
- int y = atoi(data2.c_str());
- int z = x + y;
- // 5、构建响应 -- 添加到evp->outbuffer!!
- // 1+2X ---> 1+2=3X
- std::string res = data1;
- res += "+";
- res += data2;
- res += "=";
- res += std::to_string(z);
- res += sep;
-
- //send?? 不能直接send!!!
- evp->outbuffer += res; //把数据写入自己维护的缓冲区
- }
- }
-
- // 6、尝试直接/间接进行发送 -- 后续说明
- //必须条件成熟了(写事件就绪), 你才能发送呢??
- //一般只要将报文处理完毕, 才需要发送
- //写事件一般都是就绪的, 但是用户不一定是就绪的!
- //对于写事件, 我们通常是按需设置!!
- if(!(evp->outbuffer).empty())
- {
- //写打开的时候, 默认就是就绪的, 即便是发送缓冲区已经满了
- //epoll 只要用户重新设置了OUT事件, EPOLLOUT至少会触发一次
- evp->R->EnableRW(evp->sock, true, true); //读写使能开启
- }
- return 0;
- }
-
- //1: 数据全部发完
- //0: 数据没有发完, 但是不能再发了
- //-1:发送失败
- int SenderCore(int sock, std::string& outbuffer)
- {
- while(true)
- {
- int total = 0; //本轮累计发送的数据量
- const char* start = outbuffer.c_str();
- int size = outbuffer.size();
- ssize_t curr = send(sock, start + total, size - total, 0);
- if(curr > 0)
- {
- total += curr;
- if(total == size)
- {
- //全部将数据发送完成
- outbuffer.clear();
- return 1;
- }
- }
- else
- {
- //数据没有发送完成, 但是不能在发送了
- if(errno == EINTR) //IO信号中断
- {
- continue;
- }
- if(errno == EAGAIN || errno == EWOULDBLOCK) //发送缓冲区满了
- {
- outbuffer.erase(0, total);
- return 0;
- }
- return -1;
- }
- }
- }
- int Sender(Event *evp)
- {
- std::cout << "Sender been called" << std::endl;
- //1: 数据全部发完
- //0: 数据没有发完, 但是不能再发了
- //-1:发送失败
- int result = SenderCore(evp->sock, evp->outbuffer);
- if(result == 1)
- {
- evp->R->EnableRW(evp->sock, true, false); //按需设置
- }
- else if(result == 0)
- {
- //可以什么也不做
- evp->R->EnableRW(evp->sock, true, false);
- }
- else
- {
- if(evp->errorer)
- {
- evp->errorer(evp); //errorer统一处理差错
- }
- }
- return 0;
- }
-
- int Error(Event *evp)
- {
- std::cout << "Error been called" << std::endl;
- evp->R->DeleteEvent(evp);
- return 0;
- }
- #pragma once
- #include
- #include
- #include
- #include
- #include
- #include
- #include
-
- using namespace std;
-
- class Sock
- {
- public:
- static int Socket()
- {
- int sock = socket(AF_INET, SOCK_STREAM, 0); // UDP
- if (sock < 0)
- {
- cerr << "socket err" << endl;
- exit(2);
- }
- return sock;
- }
- static void Bind(int sock, uint16_t port)
- {
- struct sockaddr_in local;
- memset(&local, 0, sizeof(local));
- local.sin_family = AF_INET;
- local.sin_port = htons(port);
- local.sin_addr.s_addr = INADDR_ANY; //服务端,ip地址
-
- if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
- {
- cerr << "bind error" << endl;
- exit(3);
- }
- }
-
- static void Listen(int sock)
- {
- if (listen(sock, 5) < 0)
- {
- cerr << "listen error" << endl;
- exit(4);
- }
- }
-
- static int Accept(int sock)
- {
- struct sockaddr_in peer;
- socklen_t len = sizeof(peer);
- int fd = accept(sock, (struct sockaddr *)&peer, &len);
- if(fd >= 0)
- {
- return fd;
- }
- return -1;
- }
-
- static void Connect(int sock, string ip, uint16_t port)
- {
- struct sockaddr_in server;
- memset(&server, 0, sizeof(server));
-
- server.sin_family = AF_INET;
- server.sin_port = htons(port);
- server.sin_addr.s_addr = inet_addr(ip.c_str());//字符串->整型,大小端解决了
- //inet_ntoa 整型->字符串
-
- if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0)
- {
- cout << "Connect Success" << endl;
- }
- else
- {
- cout << "Connect Failed" << endl;
- exit(5);
- }
- }
- };
- #pragma once
- #include
- #include
- #include
-
-
- //工具类
- //设置一个sock成为非阻塞
- void SetNonBlock(int sock)
- {
- int f1 = fcntl(sock, F_GETFL);
- if(f1 < 0)
- {
- std::cerr << "fcntl failed" << std::endl;
- return;
- }
- fcntl(sock, F_SETFL, f1|O_NONBLOCK);//设置非阻塞
- }
-
- //1+2X2+3X5+6X
- void SplitSegment(std::string& inbuffer, std::vector
* tokens, std::string sep) - {
- while(true)
- {
- std::cout << "inbuffer: " << inbuffer << std::endl; //查看缓冲区里还有什么
- auto pos = inbuffer.find(sep);
- if(pos == std::string::npos) //没有找到
- {
- break;
- }
- std::string sub = inbuffer.substr(0, pos); //[ )
- tokens->push_back(sub);
- inbuffer.erase(0, pos + sep.size()); //从0开始, 移除pos+sep.size()个
- }
- }
- bool Deserialize(const std::string& seg, std::string* out1, std::string* out2) //就是和业务强相关
- {
- //1+2
- std::string op = "+";
- auto pos = seg.find("+");
- if(pos == std::string::npos) //没找到
- {
- return false;
- }
- *out1 = seg.substr(0, pos);
- *out2 = seg.substr(pos+op.size());
- return true;
- }
- epoll_server:epoll_server.cc
- g++ -o $@ $^ -std=c++11
- .PHONY:clean
- clean:
- rm -f epoll_server
看到这里, 给博主点个赞吧~