• 庖丁解牛:NIO核心概念与机制详解 06 _ 连网和异步 I/O


    在这里插入图片描述


    Pre

    庖丁解牛:NIO核心概念与机制详解 01

    庖丁解牛:NIO核心概念与机制详解 02 _ 缓冲区的细节实现

    庖丁解牛:NIO核心概念与机制详解 03 _ 缓冲区分配、包装和分片

    庖丁解牛:NIO核心概念与机制详解 04 _ 分散和聚集

    庖丁解牛:NIO核心概念与机制详解 05 _ 文件锁定


    概述

    在 Java NIO 中,连网操作与其他操作一样,依赖于通道(Channel)和缓冲区(Buffer)。通道是用于读取和写入数据的途径,而缓冲区则用于暂存数据。

    与传统的同步 I/O 不同,Java NIO 中的通道操作是非阻塞的,这意味着在发起 IO 请求后,进程可以继续执行其他任务,而不需要等待 IO 操作完成。当 IO 操作完成后,进程会收到通知,此时再进行相应的处理。


    异步 I/O

    异步 I/O 是一种 没有阻塞地 读写数据的方法。通常,在代码进行 read() 调用时,代码会阻塞直至有可供读取的数据。同样, write() 调用将会阻塞直至数据能够写入。

    另一方面,异步 I/O 调用不会阻塞。相反,你将注册对特定 I/O 事件的兴趣 ― 可读的数据的到达、新的套接字连接,等等,而在发生这样的事件时,系统将会告诉你。

    异步 I/O 的一个优势在于,它允许你同时根据大量的输入和输出执行 I/O。同步程序常常要求助于轮询,或者创建许许多多的线程以处理大量的连接。使用异步 I/O,你可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程。

    来看个Demo

    这个程序就像传统的 echo server,它接受网络连接并向它们回响它们可能发送的数据。不过它有一个附加的特性,就是它能同时监听多个端口,并处理来自所有这些端口的连接。并且它只在单个线程中完成所有这些工作。

    
    import java.io.*;
    import java.net.*;
    import java.nio.*;
    import java.nio.channels.*;
    import java.util.*;
    
    /**
     * @author 小工匠
     * @version 1.0
     * @mark: show me the code , change the world
     */
    public class MultiPortEcho {
        private int ports[];
        private ByteBuffer echoBuffer = ByteBuffer.allocate(1024);
    
        public MultiPortEcho(int ports[]) throws IOException {
            this.ports = ports;
    
            go();
        }
    
        private void go() throws IOException {
            // Create a new selector
            Selector selector = Selector.open();
    
            // Open a listener on each port, and register each one
            // with the selector
            for (int i = 0; i < ports.length; ++i) {
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.configureBlocking(false);
                ServerSocket ss = ssc.socket();
                InetSocketAddress address = new InetSocketAddress(ports[i]);
                ss.bind(address);
    
                SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);
    
                System.out.println("Going to listen on " + ports[i]);
            }
    
            while (true) {
                int num = selector.select();
    
                Set selectedKeys = selector.selectedKeys();
                Iterator it = selectedKeys.iterator();
    
                while (it.hasNext()) {
                    SelectionKey key = (SelectionKey) it.next();
    
                    if ((key.readyOps() & SelectionKey.OP_ACCEPT)
                            == SelectionKey.OP_ACCEPT) {
                        // Accept the new connection
                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                        SocketChannel sc = ssc.accept();
                        sc.configureBlocking(false);
    
                        // Add the new connection to the selector
                        SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);
                        it.remove();
    
                        System.out.println("Got connection from " + sc);
                    } else if ((key.readyOps() & SelectionKey.OP_READ)
                            == SelectionKey.OP_READ) {
                        // Read the data
                        SocketChannel sc = (SocketChannel) key.channel();
    
                        // Echo data
                        int bytesEchoed = 0;
                        while (true) {
                            echoBuffer.clear();
    
                            int r = sc.read(echoBuffer);
    
                            if (r <= 0) {
                                break;
                            }
    
                            echoBuffer.flip();
    
                            sc.write(echoBuffer);
                            bytesEchoed += r;
                        }
    
                        System.out.println("Echoed " + bytesEchoed + " from " + sc);
    
                        it.remove();
                    }
    
                }
    
    //System.out.println( "going to clear" );
    //      selectedKeys.clear();
    //System.out.println( "cleared" );
            }
        }
    
        public static void main(String args[]) throws Exception {
            if (args.length <= 0) {
                System.err.println("Usage: java MultiPortEcho port [port port ...]");
                System.exit(1);
            }
    
            int ports[] = new int[args.length];
    
            for (int i = 0; i < args.length; ++i) {
                ports[i] = Integer.parseInt(args[i]);
            }
    
            new MultiPortEcho(ports);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111

    Selectors

    我们来基于 MultiPortEcho 的源代码中的 go() 方法的实现,因此应该看一下源代码,以便对所发生的事情有个更全面的了解。

    异步 I/O 中的核心对象名为 SelectorSelector 就是你注册对各种 I/O 事件的兴趣的地方,而且当那些事件发生时,就是这个对象告诉你所发生的事件。

    所以,我们需要做的第一件事就是创建一个 Selector

    // Create a new selector
    Selector selector = Selector.open();
    
    • 1
    • 2

    然后,我们将对不同的通道对象调用 register() 方法,以便注册我们对这些对象中发生的 I/O 事件的兴趣。register() 的第一个参数总是这个 Selector。


    打开一个 ServerSocketChannel

    为了接收连接,我们需要一个 ServerSocketChannel。事实上,我们要监听的每一个端口都需要有一个 ServerSocketChannel 。

    对于每一个端口,我们打开一个 ServerSocketChannel,如下所示:

    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking( false );
      
    ServerSocket ss = ssc.socket();
    InetSocketAddress address = new InetSocketAddress( ports[ii] );
    ss.bind( address );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    第一行创建一个新的 ServerSocketChannel ,最后三行将它绑定到给定的端口。
    第二行将 ServerSocketChannel 设置为 非阻塞的 。我们必须对每一个要使用的套接字通道调用这个方法,否则异步 I/O 就不能工作。


    选择键

    下一步是将新打开的 ServerSocketChannels 注册到 Selector上。为此我们使用 ServerSocketChannel.register() 方法,如下所示:

    SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );
    
    • 1

    register() 的第一个参数总是这个 Selector
    第二个参数是 OP_ACCEPT,这里它指定我们想要监听 accept 事件,也就是在新的连接建立时所发生的事件。这是适用于 ServerSocketChannel 的唯一事件类型。

    请注意对 register() 的调用的返回值。 SelectionKey 代表这个通道在此 Selector 上的这个注册。当某个 Selector 通知你某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。


    内部循环

    现在已经注册了我们对一些 I/O 事件的兴趣,下面将进入主循环。使用 Selectors 的几乎每个程序都像下面这样使用内部循环:

    int num = selector.select();
      
    Set selectedKeys = selector.selectedKeys();
    Iterator it = selectedKeys.iterator();
      
    while (it.hasNext()) {
         SelectionKey key = (SelectionKey)it.next();
         // ... deal with I/O event ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    首先,我们调用 Selectorselect() 方法。这个方法会阻塞,直到至少有一个已注册的事件发生。当一个或者更多的事件发生时, select() 方法将返回所发生的事件的数量。

    接下来,我们调用 SelectorselectedKeys() 方法,它返回发生了事件的 SelectionKey 对象的一个 集合 。

    我们通过迭代 SelectionKeys 并依次处理每个 SelectionKey 来处理事件。对于每一个 SelectionKey,你必须确定发生的是什么 I/O 事件,以及这个事件影响哪些 I/O 对象。


    监听新连接

    程序执行到这里,我们仅注册了 ServerSocketChannel,并且仅注册它们“接收”事件。为确认这一点,我们对 SelectionKey 调用 readyOps() 方法,并检查发生了什么类型的事件:

    if ((key.readyOps() & SelectionKey.OP_ACCEPT)
         == SelectionKey.OP_ACCEPT) {
      
         // Accept the new connection
         // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    可以肯定地说, readOps() 方法告诉我们该事件是新的连接。


    接受新的连接

    因为我们知道这个服务器套接字上有一个传入连接在等待,所以可以安全地接受它;也就是说,不用担心 accept() 操作会阻塞:

    ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
    SocketChannel sc = ssc.accept();
    
    • 1
    • 2

    下一步是将新连接的 SocketChannel 配置为非阻塞的。而且由于接受这个连接的目的是为了读取来自套接字的数据,所以我们还必须将 SocketChannel 注册到 Selector上,如下所示:

    sc.configureBlocking( false );
    SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );
    
    • 1
    • 2

    注意我们使用 register()OP_READ 参数,将 SocketChannel 注册用于 读取 而不是 接受 新连接。


    删除处理过的 SelectionKey

    在处理 SelectionKey 之后,我们几乎可以返回主循环了。但是我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。我们调用迭代器的 remove() 方法来删除处理过的 SelectionKey

    it.remove();
    
    • 1

    现在我们可以返回主循环并接受从一个套接字中传入的数据(或者一个传入的 I/O 事件)了。


    传入的 I/O

    当来自一个套接字的数据到达时,它会触发一个 I/O 事件。这会导致在主循环中调用 Selector.select(),并返回一个或者多个 I/O 事件。这一次, SelectionKey 将被标记为 OP_READ 事件,如下所示:

    } else if ((key.readyOps() & SelectionKey.OP_READ)
         == SelectionKey.OP_READ) {
         // Read the data
         SocketChannel sc = (SocketChannel)key.channel();
         // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    与以前一样,我们取得发生 I/O 事件的通道并处理它。在本例中,由于这是一个 echo server,我们只希望从套接字中读取数据并马上将它发送回去。


    回到主循环

    每次返回主循环,我们都要调用 selectSelector()方法,并取得一组 SelectionKey。每个键代表一个 I/O 事件。我们处理事件,从选定的键集中删除 SelectionKey,然后返回主循环的顶部。

    这个程序有点过于简单,因为它的目的只是展示异步 I/O 所涉及的技术。在现实的应用程序中,我们需要通过将通道从 Selector 中删除来处理关闭的通道。而且我们可能要使用多个线程。这个程序可以仅使用一个线程,因为它只是一个演示,但是在现实场景中,创建一个线程池来负责 I/O 事件处理中的耗时部分会更有意义。

    在这里插入图片描述

  • 相关阅读:
    Excel 数据透视表教程大全之 05 数据透视表绘制各种二维排列的数据,实现双向枢轴(教程含数据)
    GitHub 标星 120K 的 Java 面试知识点总结,真就物超所值了
    分布式协议与算法——Raft算法
    网安小贴士(4)哈希函数
    实践笔记-docker安装及配置镜像源
    CVPR2022 | 可精简域适应
    CANoe新建XML自动化Test Modules
    全志XR806基于http的无线ota功能实验
    (三)softmax分类--九五小庞
    【Python】创建和解包归档文件
  • 原文地址:https://blog.csdn.net/yangshangwei/article/details/134512955