• Netty源码分析:服务器启动跟踪,nioeventloop 剖析,accept、read 事件跟踪


    一、启动剖析

    Nio流程总览

    //1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
    Selector selector = Selector.open(); 
    
    //2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
    NioServerSocketChannel attachment = new NioServerSocketChannel();
    
    //3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
    serverSocketChannel.configureBlocking(false);
    
    //4 启动 nio boss 线程执行接下来的操作
    
    //5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
    SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
    
    //6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor
    
    //7 绑定端口
    serverSocketChannel.bind(new InetSocketAddress(8080));
    
    //8 触发 channel active 事件,在 head 中关注 op_accept 事件
    selectionKey.interestOps(SelectionKey.OP_ACCEPT);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    启动跟源码

    服务器入口 io.netty.bootstrap.ServerBootstrap#bind

    关键代码 io.netty.bootstrap.AbstractBootstrap#doBind

    private ChannelFuture doBind(final SocketAddress localAddress) {
    	//1.执行 初始化
        //2.异步执行 注册
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        
        //已经完成:
        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            //3.立刻调用 doBind0
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } 
        //没有完成:添加回调函数
        else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);//处理异常...
                    } else {
                        promise.registered();
    					//3.由注册线程去执行 doBind0:绑定端口号、触发 active 事件、注册accept事件
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister

    //1.执行 初始化
    //2.异步执行 注册
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            //1 初始化
            //1.1 创建 NioServerSocketChannel
            channel = channelFactory.newChannel();
            //1.2 给 NioServerSocketChannel 添加一个初始化器 ChannelInitializer
            init(channel);
        } catch (Throwable t) { // 处理异常...
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
    
        //2 注册:将原生 channel 注册到 selector 上
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) { // 处理异常...
        }
        return regFuture;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    关键代码 io.netty.bootstrap.ServerBootstrap#init

    //1.2 给 NioServerSocketChannel 添加一个初始化器 ChannelInitializer
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
    
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
    
        ChannelPipeline p = channel.pipeline();
    
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }
    	
        //1.2 为 NioServerSocketChannel 添加初始化器:初始化器什么时候执行?
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
    
                //1.2.1 初始化器的职责:将 ServerBootstrapAcceptor 加入至 NioServerSocketChannel
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register

    • ServerBootStrap—>eventLoopGroup—>eventLoop—>Channel
    //2 注册:切换线程,并将原生 channel 注册到 selector 上
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // 一些检查...
        AbstractChannel.this.eventLoop = eventLoop;
    
        if (eventLoop.inEventLoop()) { //是否是EventLoop线程
            register0(promise);
        } else {
            try {
                //2.1 切换线程
                // 首次执行 execute 方法时:才启动 nio 线程,之后注册等操作在 nio 线程上执行
                // 因为只有一个 NioServerSocketChannel 因此,也只会有一个 boss nio 线程
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        //2.2 将原生 channel 注册到 selector 上
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                // 日志记录...
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    io.netty.channel.AbstractChannel.AbstractUnsafe#register0

    //2.2 将原生 channel 注册到 selector 上
    private void register0(ChannelPromise promise) {
        try {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            // 2.2.1 原生的 nio channel 绑定到 selector 上,注意此时没有注册 selector 关注事件,附件为 NioServerSocketChannel
            //this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
            doRegister();
            neverRegistered = false;
            registered = true;
    
            // 2.2.2 执行 NioServerSocketChannel 初始化器的 initChannel,回到 1.2
            pipeline.invokeHandlerAddedIfNeeded();
    
            // 2.2.3 设置initAndRegister()执行结果,回到 3.绑定端口号、注册accept事件
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            
            if (isActive()) {// 对应 server socket channel 还未绑定,isActive 为 false
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0

    //3.绑定端口号、触发 active 事件、注册accept事件
    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
    
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind

    //3.绑定端口号、触发 active 事件、注册accept事件
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        assertEventLoop();
    
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
    
        if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
            localAddress instanceof InetSocketAddress &&
            !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
            !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {// 记录日志...
        }
    
        boolean wasActive = isActive();
        try {
            //3.1 ServerSocketChannel 绑定端口
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
    
        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    //3.2 触发 active 事件、注册accept事件
                    pipeline.fireChannelActive();
                }
            });
        }
    
        safeSetSuccess(promise);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind

    //3.1 ServerSocketChannel 绑定端口
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

    //3.2 触发 active 事件、注册accept事件
    public void channelActive(ChannelHandlerContext ctx) {
        //3.2.1 触发 active 事件
        ctx.fireChannelActive(); 
    	//3.2.2 注册accept事件
        readIfIsAutoRead();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead

    //3.2.2 注册accept事件
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
    
        readPending = true;
    
        final int interestOps = selectionKey.interestOps();
        // readInterestOp 取值是 16,在 NioServerSocketChannel 创建时初始化好,代表关注 accept 事件
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    二、NioEventLoop 剖析

    NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务),

    • selector:unwrappedSelector(Selector)、selector(SelectedSelectionKeySetSelector)
    • 线程:thread(由executor中唯一的线程赋值)、executor(Executor)
    • 任务队列:taskQueue(Queue)、scheduledTaskQueue(PriorityQueue>)

    1)何时创建selector

    • 在构造方法调用时创建
    /**
     * io.netty.channel.nio.NioEventLoop#NioEventLoop
     */
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
            super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            } else if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            } else {
                this.provider = selectorProvider;
                //在构造方法调用时创建:赋值给unwrappedSelector
                NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
                this.selector = selectorTuple.selector;
                this.unwrappedSelector = selectorTuple.unwrappedSelector;
                this.selectStrategy = strategy;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    2)为什么会有两个selector

    • unwrappedSelector:原始的Selector,将 selectedKeys属性 的Set实现改为了数组(SelectedSelectionKeySet )的实现
    • selector:SelectedSelectionKeySetSelector 实例,内部包装了 unwrappedSelector 、SelectedSelectionKeySet 数组
    /**
     * io.netty.channel.nio.NioEventLoop#NioEventLoop
     */
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
        //1.获取两个Selector
        NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
        //2.赋值
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
    	//1.获取两个Selector
    	private NioEventLoop.SelectorTuple openSelector() {
            //引用 unwrappedSelector 中的 selectedKeySet
            this.selectedKeys = selectedKeySet;
    	    //...改为了数组实现:数组实现可以提高遍历性能(原本为 HashSet)
    	    return new NioEventLoop.SelectorTuple(
    	        //原始的Selector
    	        unwrappedSelector, 
    	        //包装后的Selector
    	        new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)
    	    );
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    3)thread何时启动

    • 首次调用 execute 方法时,将 executor 中唯一的线程赋值给 thread

    • 执行该线程任务,任务为死循环,不断查看是否有任务(调用selector.select(timeoutMills))

      /**
       * io.netty.util.concurrent.SingleThreadEventExecutor#execute
       */
      public void execute(Runnable task) 
          // 添加任务,其中队列使用了 jctools 提供的 mpsc 无锁队列
          this.addTask(task);
      	//1.首次调用,启动线程
          this.startThread();
      	//2.添加任务后执行wakeup
      	if (!this.addTaskWakesUp && this.wakesUpForTask(task)) {
      	    this.wakeup(inEventLoop);
      	}
      }
      	//1.首次调用,启动线程
          private void startThread() {
              this.doStartThread();
          }
          	private void doStartThread() {
             		this.executor.execute(new Runnable() {
                      //1.将 executor 中唯一的线程赋值给 thread
                      SingleThreadEventExecutor.this.thread = Thread.currentThread();
                      //2.执行该线程的 run 方法,进入死循环
                      SingleThreadEventExecutor.this.run();
                  }     
          	}
          				//2.执行thread任务:执行死循环,不断看有没有新任务、IO 事件 。循环+阻塞
          				protected void run() {
          				    while(true) {
          				        while(true) {
          				            while(true) {
          				                try {
          				                    try {
          				                        switch() {
          				                        case -3:
          				                        case -1:
          				                            //1.调用select
          				                            this.select(this.wakenUp.getAndSet(false));
          				                            if (this.wakenUp.get()) {
          				                                this.selector.wakeup();
          				                            }
          				                            break;
          				                        case -2:
          				                            continue;
          				                        }
          				                    }
                                              //...执行任务
          				                } 
          				            }
          				        }
          				    }
          				}
      					    //1.调用select
      					    private void select(boolean oldWakenUp) throws IOException {
      					        try {
      					            while(true) {
      					                //2.阻塞
      					                int selectedKeys = selector.select(timeoutMillis);
      					            }
      					        } catch (CancelledKeyException var13) {}
      					    }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60

    4)普通任务会不会结束 select 阻塞

    • 会。非Nio线程每次调用 execute 方法后,会执行一次 wakeup

      /**
       * io.netty.util.concurrent.SingleThreadEventExecutor#execute
       */
      public void execute(Runnable task) 
          this.addTask(task);
          this.startThread();
      	//1.添加任务后执行wakeup
      	if (!this.addTaskWakesUp && this.wakesUpForTask(task)) {
      	    this.wakeup(inEventLoop);
      	}
      }
          protected void wakeup(boolean inEventLoop) {
              // 如果线程由于 IO select 阻塞了,添加的任务的线程需要负责唤醒 NioEventLoop 线程
              if (!inEventLoop && this.wakenUp.compareAndSet(false, true)) {
                  this.selector.wakeup();
              }
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17

    5)普通任务 wakeup 理解

    /**
     * io.netty.channel.nio.NioEventLoop#wakeup
     */
    protected void wakeup(boolean inEventLoop) {
        //1.提交任务的线程不是Nio线程才会进入if块
        //2.保证多个非Nio线程同时提交任务后只唤醒一次
        if (!inEventLoop && this.wakenUp.compareAndSet(false, true)) {
            this.selector.wakeup();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    6)thread什么时候 select

    • 没有任务时:返回-1,进入阻塞逻辑
    • 有任务时:调用 selectNow(返回0-…) 顺便拿到io 事件,执行任务
    /**
     * io.netty.channel.nio.NioEventLoop#run
     */
    //执行死循环,不断看有没有新任务、IO 事件 。循环+阻塞
    protected void run() {
        while(true) {
            while(true) {
                while(true) {
                    try {
                        try {
                            //1.当返回-1时进入阻塞逻辑
                            switch(this.selectStrategy.calculateStrategy(this.selectNowSupplier, this.hasTasks())) {
                            case -3:
                            case -1:
                                //进入 select 逻辑
                                this.select(this.wakenUp.getAndSet(false));
                                if (this.wakenUp.get()) {
                                    this.selector.wakeup();
                                }
                                break;
                            case -2:
                                continue;
                            }
                        }
                        //...执行任务
                    } 
                }
            }
        }
    }
    	//1.没有任务时返回-1,进入阻塞逻辑
        public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
            return hasTasks ? selectSupplier.get() : -1;
        } 
    		//2.有任务时调用 selectNow(返回0-...) 顺便拿到io 事件
        	private final IntSupplier selectNowSupplier = new IntSupplier() {
        	    public int get() throws Exception {
        	        return NioEventLoop.this.selectNow();
        	    }
        	};
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    7)select 阻塞多久

    • 超时时间:(1s+0.5ms)/1ms = 1000ms
    • 退出阻塞:到达截至时间(1s)、存在普通任务、发生io事件、被唤醒、被打断
    /**
     *	io.netty.channel.nio.NioEventLoop#select
     */
    //进入 select 逻辑
    private void select(boolean oldWakenUp) throws IOException {
        //1.获取当前时间
        long currentTimeNanos = System.nanoTime();
        //2.没有定时任务,截至时间:当前时间 + 1s
        //2.存在定时任务,截至时间:下一个定时任务执行时间 - 当前时间
        long selectDeadLineNanos = currentTimeNanos + this.delayNanos(currentTimeNanos);
        //截至时间不变,当前时间改变
        while(true) {
            //3.超时时间:(1s+0.0005s)/1ms = 1000ms
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            //到达截至时间:退出阻塞
            if (timeoutMillis <= 0L) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
            //有普通任务:退出阻塞。如果没这个判断,那么任务就会等到下次 select 超时时才能被执行
            if (this.hasTasks() && this.wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            int selectedKeys = selector.select(timeoutMillis);
            //醒来后,有 IO 事件、非 EventLoop 线程唤醒、有任务:退出阻塞
            if (selectedKeys != 0 || oldWakenUp || this.wakenUp.get() || this.hasTasks() || this.hasScheduledTasks()) {
                        break;
            }
            
            long time = System.nanoTime();
            currentTimeNanos = time;
        }
    }
    	//2.获取截至时间
        protected long delayNanos(long currentTimeNanos) {
            ScheduledFutureTask<?> scheduledTask = this.peekScheduledTask(); //不考虑
            return scheduledTask == null ? SCHEDULE_PURGE_INTERVAL : scheduledTask.delayNanos(currentTimeNanos);
        }
    		static {
            	SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1L);
        	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    8)BUG解决:select空轮询

    • BUG解释:即 select 不阻塞(jdk 在linux中才会出现)
    • BUG解决:空轮询超过阈值(默认512),重建、替换旧的 selector,并退出阻塞
    /**
     *	io.netty.channel.nio.NioEventLoop#select
     */
    private void select(boolean oldWakenUp) throws IOException {
        try {
            int selectCnt = 0;
            //1.循环+阻塞:如果出现bug没阻塞即空轮询,则 selectCnt++
            while(true) {
                int selectedKeys = selector.select(timeoutMillis);
                ++selectCnt;
                //2.selectCnt超出阈值:重建并替换旧的 selector,退出阻塞
                else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    selector = this.selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }
            }
        } 
    }
    static {
        //3.阈值默认值:512
        int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
        if (selectorAutoRebuildThreshold < 3) {
            selectorAutoRebuildThreshold = 0;
        }
        SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    9)thread 执行任务

    • 有多少任务执行多少任务
    • 按时间比例执行任务
      • 各占50%:io任务执行多久,普通任务就执行多久
    /**
     *	io.netty.channel.nio.NioEventLoop#run
     */
    //设置执行io任务的时间比例50%
    private volatile int ioRatio = 50;
    //执行死循环,不断看有没有新任务、IO 事件 。循环+阻塞
    protected void run() {
        while(true) {
            while(true) {
                while(true) {
                    try {
                        int ioRatio = this.ioRatio;
                        //1.比例设置为 100:则时间分配无效,该次循环存在多少任务执行多少任务
                        if (ioRatio == 100) {
                            try {
                                this.processSelectedKeys();
                            } finally {
                                this.runAllTasks();
                            }
                        } 
                        //2.按时间比例执行任务
                        else {
                            long ioStartTime = System.nanoTime();
                            boolean var14 = false;
                            try {
                                var14 = true;
                                this.processSelectedKeys();
                                var14 = false;
                            } finally {
                                if (var14) {
                                    long ioTime = System.nanoTime() - ioStartTime;this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
                                }
                            }
                            //2.1 获取 io任务 执行所用时间
                            //2.2 执行普通任务:所用时间与 io任务 相同
                            long ioTime = System.nanoTime() - ioStartTime;
                            this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
                        }
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    10)区分不同事件

    /**
     *	io.netty.channel.nio.NioEventLoop#run
     */
    //1.执行io任务
    protected void run() {
        while(true) {
            while(true) {
                while(true) {
                    try {
                        else {
                            try {
                                this.processSelectedKeys();
                            }
                        }
                    }
                }
            }
        }
    }
        private void processSelectedKeys() {
            //2.如果selectedKeySet已替换为数组实现
            //数组实现可以提高遍历性能(原本为 HashSet)
            if (this.selectedKeys != null) {
                this.processSelectedKeysOptimized();
            }
        }
        	private void processSelectedKeysOptimized() {
        	    for(int i = 0; i < this.selectedKeys.size; ++i) {
        	        SelectionKey k = this.selectedKeys.keys[i];
        	        this.selectedKeys.keys[i] = null;
        	        //3.获得事件相关的 Channel
        	        Object a = k.attachment();
        	        if (a instanceof AbstractNioChannel) {
        	            this.processSelectedKey(k, (AbstractNioChannel)a);
        	        } 
        	    }
        	}
    			//4.根据事件类型执行任务
        		private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        		    NioUnsafe unsafe = ch.unsafe();
        		    if (!k.isValid()) {}
        		    //key有效
        		    else {
        		        try {
        		            int readyOps = k.readyOps();
                            //连接事件
        		            if ((readyOps & 8) != 0) {
        		                int ops = k.interestOps();
        		                ops &= -9;
        		                k.interestOps(ops);
        		                unsafe.finishConnect();
        		            }
                            //可写事件
        		            if ((readyOps & 4) != 0) {
        		                ch.unsafe().forceFlush();
        		            }
                            //可读、可接入事件
        		            if ((readyOps & 17) != 0 || readyOps == 0) {
                                // 如果是可接入 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
                				// 如果是可读 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
        		                unsafe.read();
        		            }
        		        }
        		    }
        		}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    ⚠️ 注意

    这里有个费解的地方就是 wakeup,它既可以由提交任务的线程来调用(比较好理解),也可以由 EventLoop 线程来调用(比较费解),这里要知道 wakeup 方法的效果:

    • 由非 EventLoop 线程调用,会唤醒当前在执行 select 阻塞的 EventLoop 线程
    • 由 EventLoop 自己调用,本次的 wakeup 会取消下一次的 select 操作

    在这里插入图片描述

    三、accept 剖析

    nio 流程总览

    //1 阻塞直到事件发生
    selector.select();
    
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
    while (iter.hasNext()) {    
        //2 拿到一个事件
        SelectionKey key = iter.next();
        
        //3 如果是 accept 事件
        if (key.isAcceptable()) {
            
            //4 执行 accept
            SocketChannel channel = serverSocketChannel.accept();
            channel.configureBlocking(false);
            
            //5 关注 read 事件
            channel.register(selector, SelectionKey.OP_READ);
        }
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    启动跟源码

    服务器入口io.netty.channel.nio.NioEventLoop#processSelectedKey

    /**
     * io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
     */
    public void read() {
        try {
            try {
                do {
                    //1.ServerScoketChannel 执行 accept 创建 SocketChannel
    				//2.将 SocketChannel 包装为 NioSocketChannel、设置非阻塞,然后将 SocketChannel 作为消息放入 readBuf
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
    				// localRead 为 1,就一条消息,即接收一个客户端连接
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }
    
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                //3.进入 NioServerSocketChannel 的流水线:
                // 触发 read 事件,让 pipeline 上的 handler 处理
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
    
            if (exception != null) {
                closed = closeOnReadError(exception);
    
                pipeline.fireExceptionCaught(exception);
            }
    
            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
    	//1.
    	//2.
    	protected int doReadMessages(List<Object> buf) throws Exception {
            //1.ServerScoketChannel 执行 accept 创建 SocketChannel
            SocketChannel ch = SocketUtils.accept(this.javaChannel());
            try {
                if (ch != null) {
                    //2.将 SocketChannel 包装为 NioSocketChannel、设置非阻塞,然后将 SocketChannel 作为消息放入 readBuf
                    buf.add(new NioSocketChannel(this, ch));
                    return 1;
                }
            }
            return 0;
        }
    		//1.ServerScoketChannel 执行 accept 创建 SocketChannel
        	public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        	    try {
        	        return (SocketChannel)AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
        	            public SocketChannel run() throws IOException {
        	                return serverSocketChannel.accept();
        	            }
        	        });
        	    }
        	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

    //3.进入 NioServerSocketChannel 的流水线
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        
        final Channel child = (Channel) msg; // 这时的 msg 是 NioSocketChannel
    
        child.pipeline().addLast(childHandler); // NioSocketChannel 添加  childHandler 即初始化器
    
        setChannelOptions(child, childOptions, logger); // 设置选项
    
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    
        try {
            //4.将 NioSocketChannel 注册到新的 NioEventLoop 线程中
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    又回到启动剖析中熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法

    //4.切换线程:异步将 NioSocketChannel 注册到新的 NioEventLoop 线程中
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // 一些检查...
        AbstractChannel.this.eventLoop = eventLoop;
    
        if (eventLoop.inEventLoop()) { //是否是EventLoop线程
            register0(promise);
        } else {
            try {
                //4.1 切换线程:这行代码完成的事实是 nio boss -> nio worker 线程的切换
                // 首次执行 execute 方法时:才启动 nio 线程,之后注册等操作在 nio 线程上执行
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        //4.2 将 NioSocketChannel 注册到新的 NioEventLoop 线程中
                        register0(promise);
                    }
                });
            }
            //...
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    io.netty.channel.AbstractChannel.AbstractUnsafe#register0

    //4.2 将 NioSocketChannel 注册到新的 NioEventLoop 线程中
    private void register0(ChannelPromise promise) {
        try {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            // 4.2.1 将 NioSocketChannel 注册到新的 selector 上
            // 注意此时没有注册 selector 关注事件,附件为当前的 NioSocketChannel
            // this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
            doRegister();
            neverRegistered = false;
            registered = true;
    		
            //4.2.2 执行初始化器:我们给NioSocketChannel写的 chileHandler—>initChannel
            //执行前 pipeline 中只有 head -> 初始化器 -> tail
            pipeline.invokeHandlerAddedIfNeeded();
            //执行后就是 head -> logging handler -> tail
    
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            
            if (isActive()) {
                if (firstRegistration) {
                    //4.2.3 在新的 selector 上关注 read 事件
                    // 触发 pipeline 上 active 事件
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    	//4.2.2 执行初始化器:我们给NioSocketChannel写的 chileHandler—>initChannel
        public static void main(String[] args) {
            NioEventLoopGroup group = new NioEventLoopGroup(2);
            try {
                Channel channel = new ServerBootstrap()
                        .childHandler(new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel channel) throws Exception {
                                //添加日志处理器
                                channel.pipeline().addLast(new LoggingHandler());
                            }
                        })
                    //...
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

    //4.2.3 在新的 selector 上关注 read 事件
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.fireChannelActive();
        //关注 read 事件(NioSocketChannel 这里 read,只是为了触发 channel 的事件注册,还未涉及数据读取)
        this.readIfIsAutoRead();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    io.netty.channel.nio.AbstractNioChannel#doBeginRead

    //关注 read 事件
    protected void doBeginRead() throws Exception {
        SelectionKey selectionKey = this.selectionKey;
        if (selectionKey.isValid()) {
            this.readPending = true;
            int interestOps = selectionKey.interestOps();//这时候 interestOps 是 0
            if ((interestOps & this.readInterestOp) == 0) {
                //关注 read 事件
                selectionKey.interestOps(interestOps | this.readInterestOp);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    四、read 剖析

    再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete

    public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        //1.获取 byteBuf 分配器:决定是池化还是非池化的
        // io.netty.allocator.type 决定 allocator 的实现
        final ByteBufAllocator allocator = config.getAllocator();
        //2.动态调整 byteBuf 的分配大小,并且强制使用直接内存
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        a llocHandle.reset(config);
    
        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                //3.读取到 byteBuf
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        readPending = false;
                    }
                    break;
                }
    
                allocHandle.incMessagesRead(1);
                readPending = false;
                //4.触发 read 事件,把 ByteBuf 依次传给流水线中的handler 处理,这时是处理 NioSocketChannel 上的 handler
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } 
            //5.是否要继续循环
            while (allocHandle.continueReading());
    
            allocHandle.readComplete();
            // 触发 read complete 事件
            pipeline.fireChannelReadComplete();
    
            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)

    //5.是否要继续循环
    public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
        return 
               // 一般为 true
               config.isAutoRead() &&
               // respectMaybeMoreData 默认为 true
               // maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等,返回 true
               (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
               // 小于最大次数,maxMessagePerRead 默认 16
               totalMessages < maxMessagePerRead &&
               // 实际读到了数据
               totalBytesRead > 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
  • 相关阅读:
    java-net-php-python-ssm担保系统项目管理系统计算机毕业设计程序
    Linux Shell脚本的10个有用的“面试问题和解答”
    华为云云耀云服务器L实例评测|使用宝塔10分钟部署一个围猫猫小游戏
    stable-diffusion-webui中stability的sdv1.5和sdxl模型结构config对比
    第五届太原理工大学程序设计竞赛新生赛(初赛)题解
    康耐视深度学习ViDi-Plugins菜单介绍
    flowable,199boge,进阶,06完成
    shell脚本学习笔记02(小滴课堂)
    1612A无线信道仿真器30MHz~6GHz
    【力扣 Hot100 | 第七天】4.22(找到字符串中所有字母异位词)
  • 原文地址:https://blog.csdn.net/weixin_43401592/article/details/127721543