• IO多路复用Selector


    多路复用

    单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用。

    • 多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用

    • 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证

      • 有可连接事件时才去连接

      • 有可读事件才去读取

      • 有可写事件才去写入

        • 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件

    Selector 

    创建

    Selector selector = Selector.open();

    绑定 Channel 事件

    也称之为注册事件,绑定的事件 selector 才会关心

    1. channel.configureBlocking(false);
    2. SelectionKey key = channel.register(selector, 绑定事件);
    • channel 必须工作在非阻塞模式

    • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用

    • 绑定的事件类型可以有

      • connect - 客户端连接成功时触发

      • accept - 服务器端成功接受连接时触发

      • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况

      • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

    监听 Channel 事件

    1. //方法1,阻塞直到绑定事件发生
    2. int count = selector.select();
    3. //方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)
    4. int count = selector.select(long timeout);
    5. //方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件
    6. int count = selector.selectNow();

     select 何时不阻塞

    • 事件发生时

      • 客户端发起连接请求,会触发 accept 事件

      • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件

      • channel 可写,会触发 write 事件

      • 在 linux 下 nio bug 发生时

    • 调用 selector.wakeup()

    • 调用 selector.close()

    • selector 所在线程 interrupt

    处理 accept 事件

    服务器端代码

    1. public class Server {
    2. public static void main(String[] args) throws IOException {
    3. Selector selector = Selector.open();
    4. ServerSocketChannel ssc = ServerSocketChannel.open();
    5. ssc.bind(new InetSocketAddress(8080));
    6. //非阻塞(accept方法非阻塞)
    7. ssc.configureBlocking(false);
    8. //将 channel 注册到 selector 里
    9. SelectionKey sscKey = ssc.register(selector, 0, null);
    10. System.out.println("ssc register key:"+sscKey);
    11. //设置为accept事件
    12. sscKey.interestOps(SelectionKey.OP_ACCEPT);
    13. while (true){
    14. //select方法无事件阻塞,有事件不阻塞
    15. //事件要么处理,要么取消,不能置之不理,否则select不会阻塞
    16. selector.select();
    17. //处理事件,selectedKeys 里包含所有发生的事件
    18. Iterator iterator = selector.selectedKeys().iterator();
    19. if (iterator.hasNext()) {
    20. SelectionKey key = iterator.next();
    21. //处理key时,要从selectedKeys中删除,否则下次处理会出问题
    22. iterator.remove();
    23. //处理accept事件
    24. if (key.isAcceptable()) {
    25. ServerSocketChannel channel = (ServerSocketChannel) key.channel();
    26. SocketChannel sc = channel.accept();
    27. System.out.println("sc register key:"+scKey);
    28. }
    29. }
    30. }
    31. }
    32. }

    事件发生后能否不处理

    事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

    ByteBuffer 大小分配

    • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer

    • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer

      • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能。

      • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

    处理 read 事件

    1. public class Server {
    2. public static void main(String[] args) throws IOException {
    3. Selector selector = Selector.open();
    4. ServerSocketChannel ssc = ServerSocketChannel.open();
    5. ssc.bind(new InetSocketAddress(8080));
    6. //非阻塞(accept方法非阻塞)
    7. ssc.configureBlocking(false);
    8. //将 channel 注册到 selector 里
    9. SelectionKey sscKey = ssc.register(selector, 0, null);
    10. System.out.println("ssc register key:"+sscKey);
    11. //设置为accept事件
    12. sscKey.interestOps(SelectionKey.OP_ACCEPT);
    13. while (true){
    14. //select方法无事件阻塞,有事件不阻塞
    15. //事件要么处理,要么取消,不能置之不理,否则select不会阻塞
    16. selector.select();
    17. //处理事件,selectedKeys 里包含所有发生的事件
    18. Iterator iterator = selector.selectedKeys().iterator();
    19. if (iterator.hasNext()) {
    20. SelectionKey key = iterator.next();
    21. //处理key时,要从selectedKeys中删除,否则下次处理会出问题
    22. iterator.remove();
    23. //处理accept事件
    24. if (key.isAcceptable()) {
    25. ServerSocketChannel channel = (ServerSocketChannel) key.channel();
    26. SocketChannel sc = channel.accept();
    27. //非阻塞(read方法非阻塞)
    28. sc.configureBlocking(false);
    29. ByteBuffer buffer = ByteBuffer.allocate(16);
    30. //将 channel 注册到 selector 里,添加附件buffer
    31. SelectionKey scKey = sc.register(selector, 0, buffer);
    32. System.out.println("sc register key:"+scKey);
    33. //设置为read事件
    34. scKey.interestOps(SelectionKey.OP_READ);
    35. //处理read事件
    36. }else if (key.isReadable()){
    37. try {
    38. SocketChannel channel = (SocketChannel) key.channel();
    39. ByteBuffer buffer = (ByteBuffer) key.attachment();
    40. int read = channel.read(buffer);
    41. if (read == -1) {
    42. //客户端关闭会发送read事件,处理客户端正常关闭,取消该事件
    43. key.cancel();
    44. } else {
    45. split(buffer);
    46. //容量到达上限
    47. if (buffer.position() == buffer.limit()){
    48. //拷贝扩容
    49. ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
    50. buffer.flip();
    51. newBuffer.put(buffer);
    52. //为key添加新的附件newBuffer
    53. key.attach(newBuffer);
    54. }
    55. System.out.println(Charset.defaultCharset().decode(buffer));
    56. }
    57. }catch (Exception e){
    58. e.printStackTrace();
    59. //客户端关闭会发送read事件,处理客户端异常关闭,取消该事件
    60. key.cancel();
    61. }
    62. }
    63. }
    64. }
    65. }
    66. public static void split(ByteBuffer source){
    67. source.flip();
    68. for (int i = 0; i < source.limit(); i++) {
    69. //以\n为每个词的结束符
    70. if (source.get(i) == '\n'){
    71. int length = i + 1 - source.position();
    72. for (int j = 0; j < length; j++) {
    73. source.get();
    74. }
    75. }
    76. }
    77. source.compact();
    78. }
    79. }

    为何要  iterator.remove()

    因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

    • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey

    • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

    cancel 的作用

    cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

    处理 write 事件

    • 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)

    • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略

      • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上

      • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册

      • 如果不取消,会每次可写均会触发 write 事件

    1. public class WriteServer {
    2. public static void main(String[] args) throws IOException {
    3. ServerSocketChannel ssc = ServerSocketChannel.open();
    4. ssc.configureBlocking(false);
    5. ssc.bind(new InetSocketAddress(8080));
    6. Selector selector = Selector.open();
    7. ssc.register(selector, SelectionKey.OP_ACCEPT);
    8. while (true){
    9. selector.select();
    10. Iterator iterator = selector.selectedKeys().iterator();
    11. if (iterator.hasNext()) {
    12. SelectionKey key = iterator.next();
    13. iterator.remove();
    14. if (key.isAcceptable()) {
    15. SocketChannel sc = ssc.accept();
    16. sc.configureBlocking(false);
    17. SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
    18. StringBuilder sb = new StringBuilder();
    19. for (int i = 0; i < 5000000; i++) {
    20. sb.append("a");
    21. }
    22. ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
    23. //写一次
    24. int write = sc.write(buffer);
    25. System.out.println(write);
    26. //如果没写完,则设置write事件,等待缓冲区空闲再写
    27. if (buffer.hasRemaining()) {
    28. scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
    29. //添加附件buffer
    30. scKey.attach(buffer);
    31. }
    32. }else if (key.isWritable()){
    33. ByteBuffer buffer = (ByteBuffer) key.attachment();
    34. SocketChannel channel = (SocketChannel) key.channel();
    35. int write = channel.write(buffer);
    36. System.out.println(write);
    37. //清理操作
    38. if (!buffer.hasRemaining()) {
    39. //清除附件
    40. key.attach(null);
    41. //取消write事件
    42. key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
    43. }
    44. }
    45. }
    46. }
    47. }
    48. }

    write 为何要取消

    只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发,因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注

  • 相关阅读:
    【0到1学习Unity脚本编程】第一人称视角的角色控制器
    软件设计师 下午题第四题
    关于:Redis 基础知识,集群原理和面试资料【篇】(专题汇总)
    打开算法之门,算法学习瓶颈、学习方法
    含文档+PPT+源码等]精品基于PHP实现的实验室安全系统设计与实现[包运行成功]计算机PHP毕业设计项目源码
    如何去推动自己团队所提出的需求
    学会这 29 个 函数,你就是 Pandas 专家
    java计算机毕业设计论文管理系统源程序+mysql+系统+lw文档+远程调试
    前端面试HTML和CSS总结,这一篇就够了!
    彻底解决Win11锁屏界面黑屏或者图片不变化
  • 原文地址:https://blog.csdn.net/weixin_42318705/article/details/126591151