• 基于reactor设计websocket服务器


    目录

    1.什么是WebSocket?

    2.总体过程:

    2.1 握手过程:

    2.2 接收和发送

    3. 完整代码

    4.运行结果

    总结:


    1.什么是WebSocket?

            WebSocket是HTML5下一种新的协议(websocket协议本质上是一个基于tcp的协议)

            websocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

    2.总体过程:

    2.1 握手过程:

            首先,客户端发起http请求,经过3次握手后,建立起TCP连接;http请求里存放WebSocket支持的版本号等信息,如:Upgrade、Connection、WebSocket-Version等;

            然后,服务器收到客户端的握手请求后,同样采用HTTP协议回馈数据;最后,客户端收到连接成功的消息后,开始借助于TCP传输信道进行全双工通信。

            第一个一条信息来自|Sec-WebSocket-Key|报头字段在客户端握手中:Sec-WebSocket-Key: fmXyv9eR4PG9L53s09jQLA==对于这个报头字段,服务器必须接受该值(作为当前值)在报头字段中,例如,base64编码的[RFC4648]版本减任何前导和尾随空格),并将其与全局唯一标识符(GUID, [RFC4122])“258 eafa5-e914-47da -95CA-C5AB0DC85B11”的字符串形式,不太可能被使用网络端点不理解WebSocket协议。一个SHA-1哈希(160位)[FIPS;180-3],base64编码,然后返回到服务器的握手。

    1. ###websocket 浏览器-->服务器
    2. GET / HTTP/1.1
    3. Host: 192.168.240.128:8888
    4. Connection: Upgrade ##升级版
    5. Pragma: no-cache
    6. Cache-Control: no-cache
    7. User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36 Core/1.94.172.400 QQBrowser/11.1.5140.400
    8. Upgrade: websocket #websockrt
    9. Origin: null
    10. Sec-WebSocket-Version: 13
    11. Accept-Encoding: gzip, deflate
    12. Accept-Language: zh-CN,zh;q=0.9
    13. Sec-WebSocket-Key: fmXyv9eR4PG9L53s09jQLA==
    14. Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
    15. ###握手信息拼接Key+GUID
    16. Key = fmXyv9eR4PG9L53s09jQLA==
    17. GUID = 258EAFA5-E914-47DA-95CA-C5AB0DC85B11
    18. str = Key+GUID = fmXyv9eR4PG9L53s09jQLA==258EAFA5-E914-47DA-95CA-C5AB0DC85B11
    19. ###哈希SHA-1
    20. sha = SHA-1(str)
    21. ###base64转换
    22. vaule = base64-encoded(sha);
    23. ###服务器-->浏览器
    24. HTTP/1.1 101 Switching Protocols
    25. Upgrade: websocket
    26. Connection: Upgrade
    27. Sec-WebSocket-Accept: vaule

    具体代码实现

    1. //响应包拼接
    2. int ws_handshark(struct ntyevent *ev)
    3. {
    4. int idx = 0;
    5. char sec_data[128] = {0};
    6. char sec_accept[128] = {0};
    7. do{
    8. char linebuff[1024] = {0};
    9. idx = readline(ev->buffer, idx, linebuff);
    10. if(strstr(linebuff, "Sec-WebSocket-Key") > 0){
    11. //Sec-WebSocket-Key: hGI6OP19pWseAnpMcEnb2g==
    12. //Key+GUID
    13. //Sec-WebSocket-Key: hGI6OP19pWseAnpMcEnb2g==258EAFA5-E914-47DA-95CA-C5AB0DC85B11
    14. strcat(linebuff, GUID);
    15. //哈希SHA-1
    16. SHA1(linebuff + strlen("Sec-WebSocket-Key: "), strlen(linebuff + strlen("Sec-WebSocket-Key: ")), sec_data);
    17. //base64 编码
    18. base64_encode(sec_data, strlen(sec_data), sec_accept);
    19. //printf("idx: %d, line: %ld\n",idx, sizeof("Sec-WebSocket-Key: "));
    20. //printf("idx: %d, line: %ld\n",idx, strlen("Sec-WebSocket-Key: "));
    21. printf("idx %d ; line:%s\n", idx, sec_accept);
    22. memcpy(ev->sec_accept, sec_accept, ACCEPT_KEY_LENGTH);
    23. }
    24. //printf("line %d ; line:%s\n", idx, linebuff);
    25. }while((ev->buffer[idx] != '\r' || ev->buffer[idx + 1] != '\n') && idx != -1); //两组\r\n结束
    26. return 0;
    27. }
    28. //应答 #服务器-->浏览器
    29. int ws_response(struct ntyevent* ev)
    30. {
    31. ev->wlength = sprintf(ev->wbuffer, "HTTP/1.1 101 Switching Protocols\r\n"
    32. "Upgrade: websocket\r\n"
    33. "Connection: Upgrade\r\n"
    34. "Sec-WebSocket-Accept: %s\r\n\r\n", ev->sec_accept);
    35. printf("response: %s\n", ev->wbuffer);
    36. return ev->wlength;
    37. }

    2.2 接收和发送

    websocket协议的数据帧

     FIN :1bit

    指示这是消息中的最后一个片段。第一个碎片也可能是最后的碎片。

    RSV1、RSV2、RSV3:各1bit

    必须为0,除非协商一个定义含义的扩展为非零值。如果接收到一个非零值,并且没有

    协商好的扩展定义了这种非零的含义值,接收端点必须失败WebSocketConnection.

    Opcode: 4 bits

    定义“有效负载数据”的解释。如果一个未知的操作码,接收端点必须失败WebSocket Connection_。定义了以下值。

    * %x0  表示延续帧

    * %x1  表示文本框架

    * %x2  表示二进制帧

    * %x3-7 为其他非控制帧保留

    * %x8 表示连接关闭

    * %x9 表示ping

    * %xA 表示pong

    * %xB-F  为进一步的控制帧保留

    Mask: 1 bit

    定义是否屏蔽“有效负载数据”。如果设置为1,则a屏蔽键包含在屏蔽键中,用于解除屏蔽。

    解除屏蔽

    j = i MOD 4

    transformed-octet-i = original-octet-i XOR masking-key-octet-j

    Payload length: 7 bits, 7+16 bits, or 7+64 bits

    “有效载荷数据”的长度,以字节为单位:如果0-125,则为有效载荷长度。如果126,下面的2个字节被解释为a16位无符号整数是有效载荷长度。如果127,后面的8个字节被解释为64位无符号整数最有效位必须为0)为有效载荷长度。多字节长度量用网络字节顺序表示。请注意,在所有情况下,必须使用最小字节数进行编码长度

     将数据帧映射到结构体,小端字节序

    1. struct ws_ophdr { //小端
    2. unsigned char opcode:4,
    3. rsv3:1,
    4. rsv2:1,
    5. rsv1:1,
    6. fin:1;
    7. unsigned char pl_len:7,
    8. mask:1;
    9. };

     具体代码

    1. //Masking-key, if MASK set to 1 解除数据屏蔽
    2. void umask(char *payload, int length, char *mask_key) {
    3. int i = 0;
    4. for (i = 0;i < length;i ++) {
    5. payload[i] ^= mask_key[i%4];
    6. }
    7. }
    8. int ws_tranmission(struct ntyevent *ev)
    9. {
    10. struct ws_ophdr *hdr = (struct ws_ophdr *)ev->buffer;
    11. if (hdr->pl_len < 126) {
    12. unsigned char *payload = NULL;
    13. if (hdr->mask) {
    14. payload = ev->buffer + 6;
    15. umask(payload, hdr->pl_len, ev->buffer + 2);
    16. } else {
    17. payload = ev->buffer + 2;
    18. }
    19. printf("payload: %s\n", payload);
    20. } else if (hdr->pl_len == 126) {
    21. } else if (hdr->pl_len == 127) {
    22. } else {
    23. //assert(0);
    24. }
    25. return 0;
    26. }

    3. 完整代码

    1. //gcc reactor_server_websocket.c -o server -lssl -lcrypto
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. #include
    12. #include
    13. #include
    14. #include
    15. #define GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
    16. //状态机
    17. enum {
    18. WS_HANDSHARK = 0, //握手状态
    19. WS_TRANMISSION = 1, //数据传输
    20. WS_END = 2, //终止传输
    21. WS_COUNT
    22. };
    23. struct ws_ophdr { //小端
    24. unsigned char opcode:4,
    25. rsv3:1,
    26. rsv2:1,
    27. rsv1:1,
    28. fin:1;
    29. unsigned char pl_len:7,
    30. mask:1;
    31. };
    32. #define BUFFER_LENGTH 1024
    33. #define ACCEPT_KEY_LENGTH 64
    34. #define MAX_EPOLL_EVENTS 1024 //epoll事件数量
    35. #define SERVER_PORT 8888
    36. #define PORT_COUNT 1
    37. typedef int NCALLBACK(int ,int ,void*);
    38. //管理每一个io fd的结构体
    39. struct ntyevent{
    40. int fd; //io fd
    41. int events;
    42. void *arg;
    43. int (*callback)(int fd, int events, void* arg); //执行回调函数
    44. int status; //判断是否已有事件
    45. char buffer[BUFFER_LENGTH]; //用户缓冲区 //request
    46. int length; //用户缓冲区长度
    47. char wbuffer[BUFFER_LENGTH]; //response
    48. int wlength;
    49. char sec_accept[ACCEPT_KEY_LENGTH];
    50. int wsstatus; //0, 1, 2, 3 描述状态机
    51. };
    52. //管理ntyevent fd的块
    53. struct eventblock{
    54. struct eventblock* next; //指向ntyevent fd集合
    55. struct ntyevent* events; //指向下一个ntyevent fd的块
    56. };
    57. //reacotr结点
    58. struct ntyreactor{
    59. int epfd; //epoll fd
    60. int blkcnt; //ntyevent fd的块 计数
    61. struct eventblock* evblks; //指向ntyevent fd的块头结点
    62. };
    63. int recv_cb(int fd, int events, void *arg);
    64. int send_cb(int fd, int events, void *arg);
    65. int accept_cb(int fd, int events, void* arg);
    66. struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd);
    67. //io fd结构体设置
    68. void nty_event_set(struct ntyevent* ev, int fd, NCALLBACK callback, void* arg)
    69. {
    70. ev->fd = fd;
    71. ev->callback = callback;
    72. ev->events = 0;
    73. ev->arg = arg;
    74. return ;
    75. }
    76. //io fd add
    77. int nty_event_add(int epfd, int events, struct ntyevent *ev)
    78. {
    79. struct epoll_event ep_ev = {0, {0}};
    80. ep_ev.data.ptr = ev; //io fd结构体
    81. ep_ev.events = ev->events = events; //需要检测的fd事件
    82. int op; //操作类型
    83. if(ev->status == 1){
    84. op = EPOLL_CTL_MOD; //修改
    85. }else{
    86. op = EPOLL_CTL_ADD; //添加
    87. ev->status = 1; //标志已经添加
    88. }
    89. if(epoll_ctl(epfd, op, ev->fd, &ep_ev) <0 ){ //绑定
    90. printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
    91. return -1;
    92. }
    93. return 0;
    94. }
    95. //io fd del
    96. int nty_event_del(int epfd, struct ntyevent* ev)
    97. {
    98. struct epoll_event ep_ev = {0, {0}};
    99. if(ev->status != 1){ //没有添加过检测的fd事件
    100. return -1;
    101. }
    102. ep_ev.data.ptr = ev;
    103. ev->status = 0; //标志未添加
    104. epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
    105. return 0;
    106. }
    107. //读行
    108. int readline(char* allbuf, int idx, char* linebuf)
    109. {
    110. int len = strlen(allbuf);
    111. for(; idx < len; idx++){
    112. if(allbuf[idx] == '\r' && allbuf[idx+1] == '\n')
    113. return idx+2;
    114. else
    115. *(linebuf++) = allbuf[idx];
    116. }
    117. return -1;
    118. }
    119. int base64_encode(char *in_str, int in_len, char *out_str) {
    120. BIO *b64, *bio;
    121. BUF_MEM *bptr = NULL;
    122. size_t size = 0;
    123. if (in_str == NULL || out_str == NULL)
    124. return -1;
    125. b64 = BIO_new(BIO_f_base64());
    126. bio = BIO_new(BIO_s_mem());
    127. bio = BIO_push(b64, bio);
    128. BIO_write(bio, in_str, in_len);
    129. BIO_flush(bio);
    130. BIO_get_mem_ptr(bio, &bptr);
    131. memcpy(out_str, bptr->data, bptr->length);
    132. out_str[bptr->length-1] = '\0';
    133. size = bptr->length;
    134. BIO_free_all(bio);
    135. return size;
    136. }
    137. //握手流程
    138. int ws_handshark(struct ntyevent *ev)
    139. {
    140. int idx = 0;
    141. char sec_data[128] = {0};
    142. char sec_accept[128] = {0};
    143. do{
    144. char linebuff[1024] = {0};
    145. idx = readline(ev->buffer, idx, linebuff);
    146. if(strstr(linebuff, "Sec-WebSocket-Key") > 0){
    147. //Sec-WebSocket-Key: hGI6OP19pWseAnpMcEnb2g==
    148. //Key+GUID
    149. //Sec-WebSocket-Key: hGI6OP19pWseAnpMcEnb2g==258EAFA5-E914-47DA-95CA-C5AB0DC85B11
    150. strcat(linebuff, GUID);
    151. //哈希SHA-1
    152. SHA1(linebuff + strlen("Sec-WebSocket-Key: "), strlen(linebuff + strlen("Sec-WebSocket-Key: ")), sec_data);
    153. //base64 编码
    154. base64_encode(sec_data, strlen(sec_data), sec_accept);
    155. //printf("idx: %d, line: %ld\n",idx, sizeof("Sec-WebSocket-Key: "));
    156. //printf("idx: %d, line: %ld\n",idx, strlen("Sec-WebSocket-Key: "));
    157. printf("idx %d ; line:%s\n", idx, sec_accept);
    158. memcpy(ev->sec_accept, sec_accept, ACCEPT_KEY_LENGTH);
    159. }
    160. //printf("line %d ; line:%s\n", idx, linebuff);
    161. }while((ev->buffer[idx] != '\r' || ev->buffer[idx + 1] != '\n') && idx != -1); //两组\r\n结束
    162. return 0;
    163. }
    164. //Masking-key, if MASK set to 1 解除数据屏蔽
    165. void umask(char *payload, int length, char *mask_key) {
    166. int i = 0;
    167. for (i = 0;i < length;i ++) {
    168. payload[i] ^= mask_key[i%4];
    169. }
    170. }
    171. int ws_tranmission(struct ntyevent *ev)
    172. {
    173. struct ws_ophdr *hdr = (struct ws_ophdr *)ev->buffer;
    174. if (hdr->pl_len < 126) {
    175. unsigned char *payload = NULL;
    176. if (hdr->mask) {
    177. payload = ev->buffer + 6;
    178. umask(payload, hdr->pl_len, ev->buffer + 2);
    179. } else {
    180. payload = ev->buffer + 2;
    181. }
    182. printf("payload: %s\n", payload);
    183. } else if (hdr->pl_len == 126) {
    184. } else if (hdr->pl_len == 127) {
    185. } else {
    186. //assert(0);
    187. }
    188. return 0;
    189. }
    190. //请求 协议解析
    191. int ws_request(struct ntyevent * ev)
    192. {
    193. if (ev->wsstatus == WS_HANDSHARK) {
    194. ws_handshark(ev);
    195. ev->wsstatus = WS_TRANMISSION; //状态改变
    196. } else if (ev->wsstatus == WS_TRANMISSION) {
    197. ws_tranmission(ev);
    198. }
    199. }
    200. //应答 #服务器-->浏览器
    201. int ws_response(struct ntyevent* ev)
    202. {
    203. ev->wlength = sprintf(ev->wbuffer, "HTTP/1.1 101 Switching Protocols\r\n"
    204. "Upgrade: websocket\r\n"
    205. "Connection: Upgrade\r\n"
    206. "Sec-WebSocket-Accept: %s\r\n\r\n", ev->sec_accept);
    207. printf("response: %s\n", ev->wbuffer);
    208. return ev->wlength;
    209. }
    210. //recv回调
    211. int recv_cb(int fd, int events, void* arg)
    212. {
    213. struct ntyreactor* reactor = (struct ntyreactor*)arg;
    214. struct ntyevent* ev = ntyreactor_idx(reactor, fd);
    215. if(ev == NULL)return -1;
    216. int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0);
    217. nty_event_del(reactor->epfd, ev);
    218. if (len > 0) {
    219. ev->length = len;
    220. ev->buffer[len] = '\0';
    221. //printf("recv [%d]:%s\n", fd, ev->buffer);
    222. ws_request(ev);
    223. //将fd 设置为发送事件
    224. nty_event_set(ev, fd, send_cb, reactor);
    225. nty_event_add(reactor->epfd, EPOLLOUT, ev);
    226. } else if (len == 0) { //客户端断开连接
    227. nty_event_del(reactor->epfd, ev);
    228. printf("recv_cb --> disconnect\n");
    229. close(ev->fd);
    230. } else { //返回错误
    231. if (errno == EAGAIN && errno == EWOULDBLOCK) { //
    232. } else if (errno == ECONNRESET){
    233. nty_event_del(reactor->epfd, ev);
    234. close(ev->fd);
    235. }
    236. printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    237. }
    238. return len;
    239. }
    240. //send回调
    241. int send_cb(int fd, int events, void* arg)
    242. {
    243. struct ntyreactor* reactor = (struct ntyreactor*)arg;
    244. struct ntyevent* ev = ntyreactor_idx(reactor, fd);
    245. if (ev == NULL) return -1;
    246. ws_response(ev);
    247. int len = send(fd, ev->wbuffer, ev->length, 0);
    248. if (len > 0) {
    249. printf("send[fd=%d], [%d]%s\n", fd, len, ev->wbuffer);
    250. //发送后,将fd设置为接收事件
    251. nty_event_del(reactor->epfd, ev);
    252. nty_event_set(ev, fd, recv_cb, reactor);
    253. nty_event_add(reactor->epfd, EPOLLIN, ev);
    254. } else { //发送失败
    255. nty_event_del(reactor->epfd, ev);
    256. close(ev->fd);
    257. printf("send[fd=%d] error %s\n", fd, strerror(errno));
    258. }
    259. return len;
    260. }
    261. //客户端接入回调
    262. int accept_cb(int fd, int events, void* arg)
    263. {
    264. struct ntyreactor *reactor = (struct ntyreactor*)arg;
    265. if (reactor == NULL) return -1;
    266. struct sockaddr_in client_addr;
    267. socklen_t len = sizeof(client_addr);
    268. int clientfd;
    269. //客户端接入
    270. if ((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1) {
    271. if (errno != EAGAIN && errno != EINTR) {
    272. }
    273. printf("accept: %s\n", strerror(errno));
    274. return -1;
    275. }
    276. //设置非阻塞fd
    277. int flag = 0;
    278. if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
    279. printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
    280. return -1;
    281. }
    282. struct ntyevent *event = ntyreactor_idx(reactor, clientfd);
    283. if (event == NULL) return -1;
    284. //将该fd设置为recv
    285. nty_event_set(event, clientfd, recv_cb, reactor);
    286. nty_event_add(reactor->epfd, EPOLLIN, event);
    287. printf("new connect [%s:%d], pos[%d]\n",
    288. inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
    289. return 0;
    290. }
    291. //创建socket监听
    292. int init_sock(short port)
    293. {
    294. int fd = socket(AF_INET, SOCK_STREAM, 0);
    295. fcntl(fd, F_SETFL, O_NONBLOCK);
    296. struct sockaddr_in server_addr;
    297. memset(&server_addr, 0, sizeof(server_addr));
    298. server_addr.sin_family = AF_INET;
    299. server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    300. server_addr.sin_port = htons(port);
    301. bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
    302. if (listen(fd, 20) < 0) {
    303. printf("listen failed : %s\n", strerror(errno));
    304. return -1;
    305. }
    306. printf("listen server port : %d\n", port);
    307. return fd;
    308. }
    309. //reactor扩展大小
    310. int ntyreactor_alloc(struct ntyreactor* reactor)
    311. {
    312. if(reactor == NULL) return -1;
    313. if(reactor->evblks == NULL) return -1;
    314. struct eventblock* blk = reactor->evblks; //块的头结点
    315. //找尾节点
    316. while(blk->next != NULL){ //找到尾节点
    317. blk = blk->next;
    318. }
    319. struct ntyevent* evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    320. if (evs == NULL) {
    321. printf("ntyreactor_alloc ntyevent failed\n");
    322. return -2;
    323. }
    324. memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    325. struct eventblock *block = malloc(sizeof(struct eventblock));
    326. if (block == NULL) {
    327. printf("ntyreactor_alloc eventblock failed\n");
    328. return -3;
    329. }
    330. //io fd集合连接成块
    331. block->events = evs;
    332. block->next = NULL;
    333. //指向新块
    334. blk->next = block;
    335. reactor->blkcnt ++;
    336. return 0;
    337. }
    338. //根据io fd来找fd结构体
    339. struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd) {
    340. if (reactor == NULL) return NULL;
    341. if (reactor->evblks == NULL) return NULL;
    342. int blkidx = sockfd / MAX_EPOLL_EVENTS; //在哪一个块
    343. while (blkidx >= reactor->blkcnt) { //大小不够扩容
    344. ntyreactor_alloc(reactor);
    345. }
    346. int i = 0;
    347. struct eventblock *blk = reactor->evblks; //头结点块
    348. while (i++ != blkidx && blk != NULL) { //找到所在的块
    349. blk = blk->next;
    350. }
    351. return &blk->events[sockfd % MAX_EPOLL_EVENTS]; //返回fd结构体
    352. }
    353. //reactor初始化
    354. int ntyreactor_init(struct ntyreactor* reactor)
    355. {
    356. if(reactor == NULL) return -1;
    357. memset(reactor, 0, sizeof(struct ntyreactor));
    358. reactor->epfd = epoll_create(1);
    359. if (reactor->epfd <= 0) {
    360. printf("create epfd in %s err %s\n", __func__, strerror(errno));
    361. return -2;
    362. }
    363. //创建第一个块
    364. struct ntyevent* evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    365. if (evs == NULL) {
    366. printf("create epfd in %s err %s\n", __func__, strerror(errno));
    367. close(reactor->epfd);
    368. return -3;
    369. }
    370. memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    371. struct eventblock *block = malloc(sizeof(struct eventblock));
    372. if (block == NULL) {
    373. free(evs);
    374. close(reactor->epfd);
    375. return -3;
    376. }
    377. block->events = evs;
    378. block->next = NULL;
    379. reactor->evblks = block;
    380. reactor->blkcnt = 1;
    381. return 0;
    382. }
    383. //销毁reactor
    384. int ntyreactor_destory(struct ntyreactor* reactor)
    385. {
    386. close(reactor->epfd);
    387. struct eventblock *blk = reactor->evblks;
    388. struct eventblock *blk_next;
    389. while (blk != NULL) {
    390. blk_next = blk->next;
    391. free(blk->events);
    392. free(blk);
    393. blk = blk_next;
    394. }
    395. return 0;
    396. }
    397. //初始化接收连接socket
    398. int ntyreactor_addlistener(struct ntyreactor* reactor, int sockfd, NCALLBACK *acceptor){
    399. if (reactor == NULL) return -1;
    400. if (reactor->evblks == NULL) return -1;
    401. struct ntyevent* event = ntyreactor_idx(reactor, sockfd);
    402. if (event == NULL) return -1;
    403. nty_event_set(event, sockfd, acceptor, reactor);
    404. nty_event_add(reactor->epfd, EPOLLIN, event);
    405. return 0;
    406. }
    407. //reactor事件循环
    408. int ntyreactor_run(struct ntyreactor* reactor)
    409. {
    410. if (reactor == NULL) return -1;
    411. if (reactor->epfd < 0) return -1;
    412. if (reactor->evblks == NULL) return -1;
    413. struct epoll_event events[MAX_EPOLL_EVENTS+1];
    414. int checkpos = 0, i;
    415. while(1){
    416. int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
    417. if (nready < 0) {
    418. printf("epoll_wait error, exit\n");
    419. continue;
    420. }
    421. for(i = 0;i < nready; i++){
    422. struct ntyevent* ev = (struct ntyevent*)events[i].data.ptr; //发生事件的io fd结构体
    423. if((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)){
    424. ev->callback(ev->fd, events[i].events, ev->arg);
    425. }
    426. if((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)){
    427. ev->callback(ev->fd, events[i].events, ev->arg);
    428. }
    429. }
    430. }
    431. }
    432. int main(int argc, char *argv[]) {
    433. struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));
    434. ntyreactor_init(reactor);
    435. //起始的端口号
    436. unsigned short port = SERVER_PORT;
    437. if (argc == 2) {
    438. port = atoi(argv[1]);
    439. }
    440. int i = 0;
    441. int sockfds[PORT_COUNT] = {0};
    442. for (i = 0;i < PORT_COUNT;i ++) {
    443. sockfds[i] = init_sock(port+i);
    444. ntyreactor_addlistener(reactor, sockfds[i], accept_cb);
    445. }
    446. ntyreactor_run(reactor);
    447. ntyreactor_destory(reactor);
    448. for (i = 0;i < PORT_COUNT;i ++) {
    449. close(sockfds[i]);
    450. }
    451. free(reactor);
    452. return 0;
    453. }

    4.运行结果

    总结:

     1. websocket是全双工方式,建立连接后客户端与服务器端是完全平等的,可以互相主动请求。而HTTP长连接基于HTTP,是传统的客户端对服务器发起请求的模式。

    2. HTTP长连接中,每次数据交换除了真正的数据部分外,服务器和客户端还要大量交换HTTP header,信息交换效率很低。Websocket协议通过第一个request建立了TCP连接之后,之后交换的数据都不需要发送 HTTP header就能交换数据。

    推荐一个零声学院免费教程,个人觉得老师讲得不错,分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,点击立即学习: 

  • 相关阅读:
    PAT 甲级 A1030 Travel Plan
    java基于微信小程序的点餐外卖系统 uniapp 小程序
    web前端学习笔记10
    Redis 学习-上
    Day10—Spark SQL基础
    5V*0.5A低压降二极管芯片 CH213
    深度学习(PyTorch)——Dataset&DataLoader 加载数据集
    C++中实现雪花算法来在秒级以及毫秒及时间内生成唯一id
    评估驾驶员头部姿态变化幅度的统计方法
    使用 Pygame 构建和可视化数独游戏
  • 原文地址:https://blog.csdn.net/kakaka666/article/details/126318776