• 【Netty】nio处理accept&read&write事件


           📝个人主页:五敷有你      

     🔥系列专栏:Netty

    ⛺️稳中求进,晒太阳

    1.处理accept

    1.1客户端代码

    1. public class Client {
    2. public static void main(String[] args) {
    3. try (Socket socket = new Socket("localhost", 8080)) {
    4. System.out.println(socket);
    5. socket.getOutputStream().write("world".getBytes());
    6. System.in.read();
    7. } catch (IOException e) {
    8. e.printStackTrace();
    9. }
    10. }
    11. }

    1.2 服务端代码

    1. @Slf4j
    2. public class ChannelDemo {
    3. public static void main(String[] args) {
    4. try (ServerSocketChannel channel = ServerSocketChannel.open()) {
    5. channel.bind(new InetSocketAddress(8080));
    6. System.out.println(channel);
    7. Selector selector = Selector.open();
    8. channel.configureBlocking(false);
    9. channel.register(selector, SelectionKey.OP_ACCEPT);
    10. while (true) {
    11. int count = selector.select();
    12. // int count = selector.selectNow();
    13. log.debug("select count: {}", count);
    14. // if(count <= 0) {
    15. // continue;
    16. // }
    17. // 获取所有事件
    18. Set keys = selector.selectedKeys();
    19. // 遍历所有事件,逐一处理
    20. Iterator iter = keys.iterator();
    21. while (iter.hasNext()) {
    22. SelectionKey key = iter.next();
    23. // 判断事件类型
    24. if (key.isAcceptable()) {
    25. ServerSocketChannel c = (ServerSocketChannel) key.channel();
    26. // 必须处理
    27. SocketChannel sc = c.accept();
    28. log.debug("{}", sc);
    29. }
    30. // 处理完毕,必须将事件移除
    31. iter.remove();
    32. }
    33. }
    34. } catch (IOException e) {
    35. e.printStackTrace();
    36. }
    37. }
    38. }

    💡 事件发生后能否不处理

    事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

    水平触发是什么

    参考这篇文章:细说八股 | NIO的水平触发和边缘触发到底有什么区别? - 掘金 (juejin.cn)

    2. 处理read事件

    2.1 服务端代码

    1. @Slf4j
    2. public class ChannelDemo {
    3. public static void main(String[] args) {
    4. try (ServerSocketChannel channel = ServerSocketChannel.open()) {
    5. channel.bind(new InetSocketAddress(8080));
    6. System.out.println(channel);
    7. Selector selector = Selector.open();
    8. channel.configureBlocking(false);
    9. channel.register(selector, SelectionKey.OP_ACCEPT);
    10. while (true) {
    11. int count = selector.select();
    12. // int count = selector.selectNow();
    13. log.debug("select count: {}", count);
    14. // if(count <= 0) {
    15. // continue;
    16. // }
    17. // 获取所有事件
    18. Set keys = selector.selectedKeys();
    19. // 遍历所有事件,逐一处理
    20. Iterator iter = keys.iterator();
    21. while (iter.hasNext()) {
    22. SelectionKey key = iter.next();
    23. // 判断事件类型
    24. if (key.isAcceptable()) {
    25. ServerSocketChannel c = (ServerSocketChannel) key.channel();
    26. // 必须处理
    27. SocketChannel sc = c.accept();
    28. sc.configureBlocking(false);
    29. sc.register(selector, SelectionKey.OP_READ);
    30. log.debug("连接已建立: {}", sc);
    31. } else if (key.isReadable()) {
    32. SocketChannel sc = (SocketChannel) key.channel();
    33. ByteBuffer buffer = ByteBuffer.allocate(128);
    34. int read = sc.read(buffer);
    35. if(read == -1) {
    36. key.cancel();
    37. sc.close();
    38. } else {
    39. buffer.flip();
    40. debug(buffer);
    41. }
    42. }
    43. // 处理完毕,必须将事件移除
    44. iter.remove();
    45. }
    46. }
    47. } catch (IOException e) {
    48. e.printStackTrace();
    49. }
    50. }
    51. }

    💡 为何要 iter.remove()

    因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

    • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey

    • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

    💡 cancel 的作用

    cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件。

    注意:

    关闭远程连接会发送一次可读事件,返回值为-1,需要cancel和close和remove

    ⚠️ 不处理边界的问题

    服务端

    1. public class Server {
    2. public static void main(String[] args) throws IOException {
    3. ServerSocket ss=new ServerSocket(9000);
    4. while (true) {
    5. Socket s = ss.accept();
    6. InputStream in = s.getInputStream();
    7. // 这里这么写,有没有问题
    8. byte[] arr = new byte[4];
    9. while(true) {
    10. int read = in.read(arr);
    11. // 这里这么写,有没有问题
    12. if(read == -1) {
    13. break;
    14. }
    15. System.out.println(new String(arr, 0, read));
    16. }
    17. }
    18. }
    19. }

    客户端

    1. public class Client {
    2. public static void main(String[] args) throws IOException {
    3. Socket max = new Socket("localhost", 9000);
    4. OutputStream out = max.getOutputStream();
    5. out.write("hello".getBytes());
    6. out.write("world".getBytes());
    7. out.write("你好".getBytes());
    8. max.close();
    9. }
    10. }

    输出

    为什么呢???????

    处理消息的边界(3种)

    1. 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽

    2. 另一种思路是按分隔符拆分,缺点是效率低

    3. TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量

    服务端

    下述代码:按分隔符拆分,然后动态扩容

    1. private static void split(ByteBuffer source) {
    2. source.flip();
    3. for (int i = 0; i < source.limit(); i++) {
    4. // 找到一条完整消息
    5. if (source.get(i) == '\n') {
    6. int length = i + 1 - source.position();
    7. // 把这条完整消息存入新的 ByteBuffer
    8. ByteBuffer target = ByteBuffer.allocate(length);
    9. // 从 source 读,向 target 写
    10. for (int j = 0; j < length; j++) {
    11. target.put(source.get());
    12. }
    13. debugAll(target);
    14. }
    15. }
    16. source.compact(); // 0123456789abcdef position 16 limit 16
    17. }
    18. public static void main(String[] args) throws IOException {
    19. // 1. 创建 selector, 管理多个 channel
    20. Selector selector = Selector.open();
    21. ServerSocketChannel ssc = ServerSocketChannel.open();
    22. ssc.configureBlocking(false);
    23. // 2. 建立 selector 和 channel 的联系(注册)
    24. // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
    25. SelectionKey sscKey = ssc.register(selector, 0, null);
    26. // key 只关注 accept 事件
    27. sscKey.interestOps(SelectionKey.OP_ACCEPT);
    28. log.debug("sscKey:{}", sscKey);
    29. ssc.bind(new InetSocketAddress(8080));
    30. while (true) {
    31. // 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
    32. // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
    33. selector.select();
    34. // 4. 处理事件, selectedKeys 内部包含了所有发生的事件
    35. Iterator iter = selector.selectedKeys().iterator(); // accept, read
    36. while (iter.hasNext()) {
    37. SelectionKey key = iter.next();
    38. // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
    39. iter.remove();
    40. log.debug("key: {}", key);
    41. // 5. 区分事件类型
    42. if (key.isAcceptable()) { // 如果是 accept
    43. ServerSocketChannel channel = (ServerSocketChannel) key.channel();
    44. SocketChannel sc = channel.accept();
    45. sc.configureBlocking(false);
    46. ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
    47. // 将一个 byteBuffer 作为附件关联到 selectionKey 上
    48. SelectionKey scKey = sc.register(selector, 0, buffer);
    49. scKey.interestOps(SelectionKey.OP_READ);
    50. log.debug("{}", sc);
    51. log.debug("scKey:{}", scKey);
    52. } else if (key.isReadable()) { // 如果是 read
    53. try {
    54. SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
    55. // 获取 selectionKey 上关联的附件
    56. ByteBuffer buffer = (ByteBuffer) key.attachment();
    57. int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
    58. if(read == -1) {
    59. key.cancel();
    60. } else {
    61. split(buffer);
    62. // 需要扩容
    63. if (buffer.position() == buffer.limit()) {
    64. ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
    65. buffer.flip();
    66. newBuffer.put(buffer); // 0123456789abcdef3333\n
    67. key.attach(newBuffer);
    68. }
    69. }
    70. } catch (IOException e) {
    71. e.printStackTrace();
    72. key.cancel(); // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
    73. }
    74. }
    75. }
    76. }
    77. }

    客户端

    1. SocketChannel sc = SocketChannel.open();
    2. sc.connect(new InetSocketAddress("localhost", 8080));
    3. SocketAddress address = sc.getLocalAddress();
    4. // sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
    5. sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
    6. sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
    7. System.in.read();

    ByteBuffer 大小分配
    • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer

    • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer

      • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能。

      • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

    3.处理Write事件

    write就是将buffer中的内容通过channel传输过去。

    • 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)

    • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略

      • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上

      • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册

      • 如果不取消,会每次可写均会触发 write 事件

    1. public class WriteServer {
    2. public static void main(String[] args) throws IOException {
    3. ServerSocketChannel ssc = ServerSocketChannel.open();
    4. ssc.configureBlocking(false);
    5. ssc.bind(new InetSocketAddress(8080));
    6. Selector selector = Selector.open();
    7. ssc.register(selector, SelectionKey.OP_ACCEPT);
    8. while(true) {
    9. selector.select();
    10. Iterator iter = selector.selectedKeys().iterator();
    11. while (iter.hasNext()) {
    12. SelectionKey key = iter.next();
    13. iter.remove();
    14. if (key.isAcceptable()) {
    15. SocketChannel sc = ssc.accept();
    16. sc.configureBlocking(false);
    17. SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
    18. // 1. 向客户端发送内容
    19. StringBuilder sb = new StringBuilder();
    20. for (int i = 0; i < 3000000; i++) {
    21. sb.append("a");
    22. }
    23. ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
    24. int write = sc.write(buffer);
    25. // 3. write 表示实际写了多少字节
    26. System.out.println("实际写入字节:" + write);
    27. // 4. 如果有剩余未读字节,才需要关注写事件
    28. if (buffer.hasRemaining()) {
    29. // read 1 write 4
    30. // 在原有关注事件的基础上,多关注 写事件
    31. sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
    32. // 把 buffer 作为附件加入 sckey
    33. sckey.attach(buffer);
    34. }
    35. } else if (key.isWritable()) {
    36. ByteBuffer buffer = (ByteBuffer) key.attachment();
    37. SocketChannel sc = (SocketChannel) key.channel();
    38. int write = sc.write(buffer);
    39. System.out.println("实际写入字节:" + write);
    40. if (!buffer.hasRemaining()) { // 写完了
    41. key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
    42. key.attach(null);
    43. }
    44. }
    45. }
    46. }
    47. }
    48. }

    客户端

    1. public class WriteClient {
    2. public static void main(String[] args) throws IOException {
    3. Selector selector = Selector.open();
    4. SocketChannel sc = SocketChannel.open();
    5. sc.configureBlocking(false);
    6. sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    7. sc.connect(new InetSocketAddress("localhost", 8080));
    8. int count = 0;
    9. while (true) {
    10. selector.select();
    11. Iterator iter = selector.selectedKeys().iterator();
    12. while (iter.hasNext()) {
    13. SelectionKey key = iter.next();
    14. iter.remove();
    15. if (key.isConnectable()) {
    16. System.out.println(sc.finishConnect());
    17. } else if (key.isReadable()) {
    18. ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
    19. count += sc.read(buffer);
    20. buffer.clear();
    21. System.out.println(count);
    22. }
    23. }
    24. }
    25. }
    26. }
    💡 write 为何要取消

    只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发,因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注.

  • 相关阅读:
    dubbo是如何实现可扩展的?(二)
    如何实现办公终端安全
    EMO在哪体验?阿里对口型视频生成工具EMO下载地址?阿里巴巴新模型EMO的技术原理
    qian‘kun微服务配置vue3.2+ts+vite子应用教程
    NLP中基于Bert的数据预处理
    从简历被拒到收割8个大厂offer,我用了3个月成功破茧成蝶
    拧紧数据“安全阀”,筑牢个保“安全堤”
    网络技术八:Vlan和Trunk基础
    【软考 系统架构设计师】计算机组成与体系结构⑦ 校验码
    面对中小型机房动力环境该如何实现监控?
  • 原文地址:https://blog.csdn.net/m0_62645012/article/details/139749454