• Java NIO模型(提供代码示例)


    一、NIO特点介绍

    • NIO全称 java non-blocking IO。从JDK 1.4开始,java提供了一些列改进的输入/输出(I/O)的新特性,被称为NIO,是同步非阻塞的,NIO相关类都被放在java.nio包及其子包下。
    • NIO是面向缓冲区的,或者面向块编程的,数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区内前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞的高伸缩性网络
    • Java NIO的非阻塞模式,使一个线程从某通道发送或者读取数据,但是它仅能得到目前可用的数据,如果目前没有可用的数据时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可读取之前,该线程可以继续做其他事情。非阻塞就是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。
    • NIO三大核心部分:Channel(通道),Buffer(缓冲区),Selector(选择器)
      • 在线程处理过程中,如果涉及到IO操作,那么当前的线程不会被阻塞,而是会去处理其他业务代码,然后等过段时间再来查询 IO 交互是否完成。如下图:Buffer 是一个缓冲区,用来缓存读取和写入的数据;Channel 是一个通道,负责后台对接 IO 数据;而 Selector 实现的主要功能,是主动查询哪些通道是处于就绪状态。Selector复用一个线程,来查询已就绪的通道,这样大大减少 IO 交互引起的频繁切换线程的开销.

    在这里插入图片描述

    二、NIO代码实现

    2.1、客户端代码

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Scanner;
    import java.util.Set;
    
    public class NioClient implements Runnable {
        private String host;
        private int port;
        private volatile boolean started;
        private Selector selector;
        private SocketChannel socketChannel;
    
        public NioClient(String ip, int port) {
            this.host = ip;
            this.port = port;
    
            try {
                /*创建选择器的实例*/
                selector = Selector.open();
                /*创建ServerSocketChannel的实例*/
                socketChannel = SocketChannel.open();
                /*设置通道为非阻塞模式*/
                socketChannel.configureBlocking(false);
    
                started = true;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            try {
                doConnect();
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
    
            //循环遍历selector
            while (started) {
                try {
                    //无论是否有读写事件发生,selector每隔1s被唤醒一次
                    selector.select(1000);
                    //获取当前有哪些事件可以使用
                    Set<SelectionKey> keys = selector.selectedKeys();
                    //转换为迭代器
                    Iterator<SelectionKey> it = keys.iterator();
                    SelectionKey key = null;
                    while (it.hasNext()) {
                        key = it.next();
                        /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。
                        如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活
                        的键出现,这会导致我们尝试再次处理它。*/
                        it.remove();
                        try {
                            if (key.isValid()) {
                                //连接事件
                                if (key.isConnectable()) {
                                    connectHandler(key);
                                }
                                //读事件
                                if (key.isReadable()) {
                                    readHandler(key);
                                }
                            }
                        } catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null) {
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.exit(1);
                }
            }
            //selector关闭后会自动释放里面管理的资源
            if (selector != null)
                try {
                    selector.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
        }
    
        private void connectHandler(SelectionKey key) throws IOException {
            //获得关心当前事件的channel
            SocketChannel sc = (SocketChannel) key.channel();
            if (sc.finishConnect()) {
                socketChannel.register(selector,
                        SelectionKey.OP_READ);
            } else System.exit(1);
        }
    
        private void readHandler(SelectionKey key) throws IOException {
            //获得关心当前事件的channel
            SocketChannel sc = (SocketChannel) key.channel();
            //创建ByteBuffer,并开辟一个1M的缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            //读取请求码流,返回读取到的字节数
            int readBytes = sc.read(buffer);
            //读取到字节,对字节进行编解码
            if (readBytes > 0) {
                //将缓冲区当前的limit设置为position,position=0,
                // 用于后续对缓冲区的读取操作
                buffer.flip();
                //根据缓冲区可读字节数创建字节数组
                byte[] bytes = new byte[buffer.remaining()];
                //将缓冲区可读字节数组复制到新建的数组中
                buffer.get(bytes);
                String result = new String(bytes, "UTF-8");
                System.out.println("客户端收到消息:" + result);
            }
            //链路已经关闭,释放资源
            else if (readBytes < 0) {
                key.cancel();
                sc.close();
            }
        }
    
        private void doWrite(SocketChannel channel, String request)
                throws IOException {
            //将消息编码为字节数组
            byte[] bytes = request.getBytes();
            //根据数组容量创建ByteBuffer
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            //将字节数组复制到缓冲区
            writeBuffer.put(bytes);
            //flip操作
            writeBuffer.flip();
            //发送缓冲区的字节数组
            /*关心事件和读写网络并不冲突*/
            channel.write(writeBuffer);
        }
    
        private void doConnect() throws IOException {
            /*非阻塞的连接*/
            if (socketChannel.connect(new InetSocketAddress(host, port))) {
                socketChannel.register(selector, SelectionKey.OP_READ);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }
        }
    
        //写数据对外暴露的API
        public void sendMsg(String msg) throws Exception {
            doWrite(socketChannel, msg);
        }
    
        public void stop() {
            started = false;
        }
    
    
        public static void main(String[] args) throws Exception {
    
            NioClient nioClient = new NioClient("127.0.0.1", 9998);
            new Thread(nioClient).start();
            System.out.println("请输入消息:");
            Scanner scanner = new Scanner(System.in);
            while (true) {
                nioClient.sendMsg(scanner.next());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174

    2.2、服务端代码

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class NioServer {
        private Selector selector;
        private ServerSocketChannel serverChannel;
        private volatile boolean started;
    
        /**
         * 构造方法
         *
         * @param port 指定要监听的端口号
         */
        public NioServer(int port) {
            try {
                //创建选择器
                selector = Selector.open();
                //打开监听通道
                serverChannel = ServerSocketChannel.open();
                //如果为 true,则此通道将被置于阻塞模式;
                // 如果为 false,则此通道将被置于非阻塞模式
                serverChannel.configureBlocking(false);//开启非阻塞模式
                //绑定端口 backlog设为1024
                serverChannel.socket()
                        .bind(new InetSocketAddress(port), 1024);
                //监听客户端连接请求
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                //标记服务器已开启
                started = true;
                System.out.println("服务器已启动,端口号:" + port);
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    
        public void run() {
            //循环遍历selector
            while (started) {
                try {
                    //阻塞,只有当至少一个注册的事件发生的时候才会继续.
                    selector.select();
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    SelectionKey key = null;
                    while (it.hasNext()) {
                        key = it.next();
                        System.out.println("当前通道的事件:" + key.interestOps());
                        it.remove();
                        try {
                            if (key.isValid()) {
                                //处理新接入的请求消息
                                if (key.isAcceptable()) {
                                    acceptHandle(key);
                                }
                                //读消息
                                if (key.isReadable()) {
                                    readHandler(key);
                                }
                                //写消息
                                if (key.isWritable()) {
                                    writHandler(key);
                                }
                            }
    
                        } catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null) {
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
            //selector关闭后会自动释放里面管理的资源
            if (selector != null)
                try {
                    selector.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
        }
    
        private void acceptHandle(SelectionKey key) throws IOException {
            //获得关心当前事件的channel
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            //通过ServerSocketChannel的accept创建SocketChannel实例
            //完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
            SocketChannel sc = ssc.accept();
            System.out.println("======socket channel 建立连接=======");
            //设置为非阻塞的
            sc.configureBlocking(false);
            //连接已经完成了,可以开始关心读事件了
            sc.register(selector, SelectionKey.OP_READ);
        }
    
        private void readHandler(SelectionKey key) throws IOException {
            System.out.println("======socket channel 数据准备完成," +
                    "可以去读==读取=======");
            SocketChannel sc = (SocketChannel) key.channel();
            //创建ByteBuffer,并开辟一个1M的缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(2);
            //读取请求码流,返回读取到的字节数
            int readBytes = sc.read(buffer);
            //读取到字节,对字节进行编解码
            if (readBytes > 0) {
                //将缓冲区当前的limit设置为position,position=0,
                // 用于后续对缓冲区的读取操作
                buffer.flip();
                //根据缓冲区可读字节数创建字节数组
                byte[] bytes = new byte[buffer.remaining()];
                //将缓冲区可读字节数组复制到新建的数组中
                buffer.get(bytes);
                String message = new String(bytes, "UTF-8");
                System.out.println("服务器收到消息:" + message);
                //处理数据 构建应答消息(这里只简单响应,有需要自行额外处理)
                String result = "服务器已经收到 message = " + message;
                //发送应答消息
                doWrite(sc, result);
            }
            //链路已经关闭,释放资源
            else if (readBytes < 0) {
                key.cancel();
                sc.close();
            }
        }
    
        private void writHandler(SelectionKey key) throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            if (buffer.hasRemaining()) {
                int count = sc.write(buffer);
                System.out.println("write :" + count
                        + "byte, remaining:" + buffer.hasRemaining());
            } else {
                /*取消对写的注册*/
                key.interestOps(SelectionKey.OP_READ);
            }
        }
    
    
        //发送应答消息
        private void doWrite(SocketChannel channel, String response)
                throws IOException {
            //将消息编码为字节数组
            byte[] bytes = response.getBytes();
            //根据数组容量创建ByteBuffer
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            //将字节数组复制到缓冲区
            writeBuffer.put(bytes);
            //flip操作
            writeBuffer.flip();
            channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ,
                    writeBuffer);
        }
    
        public void stop() {
            started = false;
        }
    
    
        public static void main(String[] args) {
            NioServer nioServer = new NioServer(9998);
            nioServer.run();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
  • 相关阅读:
    ES6 set 数据结构及拓展运算符及map集合
    flink技术总结待续
    算法记录 & 牛客网 & leetcode刷题记录
    django基于python的疫情数据可视化分析系统--python-计算机毕业设计
    FFmpeg内存管理
    html如何导出为pdf格式?如何在一个js文件中,将一个函数注册到vue.prototype上?
    nodejs---fastify-路由
    数据分析思维(一)|信度与效度思维
    人工智能在日常生活中的应用
    项目管理:如何建立一个具有执行力的团队?
  • 原文地址:https://blog.csdn.net/weixin_44606481/article/details/133837946