• 基础 | NIO - [Selector]


    §1 概述

    多路复用

    • Selector 是 NIO 中的选择器,也称多路复用器
    • 用于监听 Channel状态
    • 可以实现用单线程管理多个 Channel
    • 不是所有 Channel 都可以被多路复用,需要继承 SelectableChannel
      • 所有的 SocketChannel 都继承了
      • FileChannel 没有继承

    注册

    • 注册通过 Channel.register(Selector selector, int ops) 完成
      • selector 是一个指定的选择器
      • opsselector 的关注行为
    • 可以在 Selector 上注册 Channel,二者的数量关系是多对多的
      • 一个 Selector 上可以注册多个 Channel(废话,否则选择个屁)
      • 一个 Channel 可以注册到多个选择器,并指定选择器的 关注行为(比如读或写)
      • 一个 Channel 不能在同一个 Selector 上注册多次
    • Channel 一旦注册就会 一直在选择器中存在

    取消注册
    通过选择键的 cancel()

    Channel 的状态与 Selector 的关注行为

    • 状态与 Selector 的关注行为一一对应,反映了 Channel 当前可以执行的行为
      Selector 的关注行为就是 选择器关注 Channel 是否处于对应的状态
    • 选择器的关注行为在SelectionKey.Key 中定义,包括
      • 读:SelectionKey.Key.OP_READ
      • 写:SelectionKey.Key.OP_WRITE,通道关闭也会触发此行为
      • 连接:SelectionKey.Key.OP_CONNECT
      • 接收:SelectionKey.Key.OP_ACCEPT
    • 同时关注多个行为时可以通过 位或 进行,如 SelectionKey.Key.OP_READ | SelectionKey.Key.OP_WRITE

    监听/选择

    • 选择器通过 select() 进行选择
    • 选择器在监听 Channel 时,只对注册的 关注行为 进行反应
      Channel 处于某状态,若此状态不被当前选择器关注,就不会进行任何动作
    • 当监听到关注行为时,选择器会将 Channel 放入选择键集合

    选择键

    • 选择键包含 SelectorChannel注册关系关注行为
    • 对不同的选择键路由到不同的操作是 NIO 编程的业务重点
    • 选择键的概念类似事件
      但基于事件的场景是 逻辑均基于事件触发,而选择键只是基于它进行路由
    • 可以通过选择键的 cancel() 方法注销选择器上的 Channel
      • cancel() 只会将当前选择键 加入需要被注销的选择键队列,随后自动在下一次 select() 时注销
      • 因此,触发一个选择键需要同时处理多个关注行为,否则应该 continue; 以触发下一轮选择
      • 注销不当会导致程序频繁抛出 CancelledKeyException

    §2 方法

    创建
    Selector.open()

    关闭
    close()

    • 会唤醒所有因选择 select() 而阻塞额线程
    • 会清空所有选择键
    • 会注销所有选择器中注册的 Channel
    • 不会关闭选择器中注册的 Channel


    注册
    Channel.register(Selector selector, int ops)

    • 注册到选择器的 Channel 必须处于非阻塞模式
      • 阻塞模式下向选择器注册会抛出 IllegalBlockingModeException
      • FileChannel 不能向选择器注册,因为没有非阻塞模式
    • 通道不需要保证注册所有关注行为
    • 可以通过 validOps() 获取 Channel 在指定选择器上注册的行为列表

    选择
    select()

    • 返回从上次调用此方法至今,已经就绪的 Channel 的数量
    • 已经就绪的不会在此方法返回,但会加入到选择键集合中
    • select() 一共有三种变种,横向对比见下表
      • select() 阻塞至至少一个 Channel 就绪
      • select(timeout) 限制最长阻塞时间
      • selectNow() ,不阻塞,立即获取

    停止选择
    wakeup()

    • 选择器执行选择时可能导致线程阻塞
    • 此方法可以使 select() 立即返回
      • 若选择器通过 select() 阻塞,此方法可以使它立即返回
      • 若选择器没有通过 select() 阻塞,此方法可以使它的下一次 select() 立即返回

    选择键
    selectKeys()
    获取现在所有已经就绪的选择键

    §3 使用

    案例说明
    ServerSocketChannel 多路复用监听端口
    SocketChannel 绑定 ServerSocketChannel 并发送信息

    代码
    server

    @Component
    public class SelectorHandlerDemo {
    
        public void handle(){
            try (
                    ServerSocketChannel server = ServerSocketChannel.open();
                    Selector selector = Selector.open();
            ){
                server.configureBlocking(false);
                server.bind(new InetSocketAddress(9999));
                server.register(selector, SelectionKey.OP_ACCEPT);
    
                SelectionKey key = null; // 每轮获取到的 keys 中的一个
                while(true){
                    if(0 == selector.selectNow()){
                        TimeUnit.MILLISECONDS.sleep(200);
                        continue;
                    }
    
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while(keys.hasNext()){
                        key = keys.next();
                        keys.remove();
    
                        if(!key.isValid())
                            key.cancel();
    
                        if(key.isAcceptable()) {
                            doAccept(key, selector);
                            continue;
                        }
                        if(key.isConnectable()){
                            doConnect(key,selector);
                            continue;
                        }
                        if(key.isReadable()){
                            doRead(key,selector);
                            continue;
                        }
                        if(key.isWritable()) {
                            doWrite(key, selector);
                            continue;
                        }
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    
        private void doValid(SelectionKey key, Selector selector) {}
    
        private void doWrite(SelectionKey key, Selector selector) {
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private void doRead(SelectionKey key, Selector selector) {
            try {
                SelectableChannel selected = key.channel();
                if(!(selected instanceof SocketChannel)){
                    System.out.println("类型异常:" + key);
                    return;
                }
                SocketChannel socket = (SocketChannel) selected;
                ByteBuffer buf = ByteBuffer.allocate(10);
                List<byte[]> bufs = new ArrayList<>();
                for(int length=socket.read(buf);-1!=length;buf.clear(),length=socket.read(buf)){
                    buf.flip();
                    bufs.add(ArrayUtil.sub(buf.array(),0,Math.min(length,10)));
                }
                key.cancel();
                System.out.println(new String(ArrayUtil.addAll(bufs.toArray(new byte[][]{})), StandardCharsets.UTF_8));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private void doConnect(SelectionKey key, Selector selector) {}
    
        private void doAccept(SelectionKey key, Selector selector) {
            try {
                SelectableChannel selected = key.channel();
                if(!(selected instanceof ServerSocketChannel)){
                    System.out.println("类型异常:" + key);
                    return;
                }
    
                ServerSocketChannel server = (ServerSocketChannel) selected;
                SocketChannel socket = null;
                socket = server.accept();
                socket.configureBlocking(false);
    
                socket.register(selector,SelectionKey.OP_READ);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    client 端

    @Component
    public class SelectorCallerDemo {
    
        public void call(){
            try (
                    SocketChannel channel = SocketChannel.open(new InetSocketAddress("192.168.3.7",9999))
            ){
                channel.configureBlocking(false);
                ByteBuffer buf = ByteBuffer.allocate(1024);
                buf.put(String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
                buf.flip();
                channel.write(buf);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
  • 相关阅读:
    Express框架
    高级工技能等级认定理论部分 看了就过关
    【C# 基础精讲】Task和Task<T>的应用
    Java面试题:通过实例说明工厂模式和抽象工厂模式的用法,以及它们在解耦中的作用
    simplemde 下载问题
    hint: Updates were rejected because the tip of your current branch is behind
    【爬虫】爬虫基础
    Redis-数据结构及常用命令
    基于springboot接口的编写
    业绩不俗,毛利率下滑,股价接连下跌,片仔癀将向何处去?
  • 原文地址:https://blog.csdn.net/ZEUS00456/article/details/127080170