• 【网络编程】基于epoll的ET模式下的Reactor



    需要云服务器等云产品来学习Linux的同学可以移步/-->腾讯云<--/-->阿里云<--/-->华为云<--/官网,轻量型云服务器低至112元/年,新用户首次下单享超低折扣。


     目录

    一、Reactor介绍

    二、基于epoll的ET模式下的Reactor计算器代码

    1、TcpServer.hpp

    2、Epoll.hpp

    3、Main.cc

    4、protocol.hpp

    5、calServer.hpp


    一、Reactor介绍

    reactor模式是一种半同步(负责就绪事件的通知+IO)半异步(业务处理)IO,在Linux网络中,是使用最频繁的一种网络IO的设计模式。(还有一种比较少见的Proactor前摄器模式)Reactor模式中文译为反应堆模式,代码效果类似打地鼠游戏,玩家监控地鼠洞,哪个地鼠洞的“事件”就绪了,就去执行对应的回调方法。

    注意,listen套接字也是非阻塞的,我们无法保证一次读取完毕所有的新连接,所以需要程序员使用while循环监听,读取新连接。

    只要套接字被设置成非阻塞,即可不经过epoll直接发送(大不了发送失败用errno判断一下),但是我们无法保证数据是否一次被发完,所以必须保证一个socket一个发送缓冲区,否则残留的数据会被其他socket覆盖。

    在处理发送事件时,其实非常不建议直接发送,因为程序员是无法保证写事件是就绪的,只有epoll有知晓写缓冲区是否就绪的能力。什么叫写事件就绪?就是发送缓冲区有空间,epoll就会提示写事件就绪。在大部分情况下,乃至服务器刚启动时,写事件其实都是就绪的。所以在epoll中,我们对读事件要常设关心,对写事件则按需设置(写事件常设时调用epoll_wait极大概率就绪)。

    二、基于epoll的ET模式下的Reactor计算器代码

    1、TcpServer.hpp

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include "Err.hpp"
    8. #include "Log.hpp"
    9. #include "Sock.hpp"
    10. #include "Epoll.hpp"
    11. #include "Util.hpp"
    12. #include "protocol.hpp"
    13. namespace tcp_server
    14. {
    15. class Connection;
    16. class TcpServer;
    17. static const uint16_t defaultPort = 8080;
    18. static const int num = 64;//表示最多可以存储多少个就绪事件
    19. static const int timeout = 1000;//超时时间
    20. using func_t = std::function<void (Connection*)>;//三种回调方法,读就绪,写就绪,异常就绪
    21. //using hander_t = std::function;
    22. class Connection//每一个套接字都要有自己的缓冲区(把每一个套接字看成Connection对象)
    23. {
    24. public:
    25. Connection(int sock, TcpServer* pTS)
    26. :_sock(sock)
    27. ,_pTS(pTS)
    28. {}
    29. ~Connection(){}
    30. public:
    31. void Register(func_t readFunc, func_t writeFunc, func_t errFunc)//注册事件
    32. {
    33. _recver = readFunc;
    34. _sender = writeFunc;
    35. _excepter = errFunc;
    36. }
    37. void Close()
    38. {
    39. close(_sock);
    40. }
    41. public:
    42. int _sock;
    43. std::string _inBuffer;//输入缓冲区。注意图片和视频的传输格式,每个对象一个缓冲区,考科一防止数据读一半的情况
    44. std::string _outBuffer;//输出缓冲区
    45. func_t _recver;//从sock中读
    46. func_t _sender;//向sock中写
    47. func_t _excepter;//处理sock在io时的异常事件
    48. TcpServer* _pTS;//tcpServer的指针,用于外部调用Connection对象可以控制TcpServer中的EnableReadWrite()接口
    49. uint64_t lastTime;//最近一次访问时间,每一次读和写都更新一下时间
    50. };
    51. class TcpServer//Reactor
    52. {
    53. public:
    54. TcpServer(func_t func, uint16_t port = defaultPort)
    55. :_service(func)
    56. ,_port(port)
    57. ,_revs(nullptr)
    58. {}
    59. ~TcpServer()
    60. {
    61. _sock.Close();
    62. _epoll.Close();
    63. if(nullptr != _revs) delete[] _revs;//还有unordered_map没有析构
    64. }
    65. public:
    66. void InitServer()
    67. {
    68. //1、创建socket
    69. _sock.Socket();
    70. _sock.Bind(_port);
    71. _sock.Listen();
    72. //构建epoll对象
    73. _epoll.Create();
    74. //将listen套接字添加到epoll模型中
    75. AddConnnection(_sock.GetListenSocket(), EPOLLIN | EPOLLET,
    76. std::bind(&TcpServer::Accept, this, std::placeholders::_1), nullptr, nullptr);
    77. _revs = new struct epoll_event[num];
    78. _num = num;
    79. }
    80. void EnableReadWrite(Connection* conn, bool readAble, bool writeAble)//使能读、写
    81. {
    82. uint32_t event = (readAble ? EPOLLIN : 0) | (writeAble ? EPOLLOUT : 0) | EPOLLET;
    83. _epoll.Control(conn->_sock, event, EPOLL_CTL_MOD);
    84. }
    85. void Dispatch()//事件派发
    86. {
    87. while(1)
    88. {
    89. Loop(timeout);
    90. //所有事情做完后,遍历所有的连接,计算每一个连接已经多久没发消息了,现在时间和lastTime相减,超过5分钟就关闭连接
    91. }
    92. }
    93. private:
    94. void Accept(Connection* conn)//监听事件的回调函数
    95. {
    96. //获取新连接,监听套接字也是非阻塞的。
    97. //Accept在非阻塞模式,返回值为-1时,判断errno即可知道是否读到所有的新连接
    98. while(1)
    99. {
    100. std::string clientIp;
    101. uint16_t clientPort;
    102. int err = 0;//用于提取Accept的返回值
    103. int sock = _sock.Accept(&clientIp, &clientPort, &err);
    104. if(sock >= 0)
    105. {
    106. AddConnnection(sock, EPOLLIN | EPOLLET,
    107. std::bind(&TcpServer::Read, this, std::placeholders::_1),
    108. std::bind(&TcpServer::Write, this, std::placeholders::_1),
    109. std::bind(&TcpServer::Except, this, std::placeholders::_1));
    110. LogMessage(DEBUG, "git a new link, info: [%s:%d]", clientIp.c_str(), clientPort);
    111. }
    112. else
    113. {
    114. if(err == EAGAIN || err == EWOULDBLOCK) break;//次数说明Accept把文件描述符全部读完了
    115. else if(err == EINTR) continue;//信号中断
    116. else
    117. {
    118. break;//Accept出错了
    119. }
    120. }
    121. }
    122. }
    123. void Read(Connection* conn)//普通读事件的回调
    124. {
    125. conn->lastTime = time(nullptr);
    126. char buffer[1024];
    127. while(1)
    128. {
    129. ssize_t s = recv(conn->_sock, buffer, sizeof(buffer)-1, 0);
    130. if (s > 0)
    131. {
    132. buffer[s] = 0;
    133. conn->_inBuffer += buffer;//将读到的数据存入string
    134. _service(conn);//对读取到的数据进行处理
    135. }
    136. else if (s == 0)//对端关闭连接
    137. {
    138. if (conn->_excepter)//conn将会被释放,后续代码就不要操作conn指针了
    139. {
    140. conn->_excepter(conn);
    141. return;
    142. }
    143. }
    144. else//判断几种读取出异常的情况
    145. {
    146. if(errno == EINTR) continue;
    147. else if(errno == EAGAIN || errno == EWOULDBLOCK) break;
    148. else
    149. {
    150. if(conn->_excepter)
    151. {
    152. conn->_excepter(conn);
    153. return;
    154. }
    155. }
    156. }
    157. }
    158. }
    159. void Write(Connection* conn)//普通写事件的回调
    160. {
    161. conn->lastTime = time(nullptr);
    162. while(1)
    163. {
    164. ssize_t s = send(conn->_sock, conn->_outBuffer.c_str(), sizeof(conn->_outBuffer.size()), 0);
    165. if (s > 0)
    166. {
    167. if (conn->_outBuffer.empty())
    168. {
    169. //EnableReadWrite(conn, true, false);//写事件写完了就关掉
    170. break;
    171. }
    172. else
    173. {
    174. conn->_outBuffer.erase(0, s);
    175. }
    176. }
    177. else
    178. {
    179. if (errno == EAGAIN || errno ==EWOULDBLOCK) { break; }
    180. else if (errno == EINTR) { continue; }
    181. else
    182. {
    183. if (conn->_excepter)
    184. {
    185. conn->_excepter(conn);
    186. return;
    187. }
    188. }
    189. }
    190. }
    191. if (!conn->_outBuffer.empty())//如果没发完
    192. {
    193. conn->_pTS->EnableReadWrite(conn, true, true);
    194. }
    195. else//如果发完了
    196. {
    197. conn->_pTS->EnableReadWrite(conn, true, false);
    198. }
    199. }
    200. void Except(Connection* conn)//异常事件的回调
    201. {
    202. LogMessage(DEBUG, "Except");
    203. _epoll.Control(conn->_sock, 0, EPOLL_CTL_DEL);//在del的时候不关心是何种事件,有fd即可
    204. conn->Close();//关闭套接字
    205. _connections.erase(conn->_sock);
    206. delete conn;
    207. }
    208. void AddConnnection(int sock, uint32_t events, func_t readFunc, func_t writeFunc, func_t errFunc)//添加连接
    209. {
    210. //1、为该sock创建connection并初始化后添加到_connections
    211. if(events & EPOLLET)
    212. {
    213. Util::SetNonBlock(sock);//将监听套接字设置为非阻塞
    214. }
    215. Connection* conn = new Connection(sock, this);//构建Connection对象
    216. //2、给对应的sock设置对应的回调方法
    217. conn->Register(readFunc, writeFunc, errFunc);
    218. //3、将sock与它所关心的事件注册到epoll中
    219. bool r = _epoll.AddEvent(sock, events);
    220. assert(r);
    221. (void)r;
    222. //4、将k、v添加到_connection中
    223. _connections.insert(std::pair<int, Connection*>(sock, conn));
    224. LogMessage(DEBUG, "add new sock : %d in epoll and unordered_map", sock);
    225. }
    226. void Loop(int timeout)//事件派发中的循环函数
    227. {
    228. int n = _epoll.Wait(_revs, _num, timeout);//捞出就绪事件的_revs
    229. for(int i = 0; i < n; ++i)
    230. {
    231. //通过_revs获得已就绪的fd和就绪事件
    232. int sock = _revs[i].data.fd;
    233. uint32_t events = _revs[i].events;
    234. //将异常问题,全部转化为读写问题,因为在读写时,读写接口自带读写问题的异常处理方式
    235. if((events & EPOLLERR)) events |= (EPOLLIN | EPOLLOUT);
    236. if((events & EPOLLHUP)) events |= (EPOLLIN | EPOLLOUT);//对端关闭连接
    237. if((events & EPOLLIN) && IsConnectionExist(sock))//监听事件及其他读事件就绪,保险起见,先判断connect对象是否存在
    238. {
    239. if(_connections[sock]->_recver)//检查存在,防止空指针
    240. _connections[sock]->_recver(_connections[sock]);//从map中找到key值为sock的Connection对象
    241. }
    242. if((events & EPOLLOUT) && IsConnectionExist(sock))
    243. {
    244. if(_connections[sock]->_sender)//检查存在,防止空指针
    245. _connections[sock]->_sender(_connections[sock]);
    246. }
    247. }
    248. }
    249. bool IsConnectionExist(int sock)
    250. {
    251. auto iter = _connections.find(sock);
    252. return iter != _connections.end();
    253. }
    254. private:
    255. uint16_t _port;
    256. Sock _sock;//里面包含有listenS ocket
    257. Epoll _epoll;
    258. std::unordered_map<int, Connection*> _connections;//fd和Connection*
    259. struct epoll_event* _revs;//捞出就绪的事件及其fd的数组,epoll_wait会去捞
    260. int _num;//表示最多可以存储多少个就绪事件
    261. // hander_t _handler;//解协议
    262. func_t _service;
    263. };
    264. }

    2、Epoll.hpp

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include "Err.hpp"
    7. #include "Log.hpp"
    8. const int size = 128;//epoll_create使用,大于0即可
    9. class Epoll
    10. {
    11. public:
    12. Epoll()
    13. :_epfd(-1)
    14. {}
    15. ~Epoll()
    16. {
    17. if(_epfd >= 0)
    18. {
    19. close(_epfd);
    20. }
    21. }
    22. public:
    23. void Create();
    24. bool AddEvent(int sock, uint32_t events);
    25. int Wait(struct epoll_event revs[], int num, int timeout);
    26. void Close();
    27. bool Control(int sock, uint32_t event, int action);
    28. private:
    29. int _epfd;
    30. };
    31. void Epoll::Create()
    32. {
    33. _epfd = epoll_create(size);
    34. if(_epfd < 0)//创建epoll模型失败
    35. {
    36. LogMessage(FATAL, "epoll_create error, code: %d, errstring: %s",errno, strerror(errno));
    37. exit(EPOLL_CREATE_ERR);
    38. }
    39. }
    40. bool Epoll::AddEvent(int sock, uint32_t events)//用户到内核
    41. {
    42. struct epoll_event ev;
    43. ev.events = events;
    44. ev.data.fd = sock;
    45. int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);
    46. return n == 0;
    47. }
    48. int Epoll::Wait(struct epoll_event revs[], int num, int timeout)//revs是就绪的事件,num表示最多可以存储多少个就绪事件,均为输出型参数
    49. {
    50. int n = epoll_wait(_epfd, revs, num, timeout);
    51. return n;//返回就绪事件的个数
    52. }
    53. void Epoll::Close()
    54. {
    55. if(_epfd >= 0)
    56. {
    57. close(_epfd);
    58. }
    59. }
    60. bool Epoll::Control(int sock, uint32_t event, int action)
    61. {
    62. bool n = 0;
    63. if (action == EPOLL_CTL_MOD)
    64. {
    65. struct epoll_event ev;
    66. ev.events = event;
    67. ev.data.fd = sock;
    68. n = epoll_ctl(_epfd, action, sock, &ev);
    69. }
    70. else if (action == EPOLL_CTL_DEL)
    71. {
    72. n = epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
    73. }
    74. else { n = -1; }
    75. return n == 0;
    76. }

    3、Main.cc

    1. #include
    2. #include "TcpServer.hpp"
    3. using namespace tcp_server;
    4. static void Usage(std::string proc)
    5. {
    6. std::cerr << "Usage:\n\t" << proc << "port" << "\n\n";
    7. }
    8. //根据传入的req,输出resp
    9. bool Cal(const Request& req,Response& resp)
    10. {
    11. resp._exitCode = OK;
    12. resp._result = OK;
    13. switch(req._op)
    14. {
    15. case '+':
    16. resp._result=req._x+req._y;
    17. break;
    18. case '-':
    19. resp._result=req._x-req._y;
    20. break;
    21. case '*':
    22. resp._result=req._x*req._y;
    23. break;
    24. case '/':
    25. {
    26. if(0==req._y){resp._exitCode=DIV_ZERO_ERR;}
    27. else
    28. resp._result=req._x/req._y;
    29. }
    30. break;
    31. case '%':
    32. {
    33. if(0==req._y){resp._exitCode=MOD_ZERO_ERR;}
    34. else
    35. resp._result=req._x%req._y;
    36. }
    37. break;
    38. default:
    39. resp._exitCode=OP_ZERO_ERR;
    40. return false;
    41. }
    42. return true;
    43. }
    44. void calculate(Connection* conn)//读就绪后,会进行回调,进行计算的处理
    45. {
    46. std::string onePackage;
    47. while(ParseOncePackage(conn->_inBuffer, &onePackage))
    48. {
    49. std::string reqStr;//从一个报文中解析出来的正文部分
    50. if(!deLength(onePackage, &reqStr)) { return; }//提取报文中的有效载荷
    51. std::cout << "仅剩有效载荷的请求:\n" << reqStr << std::endl;
    52. //二、对有效载荷进行反序列化。(将正文的string对象解析x,y,op存储至req对象中)
    53. Request req;//运算数与运算符对象
    54. if(!req.deserialize(reqStr)) { return; }
    55. Response resp;
    56. Cal(req, resp);
    57. //四、对得到的Response计算结果对象,进行序列化,得到一个"字符串",发送给客户端
    58. std::string respStr;//输出型参数,获取序列化string类型的内容(resp_str是序列化后的字符串)
    59. resp.serialize(&respStr);//对计算结果对象resp进行序列化
    60. //五、先构建一个完整的报文,再将其添加到发送缓冲区中
    61. conn->_outBuffer = enLength(respStr);//对序列化数据添加自定义协议规则
    62. std::cout << "result" << conn->_outBuffer << std::endl;
    63. }
    64. //处理完了,直接发回去
    65. if (conn->_sender)
    66. {
    67. conn->_sender(conn);
    68. }
    69. //如果没有发送完毕,需要对对应的socket开启对写事件的关心,如果发完了,则关闭对写事件的关心
    70. // if (!conn->_outBuffer.empty())//如果没发完
    71. // {
    72. // conn->_pTS->EnableReadWrite(conn, true, true);
    73. // }
    74. // else//如果发完了
    75. // {
    76. // conn->_pTS->EnableReadWrite(conn, true, false);
    77. // }
    78. }
    79. int main(int argc, char* argv[])
    80. {
    81. if(argc != 2)
    82. {
    83. Usage(argv[0]);
    84. exit(USAGE_ERR);
    85. }
    86. uint16_t port = atoi(argv[1]);
    87. std::unique_ptr tsvr(new TcpServer(calculate, port));
    88. tsvr->InitServer();
    89. tsvr->Dispatch();
    90. return 0;
    91. }

    4、protocol.hpp

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. enum
    10. {
    11. OK=0,
    12. DIV_ZERO_ERR,
    13. MOD_ZERO_ERR,
    14. OP_ZERO_ERR,
    15. };
    16. #define SEP " "
    17. #define SEP_LEN strlen(SEP)//不能使用sizeof,用sizeof会统计到'\0'
    18. #define LINE_SEP "\r\n"
    19. #define LINE_SEP_LINE strlen(LINE_SEP)
    20. //加包头包尾:"_exitcode result" -> "content_len"\r\n"_exitcode result"\r\n
    21. //加包头包尾:"_x _op _y" 修改为 "content_len"\r\n"_x _op _y"\r\n
    22. std::string enLength(const std::string& text)//text:_x _op _y。添加协议规则,用于构建一个完整的报文(类似"打包")
    23. {
    24. std::string send_string=std::to_string(text.size());//计算有效载荷的长度"_x _op _y"
    25. send_string += LINE_SEP;
    26. send_string += text;
    27. send_string += LINE_SEP;
    28. return send_string;
    29. }
    30. //去掉包头包尾"content_len"\r\n"_exitcode result"\r\n -> "_exitcode result"
    31. bool deLength(const std::string& package,std::string* text)//获取报文中的有效载荷(类似"解包")
    32. {
    33. auto pos = package.find(LINE_SEP);
    34. if(pos == std::string::npos) { return false; }
    35. int textLen = std::stoi(package.substr(0, pos));//计算有效载荷的长度
    36. *text = package.substr(pos + LINE_SEP_LINE, textLen);
    37. return true;
    38. }
    39. class Request//请求类
    40. {
    41. public:
    42. Request(int x,int y,char op)
    43. :_x(x)
    44. ,_y(y)
    45. ,_op(op)
    46. {}
    47. Request()
    48. :_x(0)
    49. ,_y(0)
    50. ,_op(0)
    51. {}
    52. bool serialize(std::string* out)//序列化,将成员变量转字符串
    53. {
    54. #ifdef MYSELF
    55. //结构化->"_x _op _y"
    56. *out="";//清空string对象
    57. std::string x_tostring=std::to_string(_x);
    58. std::string y_tostring=std::to_string(_y);
    59. *out=x_tostring+SEP+_op+SEP+y_tostring;//_x _op _y
    60. #else
    61. //Json序列化
    62. Json::Value root;//Json::Value万能对象,可接收任何对象
    63. root["first"]=_x;//自动将_x转换为字符串
    64. root["second"]=_y;
    65. root["oper"]=_op;
    66. //序列化
    67. Json::FastWriter writer;//Json::StyledWriter write;等价
    68. *out=writer.write(root);//将root进行序列化,返回值为string对象,接收即可
    69. #endif
    70. return true;
    71. }
    72. bool deserialize(const std::string& in)//反序列化
    73. {
    74. #ifdef MYSELF
    75. //"_x _op _y"->结构化
    76. auto leftSpace=in.find(SEP);//左边的空格
    77. auto rightSpace=in.rfind(SEP);//右边的空格
    78. if(leftSpace==std::string::npos||rightSpace==std::string::npos){return false;}
    79. if(leftSpace==rightSpace){return false;}
    80. //子串提取
    81. std::string x_tostring=in.substr(0,leftSpace);
    82. if(rightSpace-(leftSpace+SEP_LEN)!=1){return false;}//表示操作符一定只占1位
    83. _op=in.substr(leftSpace+SEP_LEN,rightSpace-(leftSpace+SEP_LEN))[0];
    84. std::string y_tostring=in.substr(rightSpace+SEP_LEN);
    85. //对x,y进行转换
    86. _x=std::stoi(x_tostring);
    87. _y=std::stoi(y_tostring);
    88. #else
    89. //Json反序列化
    90. Json::Value root;//Json::Value万能对象,可接收任何对象
    91. Json::Reader reader;
    92. reader.parse(in,root);//第一个参数:解析哪个流;第二个参数:将解析的数据存放到对象中
    93. //反序列化
    94. _x=root["first"].asInt();//默认是字符串,转换为整型
    95. _y=root["second"].asInt();
    96. _op=root["oper"].asInt();//转换为整型,整型可以给char类型。
    97. #endif
    98. return true;
    99. }
    100. public:
    101. //_x _op _y
    102. int _x;//左操作数
    103. int _y;//右操作数
    104. char _op;//操作符
    105. };
    106. class Response//响应类
    107. {
    108. public:
    109. Response()
    110. :_exitCode(0)
    111. ,_result(0)
    112. {}
    113. Response(int exitCode,int result)
    114. :_exitCode(exitCode)
    115. ,_result(result)
    116. {}
    117. bool serialize(std::string* out)//序列化,将成员变量转string对象
    118. {
    119. #ifdef MYSELF
    120. *out="";//清空string对象
    121. std::string outString=std::to_string(_exitCode)+SEP+std::to_string(_result);
    122. *out=outString;
    123. #else
    124. //Json序列化(对象被序列化为了对应的Json字符串)
    125. Json::Value root;//Json::Value万能对象,可接收任何对象
    126. root["exitCode"]=_exitCode;//自动将_exitCode转换为字符串
    127. root["result"]=_result;
    128. //序列化
    129. Json::FastWriter writer;//Json::StyledWriter write;等价
    130. *out=writer.write(root);//将root进行序列化,返回值为string对象,接收即可
    131. #endif
    132. return true;
    133. }
    134. bool deserialize(const std::string& in)//反序列化
    135. {
    136. #ifdef MYSELF
    137. auto space=in.find(SEP);//找空格
    138. if(space==std::string::npos){return false;}
    139. std::string exitString=in.substr(0,space);
    140. std::string resString=in.substr(space+SEP_LEN);
    141. if(exitString.empty()||resString.empty()){return false;}//一个字符串为空就false
    142. _exitCode=std::stoi(exitString);
    143. _result=std::stoi(resString);
    144. #else
    145. //Json反序列化
    146. Json::Value root;//Json::Value万能对象,可接收任何对象
    147. Json::Reader reader;
    148. reader.parse(in,root);//第一个参数:解析哪个流;第二个参数:将解析的数据存放到对象中
    149. //反序列化
    150. _exitCode=root["exitCode"].asInt();//默认是字符串,转换为整型
    151. _result=root["result"].asInt();
    152. #endif
    153. return true;
    154. }
    155. public:
    156. int _exitCode;//0表示计算成功,非零代表除零等错误
    157. int _result;//运算结果
    158. };
    159. bool ParseOncePackage(std::string& inbuffer, std::string* text)//一次从缓冲区解析出一个报文
    160. {
    161. *text = "";
    162. //拆分成一个个报文
    163. auto pos = inbuffer.find(LINE_SEP);//找\r\n的起始位置
    164. if(pos == std::string::npos)//没找到说明暂时还没找到\r\n分隔符,跳过本次循环,等待下次读取
    165. {
    166. return false;
    167. }
    168. std::string textLenString = inbuffer.substr(0, pos);
    169. int textLen = std::stoi(textLenString);//拿出有效载荷的长度
    170. int totalLen = textLenString.size() + 2 * LINE_SEP_LINE + textLen;//单个报文总长度
    171. if(inbuffer.size() < totalLen)//说明缓冲区长度还不到一个报文大小,需要跳过本次循环继续读取
    172. {
    173. return false;
    174. }
    175. std::cout<<"截取报文前inbuffer中的内容:\n"<
    176. //走到这里,一定有一个完整的报文
    177. *text = inbuffer.substr(0, totalLen);//取出一个报文
    178. inbuffer.erase(0, totalLen);//删掉缓冲区中刚刚被提取走的报文数据
    179. return true;
    180. }

    5、calServer.hpp

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. #include
    12. #include
    13. #include "Log.hpp"
    14. #include "protoCal.hpp"
    15. namespace Server
    16. {
    17. enum
    18. {
    19. USAGE_ERR=1,
    20. SOCKET_ERR,
    21. BIND_ERR,
    22. LISTEN_ERR,
    23. };
    24. static const uint16_t gport=8080;//缺省的端口号
    25. static const int gbacklog=5;//最大连接数=5+1
    26. const static std::string defaultIp="0.0.0.0";//缺省的IP
    27. //const Request&:输入型 Response&:输出型
    28. typedef std::function<bool(const Request&,Response&)> func_t;
    29. void handlerEntery(int sock,func_t func)
    30. {
    31. std::string inbuffer;//接收报文的缓冲区
    32. while(1)
    33. {
    34. //一、如何保证服务器读到数据是完整的?
    35. std::string req_text;//输出型参数,得到一条报文
    36. std::string req_str;//输出型参数,得到报文中的有效载荷
    37. if(!recvPackage(sock,inbuffer,&req_text)){return;}//服务器读取单条报文
    38. std::cout<<"带报头的请求:\n"<
    39. if(!deLength(req_text,&req_str)){return;}//提取报文中的有效载荷
    40. std::cout<<"仅剩有效载荷的请求:\n"<
    41. //二、对有效载荷进行反序列化,将提取到的数据存放至req中
    42. Request req;//运算数与运算符对象
    43. if(!req.deserialize(req_str)) return;
    44. //三、计算业务处理,得到一个结构化的结果对象(Response对象)
    45. Response resp;//计算结果对象
    46. func(req,resp);//对req提供的运算数与运算符,通过func将计算结果存放至resp中
    47. //四、对得到的Response计算结果对象,进行序列化,得到一个"字符串",发送给客户端
    48. std::string resp_str;//输出型参数,获取序列化string类型的内容
    49. resp.serialize(&resp_str);//对计算结果对象resp进行序列化
    50. std::cout<<"计算完成的序列化string对象:"<
    51. //五、先构建一个完整的报文,再进行发送
    52. std::string send_string=enLength(resp_str);//对序列化数据添加自定义协议规则
    53. std::cout<<"添加报头的序列化string对象:"<
    54. send(sock,send_string.c_str(),send_string.size(),0);//服务器发送序列化内容给客户端(此处存在问题)
    55. }
    56. }
    57. class CalServer
    58. {
    59. public:
    60. CalServer(const uint16_t& port=gport,const std::string& ip=defaultIp )
    61. :_listenSocket(-1)
    62. ,_port(port)
    63. ,_ip(ip)
    64. {
    65. }
    66. void InitServer()//初始化服务器
    67. {
    68. //1、创建监听socket套接字
    69. _listenSocket=socket(AF_INET,SOCK_STREAM,0);
    70. if(_listenSocket<0)
    71. {
    72. LogMessage(FATAL,"create socket error");
    73. exit(SOCKET_ERR);
    74. }
    75. LogMessage(NORMAL,"create socket success");
    76. //2、绑定端口号+ip地址
    77. struct sockaddr_in local;
    78. memset(&local,0,sizeof(local));
    79. local.sin_addr.s_addr=inet_addr(_ip.c_str());
    80. local.sin_family=AF_INET;
    81. local.sin_port=htons(_port);
    82. if(bind(_listenSocket,(struct sockaddr*)&local,sizeof(local))<0)
    83. {
    84. LogMessage(FATAL,"bind socket error");
    85. exit(BIND_ERR);
    86. }
    87. LogMessage(NORMAL,"bind socket success");
    88. //3、设置监听状态
    89. if(-1==listen(_listenSocket,gbacklog))
    90. {
    91. LogMessage(FATAL,"listen socket error");
    92. exit(LISTEN_ERR);
    93. }
    94. LogMessage(NORMAL,"listen socket success");
    95. }
    96. void Start(func_t func)//启动服务器
    97. {
    98. LogMessage(NORMAL,"Thread init success");
    99. while(1)
    100. {
    101. //4、服务器获取客户端连接请求
    102. struct sockaddr_in peer;//输出型参数,拿到客户端的信息
    103. socklen_t len=sizeof(peer);
    104. int sock=accept(_listenSocket,(struct sockaddr*)&peer,&len);
    105. if(-1==sock)
    106. {
    107. LogMessage(ERROR,"accept error,next");
    108. continue;
    109. }
    110. LogMessage(NORMAL,"accept a new link success");
    111. //6、使用accept的返回值sock进行通信,均为文件操作
    112. pid_t id=fork();
    113. if(id==0)//子进程
    114. {
    115. close(_listenSocket);//关闭子进程的监听套接字,使监听套接字计数-1(防止下一步孙子进程拷贝)
    116. if(fork()>0) exit(0);//让子进程退出,孙子进程成为孤儿进程,交给1号进程托管回收其退出资源
    117. //ServerIO(sock);
    118. handlerEntery(sock,func);//从sock读取请求
    119. close(sock);//必须关闭使用完毕的sock,否则文件描述符泄漏(虽然下一句代码exit(0),孙子进程退出也会释放文件描述符,最好还是手动关一下)
    120. exit(0);
    121. }
    122. close(sock);//这是用于通信的套接字fd,父进程和孙子进程都有这个文件描述符,父进程关了,该文件描述符引用技术-1,直至孙子进程退出,该fd才会减为0,关闭
    123. //父进程
    124. pid_t ret=waitpid(id,nullptr,0);//这里不能用非阻塞等待,否则父进程先跑去执行其他代码,可能会被卡在accept出不来了(没有新的客户端来连接的话)
    125. if(ret>0)
    126. {
    127. LogMessage(NORMAL,"wait child success");
    128. }
    129. }
    130. }
    131. ~CalServer()
    132. {}
    133. private:
    134. int _listenSocket;//监听客户端的连接请求,不用于数据通信
    135. uint16_t _port;//服务器端口号
    136. std::string _ip;//服务器ip地址
    137. };
    138. }
  • 相关阅读:
    毕设选题推荐基于微信小程序智能停车预定管理系统
    算法题系列10·最长公共前缀
    Android切圆角的几种方式
    08 nginx 的一次请求处理调试
    备战蓝桥杯---图论应用1
    C++红黑树
    CentOS 30分钟部署免费在线客服系统
    C# 串口通信简单示例
    Stream手动分页
    input修改checkbox复选框默认选中样式
  • 原文地址:https://blog.csdn.net/gfdxx/article/details/133968711