• 3、Nio源码


    一、Nio客户端实现

    1. package com.zoo.nio;
    2. import java.io.IOException;
    3. import java.net.InetSocketAddress;
    4. import java.nio.ByteBuffer;
    5. import java.nio.channels.SelectionKey;
    6. import java.nio.channels.Selector;
    7. import java.nio.channels.ServerSocketChannel;
    8. import java.nio.channels.SocketChannel;
    9. import java.util.Date;
    10. import java.util.Iterator;
    11. public class ClientTest {
    12. public static void main(String[] args) throws Exception{
    13. //1.获取通道
    14. SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",4201));
    15. //2.切换非阻塞模式
    16. sChannel.configureBlocking(false);
    17. //3.分配指定大小缓冲区
    18. ByteBuffer buf = ByteBuffer.allocate(1024);
    19. //4.发送数据给服务端
    20. buf.put(new Date().toString().getBytes());
    21. buf.flip();
    22. sChannel.write(buf);
    23. buf.clear();
    24. //5.关闭通道
    25. sChannel.close();
    26. }
    27. }

    二、nio服务端实现

    1. package com.zoo.nio;
    2. import java.io.IOException;
    3. import java.net.InetSocketAddress;
    4. import java.nio.ByteBuffer;
    5. import java.nio.channels.SelectionKey;
    6. import java.nio.channels.Selector;
    7. import java.nio.channels.ServerSocketChannel;
    8. import java.nio.channels.SocketChannel;
    9. import java.util.Date;
    10. import java.util.Iterator;
    11. public class ServiceTest {
    12. public static void main(String[] args) throws Exception{
    13. //1.获取通道
    14. ServerSocketChannel ssChannel = ServerSocketChannel.open();
    15. //2.切换非阻塞模式
    16. ssChannel.configureBlocking(false);
    17. //3.绑定连接
    18. ssChannel.bind(new InetSocketAddress(4201));
    19. //4.获取选择器
    20. Selector selector = Selector.open();
    21. //5.将通道注册到选择器上,并指定监听事件
    22. ssChannel.register(selector, SelectionKey.OP_ACCEPT);
    23. //6.轮询式获取选择器的准备就绪事件
    24. while(selector.select() > 0){
    25. //7.获取当前选择器中所有注册的选择键(已就绪的监听事件)
    26. Iterator iterator = selector.selectedKeys().iterator();
    27. //8.获取准备就绪的事件
    28. while(iterator.hasNext()){
    29. SelectionKey sk = iterator.next();
    30. //9.判断什么事件准备就绪
    31. if (sk.isAcceptable()){
    32. //10.获取客户端连接
    33. SocketChannel sChannel = ssChannel.accept();
    34. //11.切换非阻塞模式
    35. sChannel.configureBlocking(false);
    36. //12.将该通道注册到选择器上
    37. sChannel.register(selector,SelectionKey.OP_READ);
    38. }else if(sk.isReadable()){//读就绪
    39. //13.获取当前选择器上读就绪状态的通道
    40. SocketChannel sChannel = (SocketChannel) sk.channel();
    41. //14.读取数据
    42. ByteBuffer buf = ByteBuffer.allocate(1024);
    43. int len = 0;
    44. while ((len=sChannel.read(buf))>0){
    45. buf.flip();
    46. System.out.println(new String(buf.array(),0,len));
    47. buf.clear();
    48. }
    49. }
    50. //15.取消选择键
    51. iterator.remove();
    52. }
    53. }
    54. }
    55. }

    三、selector.select() 调用链分析

    3.1、jdk纬度分析

    3.1.1、nio核心代码

    1. //KQueueSelectorImpl.java
    2. protected int doSelect(long var1) throws IOException {
    3. boolean var3 = false;
    4. if (this.closed) {
    5. throw new ClosedSelectorException();
    6. } else {
    7. //处理注销的selectionKey队列
    8. this.processDeregisterQueue();
    9. int var7;
    10. try {
    11. this.begin();
    12. // 调用了poll方法,底层调用了native的epollCtl和epollWait方法
    13. var7 = this.kqueueWrapper.poll(var1);
    14. } finally {
    15. this.end();
    16. }
    17. this.processDeregisterQueue();
    18. return this.updateSelectedKeys(var7);
    19. }
    20. }
    21. //KQueueArrayWrapper.java
    22. // 这里抛出个问题,var1=timeout=-1 为什么超时是-1?
    23. int poll(long var1) {
    24. this.updateRegistrations();
    25. //Mac系统基于UNIX,window可能不一样,阻塞调用等待事件返回结果
    26. int var3 = this.kevent0(this.kq, this.keventArrayAddress, 128, var1);
    27. return var3;
    28. }
    29. //kqueue是在UNIX上比较高效IO复用技术。
    30. //所谓的IO复用,就是同时等待多个文件描述符就绪,以系统调用的形式提供。如果所有文件描述符都没有就绪的话,该系统调用阻塞,否则调用返回,允许用户进行后续的操作。
    31. //常见的IO复用技术有select, poll, epoll以及kqueue等等。其中epoll为Linux独占,而kqueue则在许多UNIX系统上存在
    32. //kevent() 是阻塞调用,等到有事件才返回。阻塞时线程处于sleep状态,有事件时系统激活kqueue,kevent()返回
    33. private native int kevent0(int var1, long var2, int var4, long var5);
    34. //从cancellledKeys集合中取出注销的SelectionKey,执行注销操作。
    35. //将处理后的SelectionKey从cancelledKeys集合中移除。执行processDeregisterQueue()
    36. //后cancelledKeys集合会为空
    37. void processDeregisterQueue() throws IOException {
    38. // Precondition: Synchronized on this, keys, and selectedKeys
    39. Set cks = cancelledKeys();
    40. synchronized (cks) {
    41. if (!cks.isEmpty()) {
    42. Iterator i = cks.iterator();
    43. while (i.hasNext()) {
    44. SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
    45. try {
    46. implDereg(ski);
    47. } catch (SocketException se) {
    48. throw new IOException("Error deregistering key", se);
    49. } finally {
    50. i.remove();
    51. }
    52. }
    53. }
    54. }
    55. }
    56. //执行完该方法后,注销的SelectionKey就不会出现在keys、selectedKeys以及cancelledKeys这三个集合中
    57. protected void implDereg(SelectionKeyImpl ski) throws IOException {
    58. assert (ski.getIndex() >= 0);
    59. SelChImpl ch = ski.channel;
    60. int fd = ch.getFDVal();
    61. // 将已经注销的selectionKey从fdToKey(文件描述符与SelectionKeyImpl的映射表)中移除
    62. fdToKey.remove(Integer.valueOf(fd));
    63. // 将selectionKey所代表的channel的文件描述符从EPollArrayWrapper中移除
    64. pollWrapper.remove(fd);
    65. ski.setIndex(-1);
    66. //将selectionKey从keys集合中移除
    67. keys.remove(ski);
    68. selectedKeys.remove(ski);
    69. deregister((AbstractSelectionKey)ski);
    70. SelectableChannel selch = ski.channel();
    71. // 如果对应的频道已经关闭并且没有注册其他的选择了,则将该信道关闭
    72. if (!selch.isOpen() && !selch.isRegistered())
    73. ((SelChImpl)selch).kill();
    74. }
    75. private int updateSelectedKeys() {
    76. //更新了的keys的个数,或在说是产生的事件的个数
    77. int entries = pollWrapper.updated;
    78. int numKeysUpdated = 0;
    79. for (int i=0; i
    80. //对应的channel的fd
    81. int nextFD = pollWrapper.getDescriptor(i);
    82. //通过fd找到对应的SelectionKey
    83. SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
    84. if (ski != null) {
    85. int rOps = pollWrapper.getEventOps(i);
    86. //更新selectedKey变量,并通知响应的channel来做响应的处理
    87. if (selectedKeys.contains(ski)) {
    88. if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
    89. numKeysUpdated++;
    90. }
    91. } else {
    92. ski.channel.translateAndSetReadyOps(rOps, ski);
    93. if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
    94. selectedKeys.add(ski);
    95. numKeysUpdated++;
    96. }
    97. }
    98. }
    99. }
    100. return numKeysUpdated;
    101. }

    四、通道Channel

    Channel是一个通道,它就像自来水管一样,网络数据通过Channel读取和写入。通 道 与 流 的 不 同 之 处 在 于 通 道 是 双 向 的,流只是在一个方向上移 动( 一 个 流 必 须 是 Inputstream或者OutpulStream的子类),而通道可以用于读、写或者一者同时进行。因为Channel是全双工的,所以它可以比流更好地映射底层操作系统的API。特别是在UNIX网络编程模型中,底层操作系统的通道都是全双工的,同时支持读写操作。


    五、多路复用器Selector

    我们将探索多路夏用器Selector,它是Java NIO编程的基础,熟练地掌握 Selector对于NIO编程至关重要。多路复用器提供选择已经就绪的任务的能力。简单来讲, Selector会不断地轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件, 这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的IO操作。 一个多路复用器Selector可以同时轮询多个Channel,由JDK使用了epoll。代替传 统的select实现,所以它并没有最大连接句柄1024/2048的限制。这也就意味着只需要一 个线程负责Selector的轮询,就可以接入成千上万的客户端,这确实是个非常巨大的进步。

    六、nio读写流程

    1、nio读写流程

    1、selector.select()循环获取已经可读的SelectionKey
    2、迭代SelectionKey获取SocketChannel通道
    3、判断通道是否准备就绪
    4、开始读写
    5、抛出个问题,SelectionKey list如何和内核映射?我调试没有调试出结果

    2、获取linux内核select状态

    1、这篇文章有介绍kevent函数使用,其实就是linux的select模型
    2、什么是linux文件描述符
    3、kevent依赖文件描述符的就绪状态,这篇文章介绍文件描述符的就绪条件

  • 相关阅读:
    【HBZ分享】云数据库如何定位慢查询
    Python核心数据类型
    LC滤波器设计学习笔记(一)滤波电路入门
    C++核心编程--对象篇
    Unity使用新输入系统InputSystem制作飞机大战Demo(对象池设计模式及应用)
    fastDFS安装笔记
    假ArrayList导致的线上事故......
    聚观早报 | iPhone14或于9月23日上市;腾讯发布Max 二代机器人
    【网络协议】聊聊DHCP和PXE 工作原理
    JS深入学习笔记 - 第三章.变量作用域与内存
  • 原文地址:https://blog.csdn.net/god8816/article/details/127773666