• Java网络编程——NIO处理写事件(SelectionKey.OP_WRITE)


    在前面NIO的例子中,在服务端,有对连接事件(SelectionKey.OP_ACCEPT)的处理,也有对读事件(SelectionKey.OP_READ)的处理,但是没有对写事件(SelectionKey.OP_WRITE)进行处理,原因就是写事件有些特殊,在这里单独记录一下。

    网上有一些例子都是在服务端读完数据后直接给客户端SocketChannel对应的SelectionKey注册上写事件(SelectionKey.OP_WRITE),写完数据后也不注销写事件:

    @Slf4j
    public class NioServer {
    
        public static void main(String[] args) throws Exception {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(8080));
            Selector selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                int select = selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if (selectionKey.isAcceptable()) {
                        ServerSocketChannel serverSocket = (ServerSocketChannel) selectionKey.channel();
                        SocketChannel socketChannel = serverSocket.accept();
                        log.info("receive connection from client. client:{}", socketChannel.getRemoteAddress());
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    } else if (selectionKey.isReadable()) {
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        socketChannel.read(byteBuffer);
                        String message = new String(byteBuffer.array()).trim();
                        byteBuffer.clear();
                        log.info("receive message from client. client:{} message length:{}", socketChannel.getRemoteAddress(),message.length());
                        socketChannel.register(selectionKey.selector(), SelectionKey.OP_WRITE);
                        selectionKey.attach(message);
                    } else if (selectionKey.isWritable()) {
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        String response = (String) selectionKey.attachment();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(response.length());
                        byteBuffer.put(response.getBytes());
                        byteBuffer.flip();
                        socketChannel.write(byteBuffer);
                        log.info("send message to client. client:{} message length:{}", socketChannel.getRemoteAddress(), response.length());
                    }
                    iterator.remove();
                }
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    但实际上根本不能这么用,即使在服务端向客户端写完数据后,也会不断触发写事件(selector.select()返回, selectionKey.isWritable()返回true )。



    什么时候会触发写事件呢?

    在前面的文章 《Java网络编程——NIO的阻塞IO模式、非阻塞IO模式、IO多路复用模式的使用》 中简单提到SelectionKey.OP_WRITE事件表示已经可以向通道写数据了(通道目前可以用于写操作),那什么时候才算“可以向通道写数据”呢?

    如果有channel在Selector上注册了SelectionKey.OP_WRITE事件,在调用selector.select();时,系统会检查内核写缓冲区是否可写(当写缓冲区已满、channel调用了shutdownOutPut等情况,内核缓冲区不可写),如果可写,selector.select();会立即返回写事件。

    把服务端处理读事件的代码优化如下:

    @Slf4j
    public class NIOSelectorNonblockingWriteServer {
    
        private final static int MESSAGE_LENGTH = 1024 * 1024 * 100;
    
        public static void main(String[] args) throws Exception {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8080), 50);
            Selector selector = Selector.open();
            SelectionKey serverSocketKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                int count = selector.select();
                log.info("select event count:" + count);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    // 有客户端请求建立连接
                    if (selectionKey.isAcceptable()) {
                        handleAccept(selectionKey);
                    }
                    // 有客户端发送数据
                    else if (selectionKey.isReadable()) {
                        handleRead(selectionKey);
                    }
                    // 可以向客户端发送数据
                    else if (selectionKey.isWritable()) {
                        handleWrite(selectionKey);
                    }
                    iterator.remove();
                }
            }
        }
    
        private static void handleAccept(SelectionKey selectionKey) throws IOException {
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (Objects.nonNull(socketChannel)) {
                log.info("receive connection from client. client:{}", socketChannel.getRemoteAddress());
                // 设置客户端Channel为非阻塞模式,否则在执行socketChannel.read()时会阻塞
                socketChannel.configureBlocking(false);
                Selector selector = selectionKey.selector();
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
        }
    
        private static void handleRead(SelectionKey selectionKey) throws IOException {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            ByteBuffer readBuffer = ByteBuffer.allocate(MESSAGE_LENGTH);
            int length = 0;
            while (length < MESSAGE_LENGTH) {
                length += socketChannel.read(readBuffer);
            }
            log.info("receive message from client. client:{} message length:{}", socketChannel.getRemoteAddress(), readBuffer.position());
    
            ByteBuffer writeBuffer = ByteBuffer.allocate(readBuffer.position());
            readBuffer.flip();
            writeBuffer.put(readBuffer);
            // 读完数据后,为 SelectionKey 注册可写事件
            if (!isInterest(selectionKey, SelectionKey.OP_WRITE)) {
                selectionKey.interestOps(selectionKey.interestOps() + SelectionKey.OP_WRITE);
            }
            writeBuffer.flip();
            selectionKey.attach(writeBuffer);
        }
    
        // 服务端可能是为每个Channel维护一块缓冲区,当向某个Channel写数据时缓冲区满了,还可以向其他Channel写数据
        private static void handleWrite(SelectionKey selectionKey) throws IOException {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            ByteBuffer writeBuffer = (ByteBuffer) selectionKey.attachment();
            int writeLength = socketChannel.write(writeBuffer);
            log.info("send message to client. client:{} message length:{}", socketChannel.getRemoteAddress(), writeLength);
            if (!writeBuffer.hasRemaining()) {
                // 写完数据后,要把写事件取消,否则当写缓冲区有剩余空间时,会一直触发写事件
                selectionKey.interestOps(selectionKey.interestOps() - SelectionKey.OP_WRITE);
                // socketChannel.shutdownOutput(); // channel调用shutdownOutput()后,会停止触发写事件
            }
        }
    
        // 判断 SelectionKey 对某个事件是否感兴趣
        private static boolean isInterest(SelectionKey selectionKey, int event) {
            int interestSet = selectionKey.interestOps();
            boolean isInterest = (interestSet & event) == event;
            return isInterest;
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    在服务端读取完客户端消息后,会先判断当前SelectionKey是否已经注册了写事件,如果没有则为其注册写事件。在服务端向客户端写完数据后,会取消写事件。

    在上面判断SelectionKey是否已经注册了某个事件时,判断条件是 selectionKey.interestOps() & SelectionKey.OP_WRITE ,selectionKey.interestOps()就是已经注册的事件,SelectionKey中可以只用1个整形数字来表示多个注册的事件(interestOps变量),SelectionKey.OP_READ=1(二进制为 00000001),SelectionKey.OP_WRITE=4(二进制为 00000100),SelectionKey.OP_CONNECT=8(二进制为 00001000),SelectionKey.OP_ACCEPT=16(二进制为 00010000)。当注册某个事件时,会把对应事件对应的整数(单个事件对应的整数或者多个事件对应的整数和)赋值给interestOps变量,比如注册读事件时,interestOps=1;注册读事件+写事件时,interestOps=1+4 。因此可以通过按位与的算法来判断当前SelectionKey是否注册过某个事件。



    为什么不在处理完读事件(selectionKey.isReadable())后,直接写数据,而要重新注册一个读事件呢?

    那我们就继续看这个例子:

    @Slf4j
    public class NIOSelectorBlockingWriteServer {
    
        private final static int MESSAGE_LENGTH = 1024 * 1024 * 100;
    
        public static void main(String[] args) throws Exception {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8080), 50);
            Selector selector = Selector.open();
            SelectionKey serverSocketKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                int count = selector.select();
                log.info("select event count:" + count);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    // 有客户端请求建立连接
                    if (selectionKey.isAcceptable()) {
                        handleAccept(selectionKey);
                    }
                    // 有客户端发送数据
                    else if (selectionKey.isReadable()) {
                        handleRead(selectionKey);
                    }
                    iterator.remove();
                }
            }
        }
    
        private static void handleAccept(SelectionKey selectionKey) throws IOException {
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (Objects.nonNull(socketChannel)) {
                log.info("receive connection from client. client:{}", socketChannel.getRemoteAddress());
                // 设置客户端Channel为非阻塞模式,否则在执行socketChannel.read()时会阻塞
                socketChannel.configureBlocking(false);
                Selector selector = selectionKey.selector();
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
        }
    
        private static void handleRead(SelectionKey selectionKey) throws IOException {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            ByteBuffer readBuffer = ByteBuffer.allocate(MESSAGE_LENGTH);
            int length = 0;
            while (length < MESSAGE_LENGTH) {
                length += socketChannel.read(readBuffer);
            }
            log.info("receive message from client. client:{} message length:{}", socketChannel.getRemoteAddress(), readBuffer.position());
    
            ByteBuffer writeBuffer = ByteBuffer.allocate(readBuffer.position());
            readBuffer.flip();
            writeBuffer.put(readBuffer);
            writeBuffer.flip();
            while (writeBuffer.hasRemaining()) {
                int writeLength = socketChannel.write(writeBuffer);
                log.info("send message to client. client:{} message length:{}", socketChannel.getRemoteAddress(), writeLength);
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • Debug模式启动服务端NIOSelectorBlockingWriteServer,在最后一行log.info("send message to client. client ……打上断点
    • Debug模式运行上面客户端NIOClient的例子,向服务端发送100MB的数据,在ByteBuffer readBuffer = ByteBuffer.allocate(MESSAGE_LENGTH);打上断点,运行到断点时,当服务端调用了socketChannel.write(writeBuffer);向客户端发送的数据还没来得及被客户端接收完时(这时候客户端卡在断点,并没有开始接收服务端的数据),数据会在服务端写缓冲区积压,在极限情况下,当服务写端缓冲区写满时,再调用socketChannel.write(writeBuffer);就写不进去了(返回0)。所以如果在处理完读事件后直接发送数据,遇到服务端写缓冲区满的情况时,会直接阻塞当前线程(比如这个例子中会不断执行int writeLength = socketChannel.write(writeBuffer);并返回0),无法及时处理其他客户端请求。
    • 这时再起一个线程,以Debug模式运行NIOClient,当执行socketChannel.write(writeBuffer);向服务端写数据时,可能会阻塞(如果发送的数据量太大导致服务端接收缓存写满,此时服务端线程又在死循环,所以无法读取客户端发来的数据),如果这里不阻塞,执行到socketChannel.read(readBuffer)就会阻塞(因为服务端一直没有读取到该客户端发送的数据,肯定也还没执行向客户端发送数据的代码)。这里的客户端是以阻塞模式运行的,即使把客户端的SocketChannel设置为非阻塞模式,也是无法及时收到服务端返回的数据的。

    所以在传输数据量较大,需要向客户端回写数据时,最好注册一个写事件,避免服务端缓冲区写满时导致线程阻塞,而无法及时处理其他事件的情况。这样才能体现NIO多路复用模式的特点,才能可以让一个线程同时为多个客户端服务。



    转载请注明出处——胡玉洋 《Java网络编程——NIO处理写事件(SelectionKey.OP_WRITE)》

  • 相关阅读:
    1052 Linked List Sorting
    闭包、闭包应用场景
    docker搭建hadoop集群 个人总结
    SpringSecurity前后端分离
    Ubuntu环境下基于libxl库文件使用C++实现对表格的操作
    剑指offer--重建二叉树
    【CSS】自定义进度条
    文心一言api开发者文档,python版ERNIE-3.5-8K-Preview模型调用方法
    【python笔记】第八节 函数基础
    [问题解决] no CUDA-capable device is detected
  • 原文地址:https://blog.csdn.net/huyuyang6688/article/details/126106949