• WebSocket --- ws模块源码解析(详解)


    摘要

    在这一篇文章中,写了如何在node端和web端,实现一个WebSocket通信。
    WebSocket在node端和客户端的使用

    而在node端里面,我们使用了ws模块来创建WebSocket和WebSocketServer,那ws模块是如何做到可以和客户端进行双向通信的呢?

    426状态码

    在HTTP中,426表示“Upgrade Required”,即客户端需要通过HTTP协议的升级版进行访问。这个状态码主要用在WebSockets协议中,表示客户端需要使用WebSockets协议来连接服务器。

    什么意思呢?例如我们创建一个HTTP服务如果这么写:

    const http = require('http')
    
    const server = http.createServer((req, res) => {
      const body = http.STATUS_CODES[426];
      res.writeHead('426', {
        'Content-Type': 'text/align',
        'Content-Length': body.length
      })
      res.end(body)
    })
    
    server.listen(8080)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    就是告诉客户端,如果你访问我这边的服务,那么你就要进行升级服务。也就是使用WebSocket对我进行访问!

    那有一个问题,如果客户端使用了WebSocket访问,服务端要怎么进行响应呢?

    还直接在createServer里面的回调中处理吗?

    upgrade事件

    在这里面,如果客户端通过WebSocket进行访问服务端,会触发服务端server的upgrade事件,也就是说会进下面的回调函数里。

    server.on('upgrade',(req, socket, head) => {
      // 固定格式
      const key = req.headers['sec-websocket-key'];
      const digest = createHash('sha1')
      .update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',)
      .digest('base64');
    
      const headers = [
        'HTTP/1.1 101 Switching Protocols',
        'Upgrade: websocket',
        'Connection: Upgrade',
        `Sec-WebSocket-Accept: ${digest}`
      ];
      socket.write(headers.concat('\r\n').join('\r\n'));
    
      // 客户端发送的消息
      socket.on('data', (data) => {
        console.log(data.toString());
      })
     
      // 服务端向客户端发送消息
      socket.write('你好')
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    这个回调中,通过socket来进行和客户端进行双向通信。

    转码

    但是只有上面的例子,似乎每次拿到的数据都是乱码。这是因为WebSocket之间的通信的报文,不能通过Buffer的toString直接转码。这里提供一下在网上找到的转码方法:

    server.on('upgrade', (req, socket, head) => {
      const key = req.headers['sec-websocket-key'];
      const digest = createHash('sha1')
      .update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',)
      .digest('base64');
    
      const headers = [
        'HTTP/1.1 101 Switching Protocols',
        'Upgrade: websocket',
        'Connection: Upgrade',
        `Sec-WebSocket-Accept: ${digest}`
      ];
      socket.write(headers.concat('\r\n').join('\r\n'));
      socket.on('data',(data) => {
        console.log(decodeSocketFrame(data).payloadBuf.toString())
        socket.write(encodeSocketFrame({
          fin:1,
          opcode:1,
          payloadBuf:Buffer.from('你好')
      }))
      })
    })
    
    function decodeSocketFrame (bufData){
      let bufIndex = 0
      const byte1 = bufData.readUInt8(bufIndex++).toString(2)
      const byte2 = bufData.readUInt8(bufIndex++).toString(2)
      console.log(byte1);
      console.log(byte2);
      const frame =  {
          fin:parseInt(byte1.substring(0,1),2),
          // RSV是保留字段,暂时不计算
          opcode:parseInt(byte1.substring(4,8),2),
          mask:parseInt(byte2.substring(0,1),2),
          payloadLen:parseInt(byte2.substring(1,8),2),
      }
      // 如果frame.payloadLen为126或127说明这个长度不够了,要使用扩展长度了
      // 如果frame.payloadLen为126,则使用Extended payload length同时为16/8字节数
      // 如果frame.payloadLen为127,则使用Extended payload length同时为64/8字节数
      // 注意payloadLen得长度单位是字节(bytes)而不是比特(bit)
      if(frame.payloadLen==126) {
          frame.payloadLen = bufData.readUIntBE(bufIndex,2);
          bufIndex+=2;
      } else if(frame.payloadLen==127) {
          // 虽然是8字节,但是前四字节目前留空,因为int型是4字节不留空int会溢出
          bufIndex+=4;
          frame.payloadLen = bufData.readUIntBE(bufIndex,4);
          bufIndex+=4;
      }
      if(frame.mask){
          const payloadBufList = []
          // maskingKey为4字节数据
          frame.maskingKey=[bufData[bufIndex++],bufData[bufIndex++],bufData[bufIndex++],bufData[bufIndex++]];
          for(let i=0;i<frame.payloadLen;i++) {
              payloadBufList.push(bufData[bufIndex+i]^frame.maskingKey[i%4]);
          }
          frame.payloadBuf = Buffer.from(payloadBufList)
      } else {
          frame.payloadBuf = bufData.slice(bufIndex,bufIndex+frame.payloadLen)
      }
      return frame
    }
    
    function encodeSocketFrame (frame){
      const frameBufList = [];
      // 对fin位移七位则为10000000加opcode为10000001
      const header = (frame.fin<<7)+frame.opcode;
      frameBufList.push(header)
      const bufBits = Buffer.byteLength(frame.payloadBuf);
      let payloadLen = bufBits;
      let extBuf;
      if(bufBits>=126) {
          //65536是2**16即两字节数字极限
          if(bufBits>=65536) {
              extBuf = Buffer.allocUnsafe(8);
              buf.writeUInt32BE(bufBits, 4);
              payloadLen = 127;
          } else {
              extBuf = Buffer.allocUnsafe(2);
              buf.writeUInt16BE(bufBits, 0);
              payloadLen = 126;
          }
      }
      let payloadLenBinStr = payloadLen.toString(2);
      while(payloadLenBinStr.length<8){payloadLenBinStr='0'+payloadLenBinStr;}
      frameBufList.push(parseInt(payloadLenBinStr,2));
      if(bufBits>=126) {
          frameBufList.push(extBuf);
      }
      frameBufList.push(...frame.payloadBuf)
      return Buffer.from(frameBufList)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92

    WebSocketServer的实现

    有了上面的基础,基本知道双向通信是怎么做到的了。就来看一下WebSocketServer的实现。
    当我们使用的时候,我们是以这种方式:

    const WebSocketServer = require('ws')
    
    const wss = new WebSocketServer('8080');
    wss.on('connection', (ws) => {
      
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    我们知道,connection是httpServer的回调,为什么在WebSocketServer中可以使用呢?

    export default class WebSocketServer {
      constructor(port) {
        this._server = http.createServer((req, res) => {
          const body = http.STATUS_CODES[426];
          res.writeHead('426', {
            'Content-Type': 'text/align',
            'Content-Length': body.length
          })
          res.end(body)
        })
    
        this._server.listen(port);
    
        const connectionEmit = this.emit.bind(this, 'connection');
        const closeEmit = this.emit.bind(this, 'close');
        // 其他事件,都是http能监听到的;
        const map = {
          connection: connectionEmit,
          close: closeEmit
        }
    
        for(let emitName in map) {
          this._server.on(emitName, map[emitName])
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    在WebSocketServer中,如果客户端触发了http的事件时,它便将其转发到WebSocket实例上面。
    然后再处理自己的逻辑。

  • 相关阅读:
    java面试系列(4)——JVM共享区
    JS深入理解立即执行函数,js匿名函数()
    【Linux基础】Linux发展史
    腾讯云轻量应用服务器搭建跨境电商的方法步骤(非常详细)
    受害者被锤 法官遭殃 背后的它公关赢了?
    财报中连创佳绩,饿了么做对了什么?
    HttpUtils带连接池
    An动画基础之元件的影片剪辑效果
    LLM 01-引言
    Postman使用
  • 原文地址:https://blog.csdn.net/weixin_46726346/article/details/134501070