• Netty——网络编程 NIO(Selector处理write事件 写入内容过多问题)代码示例


    一、写入内容过多问题概述

    • 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)。
    • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略。
      (1)、当消息处理器第一次写入消息时,才将 channel 注册到 selector 上;
      (2)、selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册,如果不取消,会每次可写均会触发 write 事件

    二、写入内容过多问题的代码示例

    2.1、服务端代码示例

    • 服务端代码

      package com.example.nettytest.nio.day3;
      
      import java.io.IOException;
      import java.net.InetSocketAddress;
      import java.nio.ByteBuffer;
      import java.nio.channels.SelectionKey;
      import java.nio.channels.Selector;
      import java.nio.channels.ServerSocketChannel;
      import java.nio.channels.SocketChannel;
      import java.nio.charset.Charset;
      import java.util.Iterator;
      
      /**
       * @description: NIO(Selector处理write事件 写入内容过多问题)代码示例
       * @author: xz
       * @create: 2022-09-18
       */
      public class Test6Server {
          public static void main(String[] args) throws IOException {
              writeServer1();
          }
          /**
           * Selector写入内容过多问题 代码示例
           * */
          public static void writeServer1() throws IOException{
              ServerSocketChannel ssc = ServerSocketChannel.open();
              ssc.configureBlocking(false);
              Selector selector = Selector.open();
              ssc.register(selector, SelectionKey.OP_ACCEPT);
              ssc.bind(new InetSocketAddress(8080));
              while (true) {
                  selector.select();
                  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                  while (iter.hasNext()) {
                      SelectionKey key = iter.next();
                      iter.remove();
                      if (key.isAcceptable()) {
                          SocketChannel sc = ssc.accept();
                          sc.configureBlocking(false);
                          SelectionKey sckey = sc.register(selector, 0, null);
                          sckey.interestOps(SelectionKey.OP_READ);
                          // 1. 向客户端发送大量数据
                          StringBuilder sb = new StringBuilder();
                          for (int i = 0; i < 9000000; i++) {
                              sb.append("a");
                          }
                          ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                          if (buffer.hasRemaining()) {
                              // 2. 返回值代表实际写入的字节数
                              int write = sc.write(buffer);
                              System.out.println("实际写入的字节数===="+write);
                          }
                      }
                  }
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57

    2.2、客户端代码示例

    • 客户端代码示例

      package com.example.nettytest.nio.day3;
      
      import java.io.IOException;
      import java.net.InetSocketAddress;
      import java.nio.ByteBuffer;
      import java.nio.channels.SocketChannel;
      
      /**
       * @description:
       * @author: xz
       * @create: 2022-09-18
       */
      public class Test6Client {
          public static void main(String[] args) throws IOException {
              SocketChannel sc = SocketChannel.open();
              sc.connect(new InetSocketAddress("localhost", 8080));
      
              // 3. 接收数据
              int count = 0;
              while (true) {
                  ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                  count += sc.read(buffer);
                  System.out.println("实际读取到的字节数==="+count);
                  buffer.clear();
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27

    2.3、启动服务端和客户端进行测试

    • 启动服务端,控制台输出如下:
      在这里插入图片描述

    • 启动客户端,控制台输出如下:
      在这里插入图片描述

    三、解决写入内容过多问题的代码示例

    3.1、修改服务端代码示例

    • 再处理write事件代码中,代码修如下:

       // 2. 返回值代表实际写入的字节数
      int write = sc.write(buffer);
      System.out.println("实际写入的字节数===="+write);
      // 3. 判断是否有剩余内容
      if (buffer.hasRemaining()) {
          /**
           * 4、关注事件
           *sckey.interestOps 表示关注原先注册的事件
           * SelectionKey.OP_WRITE 表示关注可写事件
           */
          sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
          // 5. 把未写完的数据挂到 sckey 上
          sckey.attach(buffer);
      }else if (key.isWritable()) {
          ByteBuffer buffer1 = (ByteBuffer) key.attachment();
          SocketChannel sc1 = (SocketChannel) key.channel();
          int write1 = sc1.write(buffer1);
          System.out.println("实际写入的字节数:"+write1);
          // 6. 清理操作
          if (!buffer.hasRemaining()) {
              // 需要清除buffer
              key.attach(null);
              //不需关注可写事件
              key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26

    3.2、服务端修改后的完整代码

    • 修改后的完整代码示例

      package com.example.nettytest.nio.day3;
      
      import java.io.IOException;
      import java.net.InetSocketAddress;
      import java.nio.ByteBuffer;
      import java.nio.channels.SelectionKey;
      import java.nio.channels.Selector;
      import java.nio.channels.ServerSocketChannel;
      import java.nio.channels.SocketChannel;
      import java.nio.charset.Charset;
      import java.util.Iterator;
      
      /**
       * @description: NIO(Selector处理write事件 写入内容过多问题)代码示例
       * @author: xz
       * @create: 2022-09-18
       */
      public class Test6Server {
          public static void main(String[] args) throws IOException {
              writeServer2();
          }
      
          /**
           * 解决 Selector写入内容过多问题的方式 代码示例
           * */
          public static void writeServer2() throws IOException{
              ServerSocketChannel ssc = ServerSocketChannel.open();
              ssc.configureBlocking(false);
              Selector selector = Selector.open();
              ssc.register(selector, SelectionKey.OP_ACCEPT);
              ssc.bind(new InetSocketAddress(8080));
              while (true) {
                  selector.select();
                  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                  while (iter.hasNext()) {
                      SelectionKey key = iter.next();
                      iter.remove();
                      if (key.isAcceptable()) {
                          SocketChannel sc = ssc.accept();
                          sc.configureBlocking(false);
                          SelectionKey sckey = sc.register(selector, 0, null);
                          sckey.interestOps(SelectionKey.OP_READ);
                          // 1. 向客户端发送大量数据
                          StringBuilder sb = new StringBuilder();
                          for (int i = 0; i < 9000000; i++) {
                              sb.append("a");
                          }
                          ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                          // 2. 返回值代表实际写入的字节数
                          int write = sc.write(buffer);
                          System.out.println("实际写入的字节数===="+write);
                          // 3. 判断是否有剩余内容
                          if (buffer.hasRemaining()) {
                              /**
                               * 4、关注事件
                               *sckey.interestOps 表示关注原先注册的事件
                               * SelectionKey.OP_WRITE 表示关注可写事件
                               */
                              sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
                              // 5. 把未写完的数据挂到 sckey 上
                              sckey.attach(buffer);
                          }else if (key.isWritable()) {
                              ByteBuffer buffer1 = (ByteBuffer) key.attachment();
                              SocketChannel sc1 = (SocketChannel) key.channel();
                              int write1 = sc1.write(buffer1);
                              System.out.println("实际写入的字节数:"+write1);
                              // 6. 清理操作
                              if (!buffer.hasRemaining()) {
                                  // 需要清除buffer
                                  key.attach(null);
                                  //不需关注可写事件
                                  key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                              }
                          }
                      }
                  }
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79

    3.3、启动服务端和客户端进行测试

    • 启动服务端,控制台输出如下:
      在这里插入图片描述

    • 启动客户端,控制台输出如下:
      在这里插入图片描述

  • 相关阅读:
    【HTTP 常用的状态码及使用场景】
    数学建模学习(89):交叉熵优化算法(CEM)对多元函数寻优
    LeetCode 6190. 找到所有好下标
    C#-反射
    JDBC版本简介
    数据库迁移前后密码的读取方式不同导致识别到密码是错的
    Flink——实时计算引擎
    电子数据取证-流程与技术
    原码、反码、补码在汇编中的应用
    Spring探索——既生@Resource,何生@Autowired?
  • 原文地址:https://blog.csdn.net/li1325169021/article/details/126924222