• Netty入门指南之NIO Selector监管


    作者简介:☕️大家好,我是Aomsir,一个爱折腾的开发者!
    个人主页Aomsir_Spring5应用专栏,Netty应用专栏,RPC应用专栏-CSDN博客
    当前专栏Netty应用专栏_Aomsir的博客-CSDN博客

    参考文献

    前言

    在我们的上一篇文章中,我们详细讲解了如何使用NIO进行网络通信并成功解决了服务端的两次阻塞问题。这种解决方案有效地改善了通信效率。然而,引入非阻塞机制后,又产生了一个新的问题。我们注意到,在没有客户端请求和IO通信的情况下,上篇文章中的while循环会持续运行,导致CPU资源的浪费。更为复杂的是,我们的程序是单线程运行的,所有的请求接收和IO通信都由这一个线程处理,这无疑进一步拉低了CPU的利用率。

    问题解决

    如何解决

    为了解决这个问题,我们可以引入一个“监管者”,负责监控客户端的请求和IO通信。这个“监管者”会专注于监控ServerSocketChannel的ACCEPT状态,以及SocketChannel的READWRITE状态。只有当这些状态被触发时,"监管者"才会进行处理。在NIO中,我们有一个名为Selector的组件,它可以承担这个监管者的角色。

    实战编码

    现在,让我们通过实战编码来看看如何实现这个解决方案。通过引入Selector,我们成功地解决了while循环空转的问题,将阻塞的责任转交给了selector。这样,我们的程序就不会再发生阻塞了。我们的selector会监控ServerSocketChannel的ACCEPT事件,监控到了ACCEPT以后就会去获取对应的客户端SocketChannel,监控它的READ和WRITE事件。 请参考以下代码和相关注释进行理解。在接下来的内容中,我们会逐步详细解释这个过程。

    注意⚠️:它是一个单线程!

    public class MyServer2 {
        public static void main(String[] args) throws Exception{
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(8000));
    
            // Selector只在非阻塞下可用
            serverSocketChannel.configureBlocking(false);
    
            // 引入监管者
            Selector selector = Selector.open();
    
            // 将ssc注册到selector上,返回一个SelectionKey,用于设置监控ACCEPT状态
            // SelectionKey: 将来事件发生后,通过它可以知道来自哪个Channel和哪个事件
            SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);
            selectionKey.interestOps(SelectionKey.OP_ACCEPT);
    
            // 监控
            while (true) {
    
                // 开始监控,此处会阻塞,直到监控到有客户端请求和实际的连接或读写操作才会继续往下执行
                // 监控到以后会将实际的ssc或者sc保存至 SelectionKeys(HashSet)里,然后放行
                selector.select();
    
                // 从监控到的SelectionKeys中获取到实际的
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
    
                    // 获取到后移除,防止重复处理
                    iterator.remove();
    
                    // 判断SelectionKey事件类型
                    if (key.isAcceptable()) {
                        // 获取到ssc从而获取到sc
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        SocketChannel sc = channel.accept();
                        sc.configureBlocking(false);
    
                        // 将获取的sc注册到selector上,返回一个SelectionKey,用于设置监控READ状态
                        SelectionKey scKey = sc.register(selector, 0, null);
                        scKey.interestOps(SelectionKey.OP_READ);
    
                        System.out.println("accept = " + sc);
                    } else if (key.isReadable()) {
                        try {
                            // 通过SelectionKey获取到sc,然后读取数据
                            SocketChannel sc = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(10);
                            int read = sc.read(buffer);
    
                            if (-1 == read) {
                                // 客户端处理完毕
                                key.cancel();
                            } else {
                                buffer.flip();
                                System.out.println("Charset.defaultCharset().decode(buffer).toString() = " + Charset.defaultCharset().decode(buffer));
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                            key.cancel();
                        }
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    Selector详解

    Selector里面有很多的细节,我会带着一点点的去剖析,方便对整个程序有一个清晰的认知。

    keys&selectedKeys

    在Selector中,我们主要关注两种keys。

    第一种是keys,这是我们在将channel注册到selector的时候获取到的SelectionKey。这个key是在注册过程中获取的,而不是由selector在监控特定事件后获取。一旦channel注册成功,这个key就会被添加到keys列表中。这个key的主要作用是为特定的channel设置需要监控的事件。

    第二种是selectedKeys,这是我们在事件触发后,通过调用selectedKeys()方法获取的key,它是在事件触发后从keys列表中复制到selectedKeys列表中去的。这些selectedKeys对应的channel都是实际要发生事件的,例如ACCEPT、READ、WRITE等。所以说当我们从selectedKeys中取出一个key后要将其移出,以免出现异常

    总的来说,keys列表包含了所有注册的channel和其事件信息,这是一个较大的范围。而selectedKeys列表则是一个较小的范围,它来自于keys,只包含当前实际发生事件的channel。比如我开了个直播课,有100个人报名,这100个人在keys里,实际直播的时候有80人,这80人同时在selectedKeys里
    在这里插入图片描述

    select()方法

    Selector的select()方法是一个会产生阻塞的方法。它会定期轮询在Selector中注册的所有SelectionKey(也就是keys),并监控与这些key关联的Channel的状态,如果有对应事件发生(例如有新的连接请求,或者有数据可读/可写),则将对应的key添加到selectedKeys列表中,并放行,让程序处理这些事件。

    如果在调用select()方法时没有任何事件发生,那么该方法会阻塞,直到有事件发生为止。这样可以避免程序在没有任何事件发生时不断轮询,浪费CPU资源。

    如果服务端的buffer设置得太小,可能会导致服务端一次无法处理所有的数据。在这种情况下,当buffer被填满后,服务端会处理这第一部分数据,然后结束,因为这些未处理的数据会被视为新的事件。简言之就是说如果buffer需要两次才能读完客户端发送的一条数据,那这个channel会被selector监控到两次read事件

    代码问题及改进

    问题

    • 未处理半包与粘包问题,处理的过程中一段数据被分成了几个事件,但是每个buffer是独属某一个事件的,新的事件就是一个新的buffer,怎么解决?
    • 解决半包粘包后,如果buffer设置的小,从SocketChannel中读取的数据还没遇到\n,那buffer切换写模式压缩去等剩余数据写进来,等于白干,程序会被空转调用,怎么解决?
    • 服务端从SocketChannel已经读取完数据了,后续没有通信了,服务端没有去主动断开连接,那select岂不是每次轮询都得带着这些不会产生通信的keys?
    • 服务端没有处理异常

    解决

    • 对于第一个问题,我们可以用先前的doLineSpilt方法处理半包粘包,然后我们可以给每一个SocketChannel设置一个附件(att),在注册到selector的时候进行绑定,在处理其读写事件的时候取出来使用,这样粘包粘包压缩的数据就会一直都在了(只要key没有被删除,即channel没有断开,那就是同一个Channel)
    • 对于第二个问题,我们可以在处理半包粘包后,检查一下buffer的limit和position是否相等,如果在处理半包粘包后两者相等,说明buffer里是满的,这时我们创建新的buffer进行扩容,将新buffer作为附件绑定即可
    • 对于第三个问题,客户端和服务端达成协议,比如客户端不发数据代表通信结束,那服务端从channel读不出来数据(返回值为-1)时则调用SelectionKey的cancle方法,从keys中删除
    • 对于第四个问题:处理异常就可以了

    代码演示

    public class MyServer4 {
        public static void main(String[] args) throws Exception{
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(8000));
    
            // Selector只在非阻塞下可用
            serverSocketChannel.configureBlocking(false);
    
            // 引入监管者
            Selector selector = Selector.open();
    
            // 让serverSocketChannel被selector管理,它只处理accept,所以附件为null
            SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);
    
            // 监控accept
            selectionKey.interestOps(SelectionKey.OP_ACCEPT);
    
            System.out.println("MyServer.main");
    
            // 监控
            while (true) {
                selector.select();
    
                System.out.println("------------111-------------");
    
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
    
                    // 取出来就从selectedKeys中删除
                    iterator.remove();
                    if (key.isAcceptable()) {
                        // ServerSocketChannel、获取的是最开始创建的,可以直接使用上面创建的
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        SocketChannel sc = channel.accept();
                        sc.configureBlocking(false);
    
                        // 给每个SocketChannel绑定一个buffer,监控sc状态
                        ByteBuffer buffer = ByteBuffer.allocate(7);
                        SelectionKey scKey = sc.register(selector, 0, buffer);
                        scKey.interestOps(SelectionKey.OP_READ);
    
                        System.out.println("accept = " + sc);
                    } else if (key.isReadable()) {
                        try {
                            // 监控到key是读时间,获取到SocketChannel和buffer
                            SocketChannel sc = (SocketChannel) key.channel();
                            ByteBuffer buffer = (ByteBuffer) key.attachment();   // 获取附件中的buffer
                            int read = sc.read(buffer);
    
                            if (-1 == read) {
                                // 客户端处理完毕
                                key.cancel();
                            } else {
                                doLineSplit(buffer);
    
                                // 没有压缩动,需要扩容
                                if (buffer.position() == buffer.limit()) {
                                    // 1、空间扩大
                                    ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
    
                                    // 2、老缓冲区数据复制进新缓冲区
                                    buffer.flip();
                                    newBuffer.put(buffer);
    
                                    // 3、绑定channel
                                    // buffer = newBuffer;
                                    key.attach(newBuffer);
                                }
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                            key.cancel();
                        }
                    }
                }
            }
        }
    
        private static void doLineSplit(ByteBuffer buffer) {
            buffer.flip(); // 读模式
            for (int i = 0; i < buffer.limit(); i++) {
                if (buffer.get(i) == '\n') {
                    int length = i + 1 - buffer.position();  // 以免出现一行里面有多个\n
                    ByteBuffer target = ByteBuffer.allocate(length);
    
                    for (int j = 0; j < length; j++) {
                        target.put(buffer.get());
                    }
    
                    // 截取工作完成
                    target.flip();
                    System.out.println("StandardCharsets.UTF_8.decode(target) = " + StandardCharsets.UTF_8.decode(target));
    
                    target.clear();
                }
            }
    
            // 写模式(压缩)
            buffer.compact();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102

    拓展知识

    对于网络编程中常见的半包和粘包问题,我们有多种解决策略。一种简单且常用的方法是添加特定的标识符,如换行符\n,用于区分数据包的边界。另一种更为复杂但也更为精确的方法是采用类似HTTP协议的头体分离策略。在这种策略中,我们将数据分为头部和体部两部分。头部包含元数据信息,例如体部数据的大小等关键信息。体部则包含实际的数据内容。通过这种方式,我们可以清晰地区分每个数据包,从而有效解决半包和粘包问题。

    总结

    在今天的学习中,我们深入探讨了如何利用Java NIO的Selector来高效地监控我们的服务器端程序,从而避免无意义的空转。我们对Selector进行了深入剖析,透彻理解了其工作原理。进一步地,我们逐步优化了我们的程序,提高了其性能和效率。这一系列的学习和实践,为我们接下来的Netty学习铺设了坚实的基础。Netty,作为一个基于Java NIO的网络应用框架,我们对其的掌握将在未来的编程道路上发挥重要作用。

  • 相关阅读:
    TN、HTN、STN、FSTN、DSTN、CSTN、TFT、LCD 的区别
    Bootstrap学习(十一)
    系统软硬件
    抓包整理外篇——————autoResponder、composer 、statistics [ 三]
    【c++】四种类型转换的用法
    docker部署 spring-boot 项目,验证码获取报错的解决方法
    Vue路由(Vue Router)记录学习
    [实时流基础 flink] 窗口
    药品研发---崩解时限检查法检验原始记录模板
    一文1700字使用Postman搞定各种接口token实战(建议收藏)
  • 原文地址:https://blog.csdn.net/qq_43266723/article/details/134355283