• reactor的原理与实现


    目录

    1.  Reactor模型

    2.  Reactor 模型有三个重要的组件:

    3.  reactor所需要结构体分析

    4.  epoll对fd操作的封装

    5.  reactor的操作

    6.  事件的回调函数

     7.  完整代码和测试结果

    总结


    1.  Reactor模型

    普通函数调用的机制:

            程序调用某函数,函数执行,程序等待,函数将结果和控制权返回给程序,程序继续处理。

            

    什么是Reactor?

            Reactor释义“反应堆”,框架:是一种事件驱动的反应堆模式,高效的事件处理模型。和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。

            reactor反应堆:当有事件,事件类型可能不相同,需要提前注册好不同的事件处理函数。由epoll_wait获取同时到来的多个事件,并且根据数据的不同类型将事件分发给事件处理机制(事件处理器),需要提前注册的哪些接口函数。

     

    2.  Reactor 模型有三个重要的组件:

    1.多路复用器:由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用。

    2. 事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中。

     

    3. 事件处理器:负责处理特定事件的处理函数。

    具体流程如下:

    1. 注册读就绪事件和相应的事件处理器;

    2. 事件分离器等待事件;

    3. 事件到来,激活分离器,分离器调用事件对应的处理器;

    4. 事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制

    权。

    3.  reactor所需要结构体分析

    1. 管理每一个fd的结构体

    1. //管理每一个io fd的结构体
    2. struct ntyevent{
    3. int fd; //io fd
    4. int events;
    5. void *arg;
    6. int (*callback)(int fd, int events, void* arg); //执行回调函数
    7. int status; //判断是否已有事件
    8. char buffer[BUFFER_LENGTH]; //用户缓冲区
    9. int length; //用户缓冲区长度
    10. };

     2. 管理ntyevent fd的块

    1. //管理ntyevent fd的块
    2. struct eventblock{
    3. struct eventblock* next; //指向ntyevent fd集合
    4. struct ntyevent* events; //指向下一个ntyevent fd的块
    5. };

     3. Reactor结构体

     

    1. //reacotr结点
    2. struct ntyreactor{
    3. int epfd; //epoll fd
    4. int blkcnt; //ntyevent fd的块 计数
    5. struct eventblock* evblks; //指向ntyevent fd的块头结点
    6. };

     

    4.  epoll对fd操作的封装

    1. //io fd结构体设置
    2. void nty_event_set(struct ntyevent* ev, int fd, NCALLBACK callback, void* arg)
    3. {
    4. ev->fd = fd;
    5. ev->callback = callback;
    6. ev->events = 0;
    7. ev->arg = arg;
    8. return ;
    9. }
    10. //io fd add 事件
    11. int nty_event_add(int epfd, int events, struct ntyevent *ev)
    12. {
    13. struct epoll_event ep_ev = {0, {0}};
    14. ep_ev.data.ptr = ev; //io fd结构体
    15. ep_ev.events = ev->events = events; //需要检测的fd事件
    16. int op; //操作类型
    17. if(ev->status == 1){
    18. op = EPOLL_CTL_MOD; //修改
    19. }else{
    20. op = EPOLL_CTL_ADD; //添加
    21. ev->status = 1; //标志已经添加
    22. }
    23. if(epoll_ctl(epfd, op, ev->fd, &ep_ev) <0 ){ //绑定
    24. printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
    25. return -1;
    26. }
    27. return 0;
    28. }
    29. //io fd del 事件
    30. int nty_event_del(int epfd, struct ntyevent* ev)
    31. {
    32. struct epoll_event ep_ev = {0, {0}};
    33. if(ev->status != 1){ //没有添加过检测的fd事件
    34. return -1;
    35. }
    36. ep_ev.data.ptr = ev;
    37. ev->status = 0; //标志未添加
    38. epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
    39. return 0;
    40. }

     

    5.  reactor的操作

    1. //reactor扩展大小
    2. int ntyreactor_alloc(struct ntyreactor* reactor)
    3. {
    4. if(reactor == NULL) return -1;
    5. if(reactor->evblks == NULL) return -1;
    6. struct eventblock* blk = reactor->evblks; //块的头结点
    7. //找尾节点
    8. while(blk->next != NULL){ //找到尾节点
    9. blk = blk->next;
    10. }
    11. struct ntyevent* evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    12. if (evs == NULL) {
    13. printf("ntyreactor_alloc ntyevent failed\n");
    14. return -2;
    15. }
    16. memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    17. struct eventblock *block = malloc(sizeof(struct eventblock));
    18. if (block == NULL) {
    19. printf("ntyreactor_alloc eventblock failed\n");
    20. return -3;
    21. }
    22. //io fd集合连接成块
    23. block->events = evs;
    24. block->next = NULL;
    25. //指向新块
    26. blk->next = block;
    27. reactor->blkcnt ++;
    28. return 0;
    29. }
    30. //根据io fd来找fd结构体
    31. struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd) {
    32. if (reactor == NULL) return NULL;
    33. if (reactor->evblks == NULL) return NULL;
    34. int blkidx = sockfd / MAX_EPOLL_EVENTS; //在哪一个块
    35. while (blkidx >= reactor->blkcnt) { //大小不够扩容
    36. ntyreactor_alloc(reactor);
    37. }
    38. int i = 0;
    39. struct eventblock *blk = reactor->evblks; //头结点块
    40. while (i++ != blkidx && blk != NULL) { //找到所在的块
    41. blk = blk->next;
    42. }
    43. return &blk->events[sockfd % MAX_EPOLL_EVENTS]; //返回fd结构体
    44. }
    45. //reactor初始化
    46. int ntyreactor_init(struct ntyreactor* reactor)
    47. {
    48. if(reactor == NULL) return -1;
    49. memset(reactor, 0, sizeof(struct ntyreactor));
    50. reactor->epfd = epoll_create(1);
    51. if (reactor->epfd <= 0) {
    52. printf("create epfd in %s err %s\n", __func__, strerror(errno));
    53. return -2;
    54. }
    55. //创建第一个块
    56. struct ntyevent* evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    57. if (evs == NULL) {
    58. printf("create epfd in %s err %s\n", __func__, strerror(errno));
    59. close(reactor->epfd);
    60. return -3;
    61. }
    62. memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    63. struct eventblock *block = malloc(sizeof(struct eventblock));
    64. if (block == NULL) {
    65. free(evs);
    66. close(reactor->epfd);
    67. return -3;
    68. }
    69. block->events = evs;
    70. block->next = NULL;
    71. reactor->evblks = block;
    72. reactor->blkcnt = 1;
    73. return 0;
    74. }
    75. //销毁reactor
    76. int ntyreactor_destory(struct ntyreactor* reactor)
    77. {
    78. close(reactor->epfd);
    79. struct eventblock *blk = reactor->evblks;
    80. struct eventblock *blk_next;
    81. while (blk != NULL) {
    82. blk_next = blk->next;
    83. free(blk->events);
    84. free(blk);
    85. blk = blk_next;
    86. }
    87. return 0;
    88. }

    6.  事件的回调函数

    1. //recv回调
    2. int recv_cb(int fd, int events, void* arg)
    3. {
    4. struct ntyreactor* reactor = (struct ntyreactor*)arg;
    5. struct ntyevent* ev = ntyreactor_idx(reactor, fd);
    6. if(ev == NULL)return -1;
    7. int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0);
    8. nty_event_del(reactor->epfd, ev);
    9. if (len > 0) {
    10. ev->length = len;
    11. ev->buffer[len] = '\0';
    12. printf("recv [%d]:%s\n", fd, ev->buffer);
    13. //将fd 设置为发送事件
    14. nty_event_set(ev, fd, send_cb, reactor);
    15. nty_event_add(reactor->epfd, EPOLLOUT, ev);
    16. } else if (len == 0) { //客户端断开连接
    17. nty_event_del(reactor->epfd, ev);
    18. printf("recv_cb --> disconnect\n");
    19. close(ev->fd);
    20. } else { //返回错误
    21. if (errno == EAGAIN && errno == EWOULDBLOCK) { //
    22. } else if (errno == ECONNRESET){
    23. nty_event_del(reactor->epfd, ev);
    24. close(ev->fd);
    25. }
    26. printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    27. }
    28. return len;
    29. }
    30. //send回调
    31. int send_cb(int fd, int events, void* arg)
    32. {
    33. struct ntyreactor* reactor = (struct ntyreactor*)arg;
    34. struct ntyevent* ev = ntyreactor_idx(reactor, fd);
    35. if (ev == NULL) return -1;
    36. int len = send(fd, ev->buffer, ev->length, 0);
    37. if (len > 0) {
    38. printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);
    39. //发送后,将fd设置为接收事件
    40. nty_event_del(reactor->epfd, ev);
    41. nty_event_set(ev, fd, recv_cb, reactor);
    42. nty_event_add(reactor->epfd, EPOLLIN, ev);
    43. } else { //发送失败
    44. nty_event_del(reactor->epfd, ev);
    45. close(ev->fd);
    46. printf("send[fd=%d] error %s\n", fd, strerror(errno));
    47. }
    48. return len;
    49. }
    50. //客户端接入回调
    51. int accept_cb(int fd, int events, void* arg)
    52. {
    53. struct ntyreactor *reactor = (struct ntyreactor*)arg;
    54. if (reactor == NULL) return -1;
    55. struct sockaddr_in client_addr;
    56. socklen_t len = sizeof(client_addr);
    57. int clientfd;
    58. //客户端接入
    59. if ((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1) {
    60. if (errno != EAGAIN && errno != EINTR) {
    61. }
    62. printf("accept: %s\n", strerror(errno));
    63. return -1;
    64. }
    65. //设置非阻塞fd
    66. int flag = 0;
    67. if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
    68. printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
    69. return -1;
    70. }
    71. struct ntyevent *event = ntyreactor_idx(reactor, clientfd);
    72. if (event == NULL) return -1;
    73. //将该fd设置为recv
    74. nty_event_set(event, clientfd, recv_cb, reactor);
    75. nty_event_add(reactor->epfd, EPOLLIN, event);
    76. printf("new connect [%s:%d], pos[%d]\n",
    77. inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
    78. return 0;
    79. }

     7.  完整代码和测试结果

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #define BUFFER_LENGTH 1024
    11. #define MAX_EPOLL_EVENTS 1024 //epoll事件数量
    12. #define SERVER_PORT 8888
    13. #define PORT_COUNT 100
    14. typedef int NCALLBACK(int ,int ,void*);
    15. //管理每一个io fd的结构体
    16. struct ntyevent{
    17. int fd; //io fd
    18. int events;
    19. void *arg;
    20. int (*callback)(int fd, int events, void* arg); //执行回调函数
    21. int status; //判断是否已有事件
    22. char buffer[BUFFER_LENGTH]; //用户缓冲区
    23. int length; //用户缓冲区长度
    24. };
    25. //管理ntyevent fd的块
    26. struct eventblock{
    27. struct eventblock* next; //指向ntyevent fd集合
    28. struct ntyevent* events; //指向下一个ntyevent fd的块
    29. };
    30. //reacotr结点
    31. struct ntyreactor{
    32. int epfd; //epoll fd
    33. int blkcnt; //ntyevent fd的块 计数
    34. struct eventblock* evblks; //指向ntyevent fd的块头结点
    35. };
    36. int recv_cb(int fd, int events, void *arg);
    37. int send_cb(int fd, int events, void *arg);
    38. int accept_cb(int fd, int events, void* arg);
    39. struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd);
    40. //io fd结构体设置
    41. void nty_event_set(struct ntyevent* ev, int fd, NCALLBACK callback, void* arg)
    42. {
    43. ev->fd = fd;
    44. ev->callback = callback;
    45. ev->events = 0;
    46. ev->arg = arg;
    47. return ;
    48. }
    49. //io fd add
    50. int nty_event_add(int epfd, int events, struct ntyevent *ev)
    51. {
    52. struct epoll_event ep_ev = {0, {0}};
    53. ep_ev.data.ptr = ev; //io fd结构体
    54. ep_ev.events = ev->events = events; //需要检测的fd事件
    55. int op; //操作类型
    56. if(ev->status == 1){
    57. op = EPOLL_CTL_MOD; //修改
    58. }else{
    59. op = EPOLL_CTL_ADD; //添加
    60. ev->status = 1; //标志已经添加
    61. }
    62. if(epoll_ctl(epfd, op, ev->fd, &ep_ev) <0 ){ //绑定
    63. printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
    64. return -1;
    65. }
    66. return 0;
    67. }
    68. //io fd del
    69. int nty_event_del(int epfd, struct ntyevent* ev)
    70. {
    71. struct epoll_event ep_ev = {0, {0}};
    72. if(ev->status != 1){ //没有添加过检测的fd事件
    73. return -1;
    74. }
    75. ep_ev.data.ptr = ev;
    76. ev->status = 0; //标志未添加
    77. epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
    78. return 0;
    79. }
    80. //recv回调
    81. int recv_cb(int fd, int events, void* arg)
    82. {
    83. struct ntyreactor* reactor = (struct ntyreactor*)arg;
    84. struct ntyevent* ev = ntyreactor_idx(reactor, fd);
    85. if(ev == NULL)return -1;
    86. int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0);
    87. nty_event_del(reactor->epfd, ev);
    88. if (len > 0) {
    89. ev->length = len;
    90. ev->buffer[len] = '\0';
    91. printf("recv [%d]:%s\n", fd, ev->buffer);
    92. //将fd 设置为发送事件
    93. nty_event_set(ev, fd, send_cb, reactor);
    94. nty_event_add(reactor->epfd, EPOLLOUT, ev);
    95. } else if (len == 0) { //客户端断开连接
    96. nty_event_del(reactor->epfd, ev);
    97. printf("recv_cb --> disconnect\n");
    98. close(ev->fd);
    99. } else { //返回错误
    100. if (errno == EAGAIN && errno == EWOULDBLOCK) { //
    101. } else if (errno == ECONNRESET){
    102. nty_event_del(reactor->epfd, ev);
    103. close(ev->fd);
    104. }
    105. printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    106. }
    107. return len;
    108. }
    109. //send回调
    110. int send_cb(int fd, int events, void* arg)
    111. {
    112. struct ntyreactor* reactor = (struct ntyreactor*)arg;
    113. struct ntyevent* ev = ntyreactor_idx(reactor, fd);
    114. if (ev == NULL) return -1;
    115. int len = send(fd, ev->buffer, ev->length, 0);
    116. if (len > 0) {
    117. printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);
    118. //发送后,将fd设置为接收事件
    119. nty_event_del(reactor->epfd, ev);
    120. nty_event_set(ev, fd, recv_cb, reactor);
    121. nty_event_add(reactor->epfd, EPOLLIN, ev);
    122. } else { //发送失败
    123. nty_event_del(reactor->epfd, ev);
    124. close(ev->fd);
    125. printf("send[fd=%d] error %s\n", fd, strerror(errno));
    126. }
    127. return len;
    128. }
    129. //客户端接入回调
    130. int accept_cb(int fd, int events, void* arg)
    131. {
    132. struct ntyreactor *reactor = (struct ntyreactor*)arg;
    133. if (reactor == NULL) return -1;
    134. struct sockaddr_in client_addr;
    135. socklen_t len = sizeof(client_addr);
    136. int clientfd;
    137. //客户端接入
    138. if ((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1) {
    139. if (errno != EAGAIN && errno != EINTR) {
    140. }
    141. printf("accept: %s\n", strerror(errno));
    142. return -1;
    143. }
    144. //设置非阻塞fd
    145. int flag = 0;
    146. if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
    147. printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
    148. return -1;
    149. }
    150. struct ntyevent *event = ntyreactor_idx(reactor, clientfd);
    151. if (event == NULL) return -1;
    152. //将该fd设置为recv
    153. nty_event_set(event, clientfd, recv_cb, reactor);
    154. nty_event_add(reactor->epfd, EPOLLIN, event);
    155. printf("new connect [%s:%d], pos[%d]\n",
    156. inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
    157. return 0;
    158. }
    159. //创建socket监听
    160. int init_sock(short port)
    161. {
    162. int fd = socket(AF_INET, SOCK_STREAM, 0);
    163. fcntl(fd, F_SETFL, O_NONBLOCK);
    164. struct sockaddr_in server_addr;
    165. memset(&server_addr, 0, sizeof(server_addr));
    166. server_addr.sin_family = AF_INET;
    167. server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    168. server_addr.sin_port = htons(port);
    169. bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
    170. if (listen(fd, 20) < 0) {
    171. printf("listen failed : %s\n", strerror(errno));
    172. return -1;
    173. }
    174. printf("listen server port : %d\n", port);
    175. return fd;
    176. }
    177. //reactor扩展大小
    178. int ntyreactor_alloc(struct ntyreactor* reactor)
    179. {
    180. if(reactor == NULL) return -1;
    181. if(reactor->evblks == NULL) return -1;
    182. struct eventblock* blk = reactor->evblks; //块的头结点
    183. //找尾节点
    184. while(blk->next != NULL){ //找到尾节点
    185. blk = blk->next;
    186. }
    187. struct ntyevent* evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    188. if (evs == NULL) {
    189. printf("ntyreactor_alloc ntyevent failed\n");
    190. return -2;
    191. }
    192. memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    193. struct eventblock *block = malloc(sizeof(struct eventblock));
    194. if (block == NULL) {
    195. printf("ntyreactor_alloc eventblock failed\n");
    196. return -3;
    197. }
    198. //io fd集合连接成块
    199. block->events = evs;
    200. block->next = NULL;
    201. //指向新块
    202. blk->next = block;
    203. reactor->blkcnt ++;
    204. return 0;
    205. }
    206. //根据io fd来找fd结构体
    207. struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd) {
    208. if (reactor == NULL) return NULL;
    209. if (reactor->evblks == NULL) return NULL;
    210. int blkidx = sockfd / MAX_EPOLL_EVENTS; //在哪一个块
    211. while (blkidx >= reactor->blkcnt) { //大小不够扩容
    212. ntyreactor_alloc(reactor);
    213. }
    214. int i = 0;
    215. struct eventblock *blk = reactor->evblks; //头结点块
    216. while (i++ != blkidx && blk != NULL) { //找到所在的块
    217. blk = blk->next;
    218. }
    219. return &blk->events[sockfd % MAX_EPOLL_EVENTS]; //返回fd结构体
    220. }
    221. //reactor初始化
    222. int ntyreactor_init(struct ntyreactor* reactor)
    223. {
    224. if(reactor == NULL) return -1;
    225. memset(reactor, 0, sizeof(struct ntyreactor));
    226. reactor->epfd = epoll_create(1);
    227. if (reactor->epfd <= 0) {
    228. printf("create epfd in %s err %s\n", __func__, strerror(errno));
    229. return -2;
    230. }
    231. //创建第一个块
    232. struct ntyevent* evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    233. if (evs == NULL) {
    234. printf("create epfd in %s err %s\n", __func__, strerror(errno));
    235. close(reactor->epfd);
    236. return -3;
    237. }
    238. memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    239. struct eventblock *block = malloc(sizeof(struct eventblock));
    240. if (block == NULL) {
    241. free(evs);
    242. close(reactor->epfd);
    243. return -3;
    244. }
    245. block->events = evs;
    246. block->next = NULL;
    247. reactor->evblks = block;
    248. reactor->blkcnt = 1;
    249. return 0;
    250. }
    251. //销毁reactor
    252. int ntyreactor_destory(struct ntyreactor* reactor)
    253. {
    254. close(reactor->epfd);
    255. struct eventblock *blk = reactor->evblks;
    256. struct eventblock *blk_next;
    257. while (blk != NULL) {
    258. blk_next = blk->next;
    259. free(blk->events);
    260. free(blk);
    261. blk = blk_next;
    262. }
    263. return 0;
    264. }
    265. //初始化接收连接socket
    266. int ntyreactor_addlistener(struct ntyreactor* reactor, int sockfd, NCALLBACK *acceptor){
    267. if (reactor == NULL) return -1;
    268. if (reactor->evblks == NULL) return -1;
    269. struct ntyevent* event = ntyreactor_idx(reactor, sockfd);
    270. if (event == NULL) return -1;
    271. nty_event_set(event, sockfd, acceptor, reactor);
    272. nty_event_add(reactor->epfd, EPOLLIN, event);
    273. return 0;
    274. }
    275. //reactor事件循环
    276. int ntyreactor_run(struct ntyreactor* reactor)
    277. {
    278. if (reactor == NULL) return -1;
    279. if (reactor->epfd < 0) return -1;
    280. if (reactor->evblks == NULL) return -1;
    281. struct epoll_event events[MAX_EPOLL_EVENTS+1];
    282. int checkpos = 0, i;
    283. while(1){
    284. int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
    285. if (nready < 0) {
    286. printf("epoll_wait error, exit\n");
    287. continue;
    288. }
    289. for(i = 0;i < nready; i++){
    290. struct ntyevent* ev = (struct ntyevent*)events[i].data.ptr; //发生事件的io fd结构体
    291. if((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)){
    292. ev->callback(ev->fd, events[i].events, ev->arg);
    293. }
    294. if((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)){
    295. ev->callback(ev->fd, events[i].events, ev->arg);
    296. }
    297. }
    298. }
    299. }
    300. int main(int argc, char *argv[]) {
    301. struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));
    302. ntyreactor_init(reactor);
    303. //起始的端口号
    304. unsigned short port = SERVER_PORT;
    305. if (argc == 2) {
    306. port = atoi(argv[1]);
    307. }
    308. int i = 0;
    309. int sockfds[PORT_COUNT] = {0};
    310. for (i = 0;i < PORT_COUNT;i ++) {
    311. sockfds[i] = init_sock(port+i);
    312. ntyreactor_addlistener(reactor, sockfds[i], accept_cb);
    313. }
    314. ntyreactor_run(reactor);
    315. ntyreactor_destory(reactor);
    316. for (i = 0;i < PORT_COUNT;i ++) {
    317. close(sockfds[i]);
    318. }
    319. free(reactor);
    320. return 0;
    321. }

     运行结果:

     

    总结

    本章实现了一个网络经典模型,设计模式reactor 事件循环,事件驱动的反应堆模式.

    组件:

    事件处理器:回调函数callback    

    事件分发器:(将事件分发给对应的事件处理器),

    多路复用器:(select poll epoll 等操作系统提供的多路复用技术)

    流程:

    1. 注册读就绪事件和相应的事件处理器;
    2. 事件分离器等待事件;
    3. 事件到来,激活分离器,分离器调用事件对应的处理器;
    4. 事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制
    权。 

  • 相关阅读:
    MAYA教程之建模基础命令介绍
    Bootstrap中的栅格系统
    TiDB 6.0:让 TSO 更高效丨TiDB Book Rush
    JAVA计算机毕业设计毕业生交流学习平台Mybatis+源码+数据库+lw文档+系统+调试部署
    把 Maven 提交到项目?Maven Wrapper的使用与好处
    [项目管理-30]:项目成员成熟度以及采取的不同的策略
    linux查看系统/内核版本号、CPU核数/线程/型号、内存大小等
    成绩统计-蓝桥杯
    ES6中新增加的Map和Set数据结构的使用场景
    微软智能云在华发布多项混合云服务及功能更新
  • 原文地址:https://blog.csdn.net/kakaka666/article/details/126216142