• 高性能服务器之Reactor设计


                                       

            今天来针对上一节课讲的多路转接知识再进一步进行设计,Reactor是基于epoll的ET模式设计的,在现在的高校和企业中是广泛应用的,今天我们来实现一个简洁版,完整版博主可没那个实力~

    目录

    基本原理

    代码实现 

    epoll_server.cc

    Accepter.hpp

    Reactor.hpp

    Service.hpp

    Sock.hpp

    Util.hpp

    Makefile

    测试结果


    基本原理

            Reactor负责接收事件并关心对应文件描述符读写事件,当epoll中就绪队列有数据时,把事件派发给连接管理器,Accepter调用对应的读写模块。

    代码实现 

    epoll_server.cc

    1. #include "Reactor.hpp"
    2. #include"Sock.hpp"
    3. #include
    4. #include
    5. #include"Accepter.hpp"
    6. #include"Util.hpp"
    7. static void Usage(std::string proc)
    8. {
    9. std::cout << "Usage: " << "\n\t" << proc << " port" << std::endl;
    10. }
    11. int main(int argc, char* argv[])
    12. {
    13. if(argc != 2)
    14. {
    15. Usage(argv[0]);
    16. exit(1);
    17. }
    18. //1、创建socket, 监听
    19. int listen_sock = Sock::Socket();
    20. SetNonBlock(listen_sock);//listen_sock设置为非阻塞
    21. Sock::Bind(listen_sock, (uint16_t)atoi(argv[1]));
    22. Sock::Listen(listen_sock);
    23. //2、创建Reactor对象
    24. //Reactor 反应堆模式: 通过多路转接方案, 被动的采用事件派发的方式, 去反向的调用对应的回调函数
    25. //1. 检测到事件 -- epoll
    26. //2. 派发事件 - Dispatcher (事件派发 + IO) + 业务处理: 半同步半异步的处理方式
    27. //3. 链接 -- accepter
    28. //4. IO -- recver, sender
    29. Reactor* R = new Reactor();
    30. R->InitReactor();
    31. //3、 给Reactor反应堆中加柴火
    32. //3.1 有柴火
    33. Event* evp = new Event;
    34. evp->sock = listen_sock;
    35. evp->R = R; //指向柴火加的反应堆
    36. //Accepter链接管理器
    37. evp->RegisterCallBack(Accepter, nullptr, nullptr);
    38. //3.2 将准备好的柴火放入反应堆Reactor中
    39. R->InsertEvent(evp, EPOLLIN | EPOLLET); //设置ET模式
    40. //4. 开始进行事件派发
    41. for(; ;)
    42. {
    43. R->Dispatcher(1000); //事件派发
    44. }
    45. return 0;
    46. }

    Accepter.hpp

    1. #pragma once
    2. #include
    3. #include"Reactor.hpp"
    4. #include"Sock.hpp"
    5. #include"Service.hpp"
    6. #include"Util.hpp"
    7. int Accepter(Event* evp)
    8. {
    9. std::cout << "有新的链接到来了, 就绪的sock是: " << evp->sock << std::endl;
    10. //经过这样的死循环,会不断的把链接构建成event结构添加到反应堆中
    11. while(true)
    12. {
    13. int sock = Sock::Accept(evp->sock);
    14. if(sock < 0)
    15. {
    16. std::cout << "Accept Done!" << std::endl;
    17. break;
    18. }
    19. std::cout << "Accept success: " << sock << std::endl;
    20. SetNonBlock(sock);
    21. //获取链接成功, IO socket
    22. Event* other_ev = new Event();
    23. other_ev->sock = sock;
    24. other_ev->R = evp->R; //为什么要让所有的Event指向自己所属的Reactor??
    25. //recver sender error 就是我们代码中的较顶层, 只负责读取!
    26. other_ev->RegisterCallBack(Recver, Sender, Error);
    27. //不能同时设置EPOLLOUT和EPOLLIN, 这样服务器会一直死循环,服务器性能大大降低
    28. evp->R->InsertEvent(other_ev, EPOLLIN|EPOLLET);
    29. }
    30. return 0;
    31. }

    Reactor.hpp

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. //一般处理IO的时候, 我们只有三种接口需要处理
    9. //处理写入
    10. //处理读取
    11. //处理异常
    12. class Event; //声明
    13. class Reactor;
    14. #define SIZE 128
    15. #define NUM 64
    16. typedef int (*callback_t)(Event* ev); //函数指针类型
    17. //需要让epoll管理的基本结点
    18. class Event
    19. {
    20. public:
    21. int sock; //对应的文件描述符
    22. std::string inbuffer;//对应的sock, 对应的输入缓冲区->粘包问题解决
    23. std::string outbuffer; //对应的sock, 对应的输入缓冲区->epoll发送出去
    24. //sock设置回调
    25. callback_t recver;
    26. callback_t sender;
    27. callback_t errorer;
    28. //设置Event回指Reactor的指针
    29. Reactor* R;
    30. public:
    31. Event()
    32. {
    33. sock = -1;
    34. recver = nullptr;
    35. sender = nullptr;
    36. errorer = nullptr;
    37. R = nullptr;
    38. }
    39. ~Event()
    40. {}
    41. //设置回调
    42. void RegisterCallBack(callback_t _recver, callback_t _sender, callback_t _errorer)
    43. {
    44. recver = _recver;
    45. sender = _sender;
    46. errorer = _errorer;
    47. }
    48. };
    49. //不需要关心任何sock的类型(listen, 读, 写)
    50. //如何进行使用该类, 对Event进行管理
    51. class Reactor
    52. {
    53. private:
    54. int epfd;
    55. std::unordered_map<int, Event*> events; //我的Epoll类管理的所有的Event的集合
    56. public:
    57. Reactor()
    58. :epfd(-1)
    59. {}
    60. ~Reactor()
    61. {}
    62. void InitReactor()
    63. {
    64. epfd = epoll_create(SIZE);
    65. if(epfd < 0)
    66. {
    67. std::cerr << "epoll_create error" << std::endl;
    68. exit(2);
    69. }
    70. std::cout << "InitReactor success" << std::endl;
    71. }
    72. //增加
    73. bool InsertEvent(Event* evp, uint32_t evs)
    74. {
    75. //1、将sock中的sock插入到epoll中
    76. struct epoll_event ev;
    77. ev.events = evs;
    78. ev.data.fd = evp->sock;
    79. if(epoll_ctl(epfd, EPOLL_CTL_ADD, evp->sock, &ev) < 0) //-1失败, 0成功
    80. {
    81. std::cerr << "epoll_ctl add event failed" << std::endl;
    82. return false;
    83. }
    84. //2、将ev本身插入到unordered_map中
    85. events.insert({evp->sock, evp});//evp是维护的一个Event结点
    86. return true;
    87. }
    88. void DeleteEvent(Event* evp)
    89. {
    90. int sock = evp->sock;
    91. auto iter = events.find(sock);
    92. if(iter != events.end())//找到了
    93. {
    94. //1、将sock中的sock从epoll中删除
    95. epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);//删除不关心events事件,所以设置成nullptr
    96. //2. 把特定的ev 从unordered_map中移除
    97. events.erase(iter);
    98. //3、close
    99. close(sock);
    100. //4、删除event结点 -->它是new出来的, 要手动释放
    101. delete evp;
    102. }
    103. }
    104. //关于修改, 也是最后看, 使能读写
    105. bool EnableRW(int sock, bool enbread, bool enbwrite)
    106. {
    107. struct epoll_event ev;
    108. ev.events = EPOLLET | (enbread ? EPOLLIN : 0) | (enbwrite ? EPOLLOUT : 0);
    109. ev.data.fd = sock;
    110. if(epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev) < 0)
    111. {
    112. std::cerr << "epoll_ctl mod event failed" << std::endl;
    113. return false;
    114. }
    115. return true;
    116. }
    117. //判断sock是否是合法的
    118. bool IsSockOK(int sock)
    119. {
    120. auto iter = events.find(sock);
    121. return iter != events.end(); //找到就合法
    122. }
    123. //就绪事件的派发逻辑
    124. void Dispatcher(int timeout) //将就绪事件派发给sock
    125. {
    126. struct epoll_event revs[NUM];
    127. int n = epoll_wait(epfd, revs, NUM, timeout);
    128. for(int i = 0; i < n; i++) //n 表示就绪的fd个数, 维护在了数组里面
    129. {
    130. int sock = revs[i].data.fd;
    131. uint32_t revents = revs[i].events; //本次文件描述符就绪的事件
    132. //代表差错处理, 将所有的错误问题全部转化成为让IO函数去解决
    133. if(revents & EPOLLERR)
    134. {
    135. revents |= (EPOLLIN | EPOLLOUT); //设置读写
    136. }
    137. if(revents & EPOLLHUP) //对端链接关闭
    138. {
    139. revents |= (EPOLLIN | EPOLLOUT); //设置读写
    140. }
    141. //读数据就绪, 可能有bug,后面解决
    142. if(revents & EPOLLIN)
    143. {
    144. //直接调用回调方法, 执行对应的读取
    145. if(IsSockOK(sock) && events[sock]->recver) //recever是回调
    146. {
    147. events[sock]->recver(events[sock]);
    148. }
    149. }
    150. if(revents & EPOLLOUT)
    151. {
    152. //直接调用回调方法, 执行对应的读取
    153. if(IsSockOK(sock) && events[sock]->sender)
    154. {
    155. events[sock]->sender(events[sock]);
    156. }
    157. }
    158. }
    159. }
    160. };

    Service.hpp

    1. #pragma once
    2. #include "Reactor.hpp"
    3. #include
    4. #include
    5. #include"Util.hpp"
    6. #define ONCE_SIZE 128
    7. //1: 本轮读取全部完成
    8. //-1: 读取出错
    9. //0: 对端关闭链接
    10. static int RecverCore(int sock, std::string &inbuffer)
    11. {
    12. while (true)
    13. {
    14. char buffer[ONCE_SIZE];
    15. ssize_t s = recv(sock, buffer, ONCE_SIZE - 1, 0);
    16. if (s > 0)
    17. {
    18. //读取成功
    19. buffer[s] = 0;
    20. inbuffer += buffer;
    21. }
    22. else if (s < 0)
    23. {
    24. if(errno == EINTR)
    25. {
    26. //IO被信号打断, 概率特别低
    27. continue;
    28. }
    29. //errno会被自动设置
    30. if (errno == EAGAIN || errno == EWOULDBLOCK)
    31. {
    32. // 1、读完, 底层没数据了
    33. return 1; //success
    34. }
    35. // 2、真的出错了
    36. return -1;
    37. }
    38. else // s == 0
    39. {
    40. return 0;
    41. }
    42. }
    43. }
    44. int Recver(Event *evp)
    45. {
    46. std::cout << "Recver been called" << std::endl;
    47. // 1、真正的读取
    48. int result = RecverCore(evp->sock, evp->inbuffer);
    49. if(result <= 0)
    50. {
    51. //差错处理
    52. if(evp->errorer)
    53. {
    54. evp->errorer(evp);
    55. }
    56. return -1;
    57. }
    58. //1+2X2+3X5+6X
    59. // 2、分包 -- 一个或者多个报文 -- 解决粘包问题
    60. std::vector tokens;
    61. std::string sep = "X";
    62. SplitSegment(evp->inbuffer, &tokens, sep); //从缓冲区读取字节到tokens
    63. // 3、反序列化 -- 针对一个报文 提取有效参与计算或者存储的信息
    64. for (auto &seg : tokens)
    65. {
    66. std::string data1, data2;
    67. if(Deserialize(seg, &data1, &data2)) //就是和业务强相关了
    68. {
    69. // 4、业务逻辑 -- 得到结果
    70. int x = atoi(data1.c_str());
    71. int y = atoi(data2.c_str());
    72. int z = x + y;
    73. // 5、构建响应 -- 添加到evp->outbuffer!!
    74. // 1+2X ---> 1+2=3X
    75. std::string res = data1;
    76. res += "+";
    77. res += data2;
    78. res += "=";
    79. res += std::to_string(z);
    80. res += sep;
    81. //send?? 不能直接send!!!
    82. evp->outbuffer += res; //把数据写入自己维护的缓冲区
    83. }
    84. }
    85. // 6、尝试直接/间接进行发送 -- 后续说明
    86. //必须条件成熟了(写事件就绪), 你才能发送呢??
    87. //一般只要将报文处理完毕, 才需要发送
    88. //写事件一般都是就绪的, 但是用户不一定是就绪的!
    89. //对于写事件, 我们通常是按需设置!!
    90. if(!(evp->outbuffer).empty())
    91. {
    92. //写打开的时候, 默认就是就绪的, 即便是发送缓冲区已经满了
    93. //epoll 只要用户重新设置了OUT事件, EPOLLOUT至少会触发一次
    94. evp->R->EnableRW(evp->sock, true, true); //读写使能开启
    95. }
    96. return 0;
    97. }
    98. //1: 数据全部发完
    99. //0: 数据没有发完, 但是不能再发了
    100. //-1:发送失败
    101. int SenderCore(int sock, std::string& outbuffer)
    102. {
    103. while(true)
    104. {
    105. int total = 0; //本轮累计发送的数据量
    106. const char* start = outbuffer.c_str();
    107. int size = outbuffer.size();
    108. ssize_t curr = send(sock, start + total, size - total, 0);
    109. if(curr > 0)
    110. {
    111. total += curr;
    112. if(total == size)
    113. {
    114. //全部将数据发送完成
    115. outbuffer.clear();
    116. return 1;
    117. }
    118. }
    119. else
    120. {
    121. //数据没有发送完成, 但是不能在发送了
    122. if(errno == EINTR) //IO信号中断
    123. {
    124. continue;
    125. }
    126. if(errno == EAGAIN || errno == EWOULDBLOCK) //发送缓冲区满了
    127. {
    128. outbuffer.erase(0, total);
    129. return 0;
    130. }
    131. return -1;
    132. }
    133. }
    134. }
    135. int Sender(Event *evp)
    136. {
    137. std::cout << "Sender been called" << std::endl;
    138. //1: 数据全部发完
    139. //0: 数据没有发完, 但是不能再发了
    140. //-1:发送失败
    141. int result = SenderCore(evp->sock, evp->outbuffer);
    142. if(result == 1)
    143. {
    144. evp->R->EnableRW(evp->sock, true, false); //按需设置
    145. }
    146. else if(result == 0)
    147. {
    148. //可以什么也不做
    149. evp->R->EnableRW(evp->sock, true, false);
    150. }
    151. else
    152. {
    153. if(evp->errorer)
    154. {
    155. evp->errorer(evp); //errorer统一处理差错
    156. }
    157. }
    158. return 0;
    159. }
    160. int Error(Event *evp)
    161. {
    162. std::cout << "Error been called" << std::endl;
    163. evp->R->DeleteEvent(evp);
    164. return 0;
    165. }

    Sock.hpp

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. using namespace std;
    10. class Sock
    11. {
    12. public:
    13. static int Socket()
    14. {
    15. int sock = socket(AF_INET, SOCK_STREAM, 0); // UDP
    16. if (sock < 0)
    17. {
    18. cerr << "socket err" << endl;
    19. exit(2);
    20. }
    21. return sock;
    22. }
    23. static void Bind(int sock, uint16_t port)
    24. {
    25. struct sockaddr_in local;
    26. memset(&local, 0, sizeof(local));
    27. local.sin_family = AF_INET;
    28. local.sin_port = htons(port);
    29. local.sin_addr.s_addr = INADDR_ANY; //服务端,ip地址
    30. if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
    31. {
    32. cerr << "bind error" << endl;
    33. exit(3);
    34. }
    35. }
    36. static void Listen(int sock)
    37. {
    38. if (listen(sock, 5) < 0)
    39. {
    40. cerr << "listen error" << endl;
    41. exit(4);
    42. }
    43. }
    44. static int Accept(int sock)
    45. {
    46. struct sockaddr_in peer;
    47. socklen_t len = sizeof(peer);
    48. int fd = accept(sock, (struct sockaddr *)&peer, &len);
    49. if(fd >= 0)
    50. {
    51. return fd;
    52. }
    53. return -1;
    54. }
    55. static void Connect(int sock, string ip, uint16_t port)
    56. {
    57. struct sockaddr_in server;
    58. memset(&server, 0, sizeof(server));
    59. server.sin_family = AF_INET;
    60. server.sin_port = htons(port);
    61. server.sin_addr.s_addr = inet_addr(ip.c_str());//字符串->整型,大小端解决了
    62. //inet_ntoa 整型->字符串
    63. if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0)
    64. {
    65. cout << "Connect Success" << endl;
    66. }
    67. else
    68. {
    69. cout << "Connect Failed" << endl;
    70. exit(5);
    71. }
    72. }
    73. };

    Util.hpp

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. //工具类
    6. //设置一个sock成为非阻塞
    7. void SetNonBlock(int sock)
    8. {
    9. int f1 = fcntl(sock, F_GETFL);
    10. if(f1 < 0)
    11. {
    12. std::cerr << "fcntl failed" << std::endl;
    13. return;
    14. }
    15. fcntl(sock, F_SETFL, f1|O_NONBLOCK);//设置非阻塞
    16. }
    17. //1+2X2+3X5+6X
    18. void SplitSegment(std::string& inbuffer, std::vector* tokens, std::string sep)
    19. {
    20. while(true)
    21. {
    22. std::cout << "inbuffer: " << inbuffer << std::endl; //查看缓冲区里还有什么
    23. auto pos = inbuffer.find(sep);
    24. if(pos == std::string::npos) //没有找到
    25. {
    26. break;
    27. }
    28. std::string sub = inbuffer.substr(0, pos); //[ )
    29. tokens->push_back(sub);
    30. inbuffer.erase(0, pos + sep.size()); //从0开始, 移除pos+sep.size()个
    31. }
    32. }
    33. bool Deserialize(const std::string& seg, std::string* out1, std::string* out2) //就是和业务强相关
    34. {
    35. //1+2
    36. std::string op = "+";
    37. auto pos = seg.find("+");
    38. if(pos == std::string::npos) //没找到
    39. {
    40. return false;
    41. }
    42. *out1 = seg.substr(0, pos);
    43. *out2 = seg.substr(pos+op.size());
    44. return true;
    45. }

    Makefile

    1. epoll_server:epoll_server.cc
    2. g++ -o $@ $^ -std=c++11
    3. .PHONY:clean
    4. clean:
    5. rm -f epoll_server

    测试结果

     看到这里, 给博主点个赞吧~

                      

  • 相关阅读:
    分组后合并记录中的字段值
    SpringBoot 整合mybatis,mybatis-plus
    杰哥教你面试之一百问系列:java多线程
    Go 文件操作
    [附源码]java毕业设计学生档案管理系统论文
    理解红黑树
    程序员凭本事赚钱被没收所得,是否体现了“重刑主义”?
    138. 复制带随机指针的链表
    第三章 内存管理 七、具有快表的地址变换结构
    USB2.0 UTMI接口
  • 原文地址:https://blog.csdn.net/qq_58724706/article/details/127682705