• Netty(3)网络编程


    阻塞

    演示

    代码演示如下:

    1. Client(客户端代码)
      SocketChannel sc = SocketChannel.open();
      sc.connect(new InetSocketAddress("localhost", 8080));
      System.out.println("waiting...");
      
      • 1
      • 2
      • 3
    2. Server(服务器端代码)
      // 使用 nio 来理解阻塞模式, 单线程
      // 0. ByteBuffer
      ByteBuffer buffer = ByteBuffer.allocate(16);
      // 1. 创建了服务器
      ServerSocketChannel ssc = ServerSocketChannel.open();
      
      // 2. 绑定监听端口
      ssc.bind(new InetSocketAddress(8080));
      
      // 3. 连接集合
      List<SocketChannel> channels = new ArrayList<>();
      while (true) {
          // 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
          log.debug("connecting...");//开始建立连接
          //客户端没有发起请求,所以如果客户端不启动,便会阻塞到下面这行代码
          SocketChannel sc = ssc.accept(); // 阻塞方法(线程停止运行)
          log.debug("connected... {}", sc);//连接建立
          channels.add(sc);
          for (SocketChannel channel : channels) {
              // 5. 接收客户端发送的数据
              log.debug("before read... {}", channel);
              //如果客户端没有像服务器发送数据,便会阻塞到下面这行代码
              channel.read(buffer); // 阻塞方法,线程停止运行
              buffer.flip(); 
              debugRead(buffer);
              buffer.clear();
              log.debug("after read...{}", channel);
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30

    说明:

    1. 当Server先运行时,代码会阻塞到accept方法处
    2. 当Client运行时,Server代码会阻塞到read方法处,因为客户端没有发送数据到服务器端(Server)
    3. 当Client里面包含数据后(直接sout的不是想服务器端发送的数据,需要使用(sc.write(Charset.defaultCharset().encode("hello"))),Server将会继续执行(并将hello打印出来),这种可以使用调试工具完成(右键左框,然后选择右框,过后出现最后的图片内容,输入对应的数据调试即可)()
      在这里插入图片描述
    4. 这种情况只能使用一次(虽然是死循环,Server并不会有任何的输出),因为线程阻塞到accept方法这个位置,需要开启一个新的连接才可以

    总结

    上面的代码总结了下面的内容:
    阻塞模式下,相关方法都会导致线程暂停

    • ServerSocketChannel.accept 会在没有连接建立时让线程暂停
    • SocketChannel.read 会在没有数据可读时让线程暂停
    • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置

    单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持

    非阻塞

    演示

    代码与之前一样,客户端代码一样的,服务器端的代码有一些变化,加了下面一行代码
    xxx.configureBlocking(false); // 非阻塞模式

    1. Server
      // 使用 nio 来理解非阻塞模式, 单线程
      // 0. ByteBuffer
      ByteBuffer buffer = ByteBuffer.allocate(16);
      // 1. 创建了服务器
      ServerSocketChannel ssc = ServerSocketChannel.open();
      ssc.configureBlocking(false); // 非阻塞模式
      // 2. 绑定监听端口
      ssc.bind(new InetSocketAddress(8080));
      // 3. 连接集合
      List<SocketChannel> channels = new ArrayList<>();
      while (true) {
          // 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
          SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,但sc是null
          if (sc != null) {
              log.debug("connected... {}", sc);
              sc.configureBlocking(false); // 非阻塞模式
              channels.add(sc);
          }
          for (SocketChannel channel : channels) {
              // 5. 接收客户端发送的数据
              int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,read 返回 0
              if (read > 0) {
                  buffer.flip();
                  debugRead(buffer);
                  buffer.clear();
                  log.debug("after read...{}", channel);
              }
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30

    总结

    非阻塞模式下,相关方法都会不会让线程暂停

    • 在 ServerSocketChannel.accept 在没有连接建立时,会返回 null,继续运行
    • SocketChannel.read 在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept
    • 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去

    问题:非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了 cpu

    多路复用()

    多路复用:单线程配合 Selector 完成对多个 Channel 可读写事件的监控

    1. 多路复用仅针对网络 IO,普通文件的IO是无法利用多路复用的
    2. Selector 能够保证:
      • 有可连接事件时才去连接
      • 有可读事件才去读取
      • 有可写事件才去写入(限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件)

    Selector

    selector 版
    selector
    thread
    channel
    channel
    channel

    使用的优势:

    • 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
    • 让这个线程能够被充分利用
    • 节约了线程的数量
    • 减少了线程上下文切换

    概述

    有四种事件类型:

    1. accept :会在有连接请求时触发
    2. connect :是客户端,连接建立后触发
    3. read :可读事件
    4. write :可写事件

    创建Selector

    创建 selector, 管理多个 channel

    Selector selector = Selector.open();
    
    
    • 1
    • 2

    绑定 Channel 事件(又称注册事件)

    只有绑定了对应的事件,selector才会关系,

    channel.configureBlocking(false);
    SelectionKey key = channel.register(selector, 绑定事件);
    
    
    • 1
    • 2
    • 3

    总结:

    • channel 必须工作在非阻塞模式
    • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
    • 绑定的事件类型可以有
      1. accept :会在有连接请求时触发,服务器端成功接受连接时触发
      2. connect :是客户端,连接建立后触发; 客户端连接成功时触发
      3. read :可读事件,数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
      4. write :可写事件,数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

    Select(监听 Channel 事件)

    可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件

    阻塞直到绑定事件发生

    int count = selector.select();
    
    • 1

    阻塞直到绑定事件发生(或超时)

    阻塞直到绑定事件发生,或是超时(时间单位为 ms)

    int count = selector.select(long timeout);
    
    • 1

    根据返回值检查是否有事件

    不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

    int count = selector.selectNow();
    
    • 1

    select 如何不阻塞

    事件发生时

    • 客户端发起连接请求,会触发 accept 事件
    • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
    • channel 可写,会触发 write 事件
    • 在 linux 下 nio bug 发生时

    调用 selector.wakeup()

    这个可以提前说明,这个是一次性的,即只要说明了一次wakeup()方法,那么selector不管在该方法前面后后面执行都是不阻塞的,但是每一个wakeup方法只能被调用过一次,调用过后,便会失效

    调用 selector.close()

    selector 所在线程 interrupt

    处理 accept 事件

    客户端代码与之前的类似,使用debug工具去发送数据

    服务端代码:

    try (ServerSocketChannel channel = ServerSocketChannel.open()) {
       channel.bind(new InetSocketAddress(8080));
       System.out.println(channel);
       Selector selector = Selector.open();
       channel.configureBlocking(false);
       channel.register(selector, SelectionKey.OP_ACCEPT);
    
       while (true) {
           int count = selector.select();
           log.debug("select count: {}", count);
           // 获取所有事件
           Set<SelectionKey> keys = selector.selectedKeys();
           // 遍历所有事件,逐一处理
           Iterator<SelectionKey> iter = keys.iterator();
           while (iter.hasNext()) {
               SelectionKey key = iter.next();
               // 判断事件类型
               if (key.isAcceptable()) {
                   ServerSocketChannel c = (ServerSocketChannel) key.channel();
                   // 必须处理
                   SocketChannel sc = c.accept();
                   log.debug("{}", sc);
               }
               // 处理完毕,必须将事件移除
               iter.remove();
           }
       }
    } catch (IOException e) {
       e.printStackTrace();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    处理 read 事件

    // 1. 创建 selector, 管理多个 channel
    Selector selector = Selector.open();
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    // 2. 建立 selector 和 channel 的联系(注册)
    // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
    SelectionKey sscKey = ssc.register(selector, 0, null);//0表示不关注事件
    // key 只关注 accept 事件
    sscKey.interestOps(SelectionKey.OP_ACCEPT);//确定其只关注哪一个事件(accept取值是16)
    log.debug("sscKey:{}", sscKey);
    ssc.bind(new InetSocketAddress(8080));
    //确定事件什么时候发生
    while (true) {
        // 3. select 方法, 没有事件发生,线程阻塞;有事件,线程才会恢复运行
        // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
        selector.select();
        // 4. 处理事件, selectedKeys 内部包含了所有发生的事件
        // 使用迭代器遍历:selector.selectedKeys()
        // set集合,因为要做删除操作,所有需要使用迭代器遍历(如果集合想在遍历时删除就需要使用迭代器),不使用增强for
        // 获取的事件可能是accept也可能是read等
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 
        while (iter.hasNext()) {
           SelectionKey key = iter.next();
           // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
           iter.remove();
           log.debug("key: {}", key);
           // 5. 区分事件类型
           if (key.isAcceptable()) { // 如果是 accept
               ServerSocketChannel channel = (ServerSocketChannel) key.channel();
               SocketChannel sc = channel.accept();
               sc.configureBlocking(false);
               //将sc 这个channel交给selector管理
               SelectionKey scKey = sc.register(selector, 0, null);
               //关注读事件
               scKey.interestOps(SelectionKey.OP_READ);
               log.debug("{}", sc);
               log.debug("scKey:{}", scKey);
           } else if (key.isReadable()) { // 如果是 read
               try {
                   SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
                   ByteBuffer buffer = ByteBuffer.allocate(4);
                   int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
                   if(read == -1) {
                       key.cancel();
                   } else {
                       buffer.flip();
                       System.out.println(Charset.defaultCharset().decode(buffer));
                   }
               } catch (IOException e) {
                   e.printStackTrace();
                   key.cancel();  // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
               }
           }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    为什么要 iter.remove()

    因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除

    1. 当ssc.register()将其进行注册时(SelectionKey sscKey = ssc.register(selector, 0, null)),就会将sscKey放到selector 集合中。
    2. 当运行到selector.select()向下运行时,便会创建一个新的集合selectedKeys中(Iterator iter = selector.selectedKeys().iterator()),selector会在发生事件后,向绿色集合中加入key,但不会删除
    3. 继续向下运行,开始建立连接(SocketChannel sc = channel.accept()),这时候便会将key上的事件去掉,表示已经处理过了,但是key还留在集合里面
    4. 代码继续向下运行,执行到( SocketChannel sc = channel.accept()),将ServerSocketChannel注册到selector中,这时会往selector 中加入一个新的key
      在这里插入图片描述
    5. 现在开始下一轮循环,会将之前添加的ServerSocketChannel注册的key添加到selectedKeys集合中
    6. 现在进入循环,这时候拿的是上一轮循环留下的key
      在这里插入图片描述
    7. 进入if分支,现在获取的sc就是null

    所以需要使用迭代器进行循环

    例如

    • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
    • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

    处理客户端断开(cancel)

    运行服务器,打开客户端,客户端连接上服务器后,将客户端强制关掉,这时候服务器会报错(在read的地方出现IOException)
    在这里插入图片描述
    处理方法如下,将read的地方进行try-catch,catch到异常后,将key取消(key.cancel())

    正常断开的也是会出现一些问题,所以在try里面也需要进行cancel

    代码如下:

    Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
    while (iter.hasNext()) {
    	SelectionKey key = iter.next();
    	// 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
    	iter.remove();
    	log.debug("key: {}", key);
    	// 5. 区分事件类型
    	if (key.isAcceptable()) { // 如果是 accept
    	    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
    	    SocketChannel sc = channel.accept();
    	    sc.configureBlocking(false);
    	
    	    SelectionKey scKey = sc.register(selector, 0, null);
    	    scKey.interestOps(SelectionKey.OP_READ);
    	} else if (key.isReadable()) { // 如果是 read
    	    try {
    	        SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
    	        ByteBuffer buffer = ByteBuffer.allocate(4);
    	        int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
    	        if(read == -1) {
    	            key.cancel();
    	        } else {
    	            buffer.flip();
    	            System.out.println(Charset.defaultCharset().decode(buffer));
    	        }
    	    } catch (IOException e) {
    	        e.printStackTrace();
    	        key.cancel();  // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
    	    }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

    // 因为客户端断开了,因此需要将 key 取消
    //(从 selector 的 keys 集合中真正删除 key)
    key.cancel();  
    
    • 1
    • 2
    • 3

    如果不对事件进行处理的话:事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件(上一次的事件,一直循环未处理的事件)仍会触发,这是因为 nio 底层使用的是水平触发

    处理消息的边界

    消息有三种情况,如下所示
    在这里插入图片描述
    出现下面几种情况:

    1. 时刻1:消息长度比ByteBuffer还要长,这种情况下ByteBuffer只能扩容,大小翻倍
    2. 时刻2:ByteBuffer相对较大,消息比较小,可能会产生半包的现象
    3. 时刻3:当消息比时刻2的还要小,一个ByteBuffer可以接受两个消息,造成粘包的情况

    半包黏包现象解决方法

    思路1:客户端和服务器端约定一个固定长度

    • 如:以可能传送的消息的最大长度为客户端和服务器之前约定的ByteBuffer的容量,客户端传送时以这个容量大小传(如果客户端内容不够需要补齐,总之使这个传送的长度是这个容量大小);服务器以一个固定大小的ByteBuffer来读
    • 简单,但是容易造成空间的浪费,浪费网络带宽(客户端补齐的数据是不必要的);实际用的很少;如果消息长度都是一模一样的,可以使用这种方法

    思路2:使用一个分隔符,去分隔消息。

    • 客户端将传送的数据中的每一条消息使用分隔符进行传送,服务器端读取数据时,需要判断有没有分隔符,有分隔符时,便证明这是一条完整的消息了,根据分隔符的位置,创建一个新的ByteBuffer去接收消息,依次循环接收;中间需要一个临时的ByteBuffer去存
    • 效率不高,临时的ByteBuffer有可能容量不够大。也不常用

    思路3:将消息分为两个部分:第一个部分存储内容长度,根据这个长度去分配ByteBuffer的大小

    • 常用,http2.0也是这样使用的

    总结:

    1. 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
    2. 另一种思路是按分隔符拆分,缺点是效率低
    3. TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
      • Http 1.1 是 TLV 格式
      • Http 2.0 是 LTV 格式

    分隔符解决

    客户端:

    SocketChannel sc = SocketChannel.open();
    sc.connect(new InetSocketAddress("localhost", 8080));
    SocketAddress address = sc.getLocalAddress();
    sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
    
    sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
    sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
    System.in.read();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    服务器端:

    private static void split(ByteBuffer source) {
        source.flip();
        for (int i = 0; i < source.limit(); i++) {
            // 找到一条完整消息
            if (source.get(i) == '\n') {
                int length = i + 1 - source.position();
                // 把这条完整消息存入新的 ByteBuffer
                ByteBuffer target = ByteBuffer.allocate(length);
                // 从 source 读,向 target 写
                for (int j = 0; j < length; j++) {
                    target.put(source.get());
                }
                debugAll(target);
            }
        }
        source.compact(); // 0123456789abcdef  position 16 limit 16
    }
    
    public static void main(String[] args) throws IOException {
        // 1. 创建 selector, 管理多个 channel
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        // 2. 建立 selector 和 channel 的联系(注册)
        // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
        SelectionKey sscKey = ssc.register(selector, 0, null);
        // key 只关注 accept 事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("sscKey:{}", sscKey);
        ssc.bind(new InetSocketAddress(8080));
        while (true) {
            // 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
            // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
            selector.select();
            // 4. 处理事件, selectedKeys 内部包含了所有发生的事件
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
                iter.remove();
                log.debug("key: {}", key);
                // 5. 区分事件类型
                if (key.isAcceptable()) { // 如果是 accept
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
                    // 将一个 byteBuffer 作为附件关联到 selectionKey 上
                    SelectionKey scKey = sc.register(selector, 0, buffer);
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.debug("{}", sc);
                    log.debug("scKey:{}", scKey);
                } else if (key.isReadable()) { // 如果是 read
                    try {
                        SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
                        // 获取 selectionKey 上关联的附件
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
                        if(read == -1) {
                            key.cancel();
                        } else {
                            split(buffer);
                            // 需要扩容
                            if (buffer.position() == buffer.limit()) {
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                buffer.flip();
                                newBuffer.put(buffer); // 0123456789abcdef3333\n
                                key.attach(newBuffer);
                            }
                        }
    
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();  // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    自动扩容

    使用自动扩容去解决这种问题

    客户端1 服务器 ByteBuffer1 ByteBuffer2 发送 01234567890abcdef3333\r 第一次 read 存入 01234567890abcdef 扩容 拷贝 01234567890abcdef 第二次 read 存入 3333\r 01234567890abcdef3333\r 客户端1 服务器 ByteBuffer1 ByteBuffer2

    服务器端代码写法如下:

    1. 在这里使用了附件,使一个ByteBuffer和一个channel关联到一个selectionKey上面,使这个与channel一样,与key

      ServerSocketChannel channel = (ServerSocketChannel) key.channel();
      SocketChannel sc = channel.accept();
      ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
      // 将一个 byteBuffer 作为附件(attachment)关联到 selectionKey 上
      SelectionKey scKey = sc.register(selector, 0, buffer);
      // 获取 selectionKey 上关联的附件
      ByteBuffer buffer = (ByteBuffer) key.attachment();
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      1. 判断是否是分隔符
      private static void split(ByteBuffer source) {
          source.flip();
          for (int i = 0; i < source.limit(); i++) {
              // 找到一条完整消息
              if (source.get(i) == '\n') {
                  int length = i + 1 - source.position();
                  // 把这条完整消息存入新的 ByteBuffer
                  ByteBuffer target = ByteBuffer.allocate(length);
                  // 从 source 读,向 target 写
                  for (int j = 0; j < length; j++) {
                      target.put(source.get());
                  }
                  debugAll(target);
              }
          }
          source.compact(); // 0123456789abcdef  position 16 limit 16
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
    2. 完整代码:

      public static void main(String[] args) throws IOException {
          // 1. 创建 selector, 管理多个 channel
          Selector selector = Selector.open();
          ServerSocketChannel ssc = ServerSocketChannel.open();
          ssc.configureBlocking(false);
          // 2. 建立 selector 和 channel 的联系(注册)
          // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
          SelectionKey sscKey = ssc.register(selector, 0, null);
          // key 只关注 accept 事件
          sscKey.interestOps(SelectionKey.OP_ACCEPT);
          log.debug("sscKey:{}", sscKey);
          ssc.bind(new InetSocketAddress(8080));
          while (true) {
              // 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
              // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
              selector.select();
              // 4. 处理事件, selectedKeys 内部包含了所有发生的事件
              Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
              while (iter.hasNext()) {
                  SelectionKey key = iter.next();
                  // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
                  iter.remove();
                  log.debug("key: {}", key);
                  // 5. 区分事件类型
                  if (key.isAcceptable()) { // 如果是 accept
                      ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                      SocketChannel sc = channel.accept();
                      sc.configureBlocking(false);
                      ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
                      // 将一个 byteBuffer 作为附件(attachment)关联到 selectionKey 上
                      SelectionKey scKey = sc.register(selector, 0, buffer);
                      scKey.interestOps(SelectionKey.OP_READ);
                      log.debug("{}", sc);
                      log.debug("scKey:{}", scKey);
                  } else if (key.isReadable()) { // 如果是 read
                      try {
                          SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
                          // 获取 selectionKey 上关联的附件
                          ByteBuffer buffer = (ByteBuffer) key.attachment();
                          int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
                          if(read == -1) {
                              key.cancel();
                          } else {
                              split(buffer);
                              // 需要扩容
                              if (buffer.position() == buffer.limit()) {
                                  ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                  buffer.flip();
                                  newBuffer.put(buffer); // 0123456789abcdef3333\n
                                  key.attach(newBuffer);
                              }
                          }
      
                      } catch (IOException e) {
                          e.printStackTrace();
                          key.cancel();  // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
                      }
                  }
              }
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62

    ByteBuffer 大小分配

    ByteBuffer 怎样分配才合理

    每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer

    ByteBuffer 不能太大,所以设计大小可变的 ByteBuffer是非常必要的,主要有下面;两种思路:

    1. 首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能(上面的自动扩容便是使用这种方法)

    2. 用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

    
    
    
    • 1
    • 2

    处理 write事件

    处理可写事件

    服务端代码:

    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(`在这里插入代码片`false);//非阻塞模式
    Selector selector = Selector.open();
    ssc.register(selector, SelectionKey.OP_ACCEPT);
    ssc.bind(new InetSocketAddress(8080));
    while (true) {
    	 selector.select();
    	 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
    	 while (iter.hasNext()) {
    	     SelectionKey key = iter.next();
    	     iter.remove();
    	     if (key.isAcceptable()) {
    	     	// ServerSocketChannel 只有一个,所以可以这样获取
    	         SocketChannel sc = ssc.accept();
    	         sc.configureBlocking(false);//非阻塞模式
    	         // 1. 向客户端发送大量数据
    	         StringBuilder sb = new StringBuilder();
    	         for (int i = 0; i < 5000000; i++) {
    	             sb.append("a");
    	         }
    	         //将sb数据放入ByteBuffer中
    	         ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
    	         // buffer里面是否还有数据,内容未发完会一直循环这一段代码,导致效率低
    	       	 while(buffer.hasRemaining()){
    	         	// 2. 返回值代表实际写入的字节数,write方法不能保证所有的数据都一次性写入客户端里面
    	         	int write = sc.write(buffer);
    	         	System.out.println(write);
    	         }
    	     } 
    	 }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    客户端代码

    SocketChannel sc = SocketChannel.open();
    sc.connect(new InetSocketAddress("localhost", 8080));
    
    // 3. 接收数据
    int count = 0;
    while (true) {
        ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
        count += sc.read(buffer);
        System.out.println(count);
        buffer.clear();
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    一次写不完

    非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)

    用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略

    • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
    • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
    • 如果不取消,会每次可写均会触发 write 事件

    服务端:

    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(`在这里插入代码片`false);//非阻塞模式
    Selector selector = Selector.open();
    ssc.register(selector, SelectionKey.OP_ACCEPT);
    ssc.bind(new InetSocketAddress(8080));
    while (true) {
    	selector.select();
    	Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
    	while (iter.hasNext()) {
    	    SelectionKey key = iter.next();
    	    iter.remove();
    	    if (key.isAcceptable()) {
    	        SocketChannel sc = ssc.accept();
    	        sc.configureBlocking(false);
    	        SelectionKey sckey = sc.register(selector, 0, null);
    	        sckey.interestOps(SelectionKey.OP_READ);
    	        // 1. 向客户端发送大量数据
    	        StringBuilder sb = new StringBuilder();
    	        for (int i = 0; i < 5000000; i++) {
    	            sb.append("a");
    	        }
    	        ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
    	
    	        // 2. 返回值代表实际写入的字节数
    	        int write = sc.write(buffer);
    	        System.out.println(write);
    	
    	        // 3. 判断是否有剩余内容
    	        if (buffer.hasRemaining()) {
    	            // 4. 关注可写事件   1                     4
    	            //sckey.interestOps()之前关注的事件,避免覆盖之前的关注的事件
    	            sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
    			 //或者是这种写法:sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE);
    	            // 5. 把未写完的数据挂到 sckey 上
    	            sckey.attach(buffer);
    	        }
    	    } else if (key.isWritable()) {//如果是可写事件
    	        ByteBuffer buffer = (ByteBuffer) key.attachment();
    	        SocketChannel sc = (SocketChannel) key.channel();
    	        //继续写之前剩下的内容(如果一直没写完,便会一直进入else if这个方法里面)
    	        int write = sc.write(buffer);
    	        System.out.println(write);
    	        // 6. 清理操作
    	        if (!buffer.hasRemaining()) {//看buffer还有没有可写的内容
    	            key.attach(null); // 需要清除buffer
    	            key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);//不需关注可写事件,当前这个写完了,便不需要再关注可写事件了
    	        }
    	    }
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    客户端代码

    SocketChannel sc = SocketChannel.open();
    sc.connect(new InetSocketAddress("localhost", 8080));
    
    // 3. 接收数据
    int count = 0;
    while (true) {
        ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
        count += sc.read(buffer);
        System.out.println(count);
        buffer.clear();
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    利用多线程优化

    前面的代码只有一个选择器,可以利用多核 cpu,分两组选择器

    • 单线程配一个选择器,专门处理 accept 事件
    • 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件

    代码示例:

    1. 创建一个线程boss,专门处理accept事件
    2. 创建一个worker,读写交给worker

    Boss

    关联worker

    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();
        SelectionKey bossKey = ssc.register(boss, 0, null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));
        // 1. 创建固定数量的 worker 并初始化
        Worker workers = new Worker("worker-0");
        while(true) {
            boss.select();
            Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    // 2. 关联 selector
                    sc.register(sc);
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    关联work数组

    public static void main(String[] args) throws IOException {
    	Thread.currentThread().setName("boss");
    	ServerSocketChannel ssc = ServerSocketChannel.open();
    	ssc.configureBlocking(false);
    	Selector boss = Selector.open();
    	SelectionKey bossKey = ssc.register(boss, 0, null);
    	bossKey.interestOps(SelectionKey.OP_ACCEPT);
    	ssc.bind(new InetSocketAddress(8080));
    	// 1. 创建固定数量(核心线程数)的 worker 并初始化
    	Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
    	for (int i = 0; i < workers.length; i++) {
    	    workers[i] = new Worker("worker-" + i);
    	}
    	//计数器
    	AtomicInteger index = new AtomicInteger();
    	while(true) {
    	    boss.select();
    	    Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
    	    while (iter.hasNext()) {
    	        SelectionKey key = iter.next();
    	        iter.remove();
    	        if (key.isAcceptable()) {
    	            SocketChannel sc = ssc.accept();
    	            sc.configureBlocking(false);
    	            // 2. 关联 selector
    	            // round robin 轮询
    	            workers[index.getAndIncrement() % workers.length].register(sc); // boss 调用 初始化 selector , 启动 worker-0
    	        }
    	    }
    	}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    拿到 cpu 个数

    Runtime.getRuntime().availableProcessors()在真实的机器上使用是没有问题的,但是现在流行将项目部署在docker容器上,所以这个拿到的是容器所在的物理机上的核心线程数

    Runtime.getRuntime().availableProcessors()如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数

    这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启

    所以还是手工去指定一下最好

    worker

    这种有两种实现方式,如下所示:

    1. 第一种:

      static class Worker implements Runnable{
      	private Thread thread;
      	private Selector selector;
      	private String name;
      	private volatile boolean start = false; // 还未初始化
      	// 线程安全的队列
      	private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
      	public Worker(String name) {
      	    this.name = name;
      	}
      	
      	// 初始化线程,和 selector
      	public void register(SocketChannel sc) throws IOException {
      		//如果是第一次,则进行初始化
      	    if(!start) {
      	        selector = Selector.open();
      	        thread = new Thread(this, name);
      	        thread.start();
      	        start = true;
      	    }
      	    //向队列添加任务,但是这个任务没有立即执行boss
      	    queue.add(()->{
      	    	try {
      	    		sc.register(selector, SelectionKey.OP_READ, null);// boss
      	    	}catch (ClosedChannerlException e){
      	    		e.printStackTrace();
      	    	}
      	    });
      	    selector.wakeup(); // 唤醒 select 方法 boss
      	}
      	
      	@Override
      	public void run() {
      	    while(true) {
      	        try {
      	            selector.select(); // worker-0  阻塞
      	            Runnable task = queue.poll();
      	            if(task != null){
      	            	task.run();//其实就是执行了sc.register(selector, SelectionKey.OP_READ, null);
      	            }
      	            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
      	            while (iter.hasNext()) {
      	                SelectionKey key = iter.next();
      	                iter.remove();
      	                if (key.isReadable()) {//可读
      	                    ByteBuffer buffer = ByteBuffer.allocate(16);
      	                    SocketChannel channel = (SocketChannel) key.channel();
      	                    channel.read(buffer);
      	                }
      	            }
      	        } catch (IOException e) {
      	            e.printStackTrace();
      	        }
      	    }
      	}
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
    2. 第二种

      static class Worker implements Runnable{
      	private Thread thread;
      	private Selector selector;
      	private String name;
      	private volatile boolean start = false; // 还未初始化
      	private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
      	public Worker(String name) {
      	    this.name = name;
      	}
      	
      	// 初始化线程,和 selector
      	public void register(SocketChannel sc) throws IOException {
      	    if(!start) {
      	        selector = Selector.open();
      	        thread = new Thread(this, name);
      	        thread.start();
      	        start = true;
      	    }
      	    selector.wakeup(); // 唤醒 select 方法 boss
      	    sc.register(selector, SelectionKey.OP_READ, null); // boss
      	}
      	
      	@Override
      	public void run() {
      	    while(true) {
      	        try {
      	            selector.select(); // worker-0  阻塞
      	            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
      	            while (iter.hasNext()) {
      	                SelectionKey key = iter.next();
      	                iter.remove();
      	                if (key.isReadable()) {
      	                    ByteBuffer buffer = ByteBuffer.allocate(16);
      	                    SocketChannel channel = (SocketChannel) key.channel();
      	                    log.debug("read...{}", channel.getRemoteAddress());
      	                    channel.read(buffer);
      	                    buffer.flip();
      	                    debugAll(buffer);
      	                }
      	            }
      	        } catch (IOException e) {
      	            e.printStackTrace();
      	        }
      	    }
      	}
      	}
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47

    UDP

    UDP 是无连接的,client 发送数据不会管 server 是否开启

    server 这边的 receive 方法会将接收到的数据存入 byte buffer,但如果数据报文超过 buffer 大小,多出来的数据会被默默抛弃

  • 相关阅读:
    学习负载均衡的算法
    网络安全深入学习第二课——热门框架漏洞(RCE—Thinkphp5.0.23 代码执行)
    django4使用富文本编辑器ckeditor浏览器显示html标签的问题
    QT键盘事件_获取CTRL-SHIFT-回车键ctrl+M组合键
    在ubuntu18.04上安装pangolin
    DeblurGAN:图像去模糊复现
    如何用神经网络预测数据,神经网络预测的软件有
    穿仓与均摊
    01_初识微服务
    接口自动化测试方案
  • 原文地址:https://blog.csdn.net/yyuggjggg/article/details/126168775