• 【Netty 源码分析】


    一、启动流程


    Netty启动流程可以简化成如下代码

    // netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
    Selector selector = Selector.open(); 
    
    // 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
    NioServerSocketChannel attachment = new NioServerSocketChannel();
    
    // 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
    serverSocketChannel.configureBlocking(false);
    
    // 启动 nio boss 线程执行接下来的操作
    
    //注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
    SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
    
    // head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor
    
    // 绑定端口
    serverSocketChannel.bind(new InetSocketAddress(8080));
    
    // 触发 channel active 事件,在 head 中关注 op_accept 事件
    selectionKey.interestOps(SelectionKey.OP_ACCEPT);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 获得选择器Selector,Netty中使用NioEventloopGroup中的NioEventloop封装了线程和选择器
    • 创建NioServerSocketChannel,该Channel作为附件添加到ServerSocketChannel中
    • 创建ServerSocketChannel,将其设置为非阻塞模式,并注册到Selector中,此时未关注事件,但是添加了附件NioServerSocketChannel
    • 绑定端口
    • 通过interestOps设置感兴趣的事件

    ssc : 其实就是 ServerSocketChannel

    流程:

    • 1、init main
      • 创建NioServerSocketChannel main
        • 内部其实就是new出来,底层是一个newSocket过程
        • return provider.openServerSocketChannel();
      • add NioServerSocketChannel 初始化handler main
        • 初始化handler 等待调用 (main 未调用, nio-thread调用)

    向nio ssc 加入了accept handler (在accept事件发生后建立连接)

    • 2、register (切换线程)
      • 启动 nio boss 线程 main
      • 原生 ssc 注册到 selector 未关注事件 nio-thread
      • 执行 NioServerSocketChannel 初始化 handler nio-thread
    • 3、regFuture 等待回调 dobind0 nio-thread
      • 原生 ServerSocketChannel 绑定 nio-thread
      • 触发 NioServerSocketChannel active 事件 nio-thread

    bind
    选择器Selector的创建是在NioEventLoopGroup中完成的。
    NioServerSocketChannel 与 ServerSocketChannel的创建、ServerSocketChannel注册到Seletor中以及绑定操作都是由 bind 方法完成
    **服务器入口: ** io.netty.bootstrap.AbstractBootstrap#bind(int)

     /**
         * Create a new {@link Channel} and bind it.
         */
        public ChannelFuture bind(SocketAddress localAddress) {
            validate();
            return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    doBind

    真正完成初始化、注册以及绑定的方法是
    io.netty.bootstrap.AbstractBootstrap#doBind
    注意: dobind方法在主线程中执行

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // 负责NioServerSocketChannel和ServerSocketChannel的创建
        // ServerSocketChannel的注册工作
        // init由main线程完成,regisetr由NIO线程完成
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
    
        // 因为register操作是异步的
        // 所以要判断主线程执行到这里时,register操作是否已经执行完毕
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            
            // 执行doBind0绑定操作
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            // 如果register操作还没执行完,就会到这个分支中来
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            
            // 添加监听器,NIO线程异步进行doBind0操作
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
    
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • doBind()中有两个重要方法initAndRegister()和doBind0(regFuture, channel, localAddress, promise)
    • initAndRegister主要负责NioServerSocketChannel和ServerSocketChannel的创建(主线程中完成)与ServerSocketChannel注册(NIO线程中完成)工作
    • doBind0则负责连接的创建工作

    initAndRegisterd

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
    
        
        // register promise.channel().unsafe().register(this, promise)方法
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    register:

    //promise.channel().unsafe().register(this, promise)方法
    
    @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                ...
                
                AbstractChannel.this.eventLoop = eventLoop;
    
                // 此处完成了 main 到nio 线程的切换
                // 当前线程是否是nio线程
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        // 向nio线程中添加任务
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                // 该方法中会执行doRegister
                                // 执行真正的注册操作
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                      
                    }
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    register0

    private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    boolean firstRegistration = neverRegistered;
                    doRegister();
                    neverRegistered = false;
                    registered = true;
                    
                    // 调用init中的initChannel方法  其实就是第一点中未执行的加handler
                    pipeline.invokeHandlerAddedIfNeeded();
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    doRegister方法

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // javaChannel()即为ServerSocketChannel
                // eventLoop().unwrappedSelector()获取eventLoop中的Selector
                // this为NIOServerSocketChannel,作为附件
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...
               
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    doBind0

    绑定端口

    在doRegister和invokeHandlerAddedIfNeeded操作中的完成后,会调用safeSetSuccess(promise)方法,向Promise中设置执行成功的结果。此时doBind方法中由initAndRegister返回的ChannelFuture对象regFuture便会由NIO线程异步执行doBind0绑定操作

    // initAndRegister为异步方法,会返回ChannelFuture对象
    final ChannelFuture regFuture = initAndRegister();
    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause != null) {
                // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                // IllegalStateException once we try to access the EventLoop of the Channel.
                promise.setFailure(cause);
            } else {
                // Registration was successful, so set the correct executor to use.
                // See https://github.com/netty/netty/issues/2586
                promise.registered();
                // 如果没有异常,则执行绑定操作
                doBind0(regFuture, channel, localAddress, promise);
            }
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    doBind0最底层调用的是ServerSocketChannel的bind方法
    NioServerSocketChannel.doBind方法
    通过该方法,绑定了对应的端口

    @SuppressJava6Requirement(reason = "Usage guarded by java version check")
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            // 调用ServerSocketChannel的bind方法,绑定端口
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    关注事件

    在绑定端口操作完成后,会判断各种所有初始化操作是否已经完成,若完成,则会添加ServerSocketChannel感兴趣的事件

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    最终在AbstractNioChannel.doBeginRead方法中,会添加ServerSocketChannel添加Accept事件

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
        readPending = true;
        final int interestOps = selectionKey.interestOps();
        // 如果ServerSocketChannel没有关注Accept事件
        if ((interestOps & readInterestOp) == 0) {
            // 则让其关注Accepet事件
            // readInterestOp 取值是 16
            // 在 NioServerSocketChannel 创建时初始化
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    总结

    通过上述步骤,完成了

    • NioServerSocketChannel与ServerSocketChannel的创建
    • ServerSocketChannel绑定到EventLoop的Selecot中,并添加NioServerSocketChannel附件
    • 绑定了对应的端口
    • 关注了Accept事件

    二、NioEventLoop剖析

    组成

    NioEventLoop 的重要组成部分有三个

    • Selector
    public final class NioEventLoop extends SingleThreadEventLoop {
        
        ...
            
        // selector中的selectedKeys是基于数组的
        // unwrappedSelector中的selectedKeys是基于HashSet的    
        private Selector selector;
        private Selector unwrappedSelector;
        private SelectedSelectionKeySet selectedKeys;
        
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • Thread 与 TaskQueue
    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
        // 任务队列
        private final Queue<Runnable> taskQueue;
    
        // 线程
        private volatile Thread thread;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Selector 的创建

    Selector是在NioEventLoop的构造方法中被创建的

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
        
            ...
               
            // 初始化selector,初始化过程在openSelector中
            final SelectorTuple selectorTuple = openSelector();
            this.selector = selectorTuple.selector;
            this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
    
    
    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            // 此处等同于 Selector.open()方法
            // 创建了unwrappedSelector对象
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    两个Selector

    NioEventLoop 中有selectorunwrappedSelector 两个selector,它们的区别主要在于SelectorKeys的数据结构.

    • selector中的SelectedKeys 是基于数组的.
    • unwrappedSelector中的是基于HashSet

    这样做的主要目的是,数组的遍历效率要高于HashSet

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
    
        ...
        
        // 获得基于数组的selectedKeySet实现
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
    
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    // 通过反射拿到unwrappedSelector中的selectedKeys属性
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
                    ...
    	
                    // 暴力反射,修改私有属性
                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
    
                    // 替换为基于数组的selectedKeys实现
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });
    
        selectedKeys = selectedKeySet;
        
        // 调用构造函数,创建unwrappedSelector与selector
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    NIO 线程启动时机

    启动

    NioEventLoop中的线程,在首次执行任务时,才会被创建,且只会被创建一次.
    代码;

    public class TestNioEventLoop {
        public static void main(String[] args) {
            EventLoop eventLoop = new NioEventLoopGroup().next();
            // 使用NioEventLoop执行任务
            eventLoop.execute(()->{
                System.out.println("hello");
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    进入 execute 执行任务

    @Override
    public void execute(Runnable task) {
        // 检测传入的任务是否为空,为空会抛出NullPointerException
        ObjectUtil.checkNotNull(task, "task");
        // 执行任务
        // 此处判断了任务是否为懒加载任务,wakesUpForTask的返回值只会为true
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    进入上述代码的 execute 方法

    private void execute(Runnable task, boolean immediate) {
        // 判断当前线程是否为NIO线程
        // 判断方法为 return thread == this.thread;
        // this.thread即为NIO线程,首次执行任务时,其为null
        boolean inEventLoop = inEventLoop();
        
        // 向任务队列taskQueue中添加任务
        addTask(task);
        
        // 当前线程不是NIO线程,则进入if语句
        if (!inEventLoop) {
            // 启动NIO线程的核心方法
            startThread();
            
            ...
            
        }
    	
        // 有任务需要被执行时,唤醒阻塞的NIO线程
        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    进入startThread方法

    private void startThread() {
        // 查看NIO线程状态是否为未启动
        // 该if代码块只会执行一次
        // state一开始的值就是ST_NOT_STARTED
        // private volatile int state = ST_NOT_STARTED;
        if (state == ST_NOT_STARTED) {
            // 通过原子属性更新器将状态更新为启动(ST_STARTED)
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    // 执行启动线程
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    进入 doStartThread ,真正创建NIO线程并执行任务

    private void doStartThread() {
        assert thread == null;
        // 创建NIO线程并执行任务
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // thread即为NIO线程
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
    
                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 执行内部run方法
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } 
                
                ...
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    通过执行 SingleThreadEventExecutor.this.run(); 执行传入的任务 (task)
    该run方法是** NioEventLoop的run方法**

    @Override
    protected void run() {
        int selectCnt = 0;
        // 死循环,不断地从任务队列中获取各种任务来执行
        for (;;) {	
          	// 执行各种任务
       		try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;
    
                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO
    
                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
           		}
        	}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    唤醒

    NioEventLoop需要IO事件、普通任务以及定时任务,任务在run方法的for 循环中

    @Override
    protected void run() {
        int selectCnt = 0;
        // 死循环,不断地从任务队列中获取各种任务来执行
        for (;;) {
          	// 执行各种任务
       		...
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    被执行,但该循环不会空转,执行到某些代码时,会被阻塞
    run方法中有 Select 分支

     case SelectStrategy.SELECT:
        select(wakenUp.getAndSet(false));
        if (wakenUp.get()) {
            selector.wakeup();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    会执行NioEventLoop 的 select 方法

    private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                int selectCnt = 0;
                long currentTimeNanos = System.nanoTime();
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    
                long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
                if (nextWakeupTime != normalizedDeadlineNanos) {
                    nextWakeupTime = normalizedDeadlineNanos;
                }
    
                for (;;) {
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                    if (timeoutMillis <= 0) {
                        if (selectCnt == 0) {
                            selector.selectNow();
                            selectCnt = 1;
                        }
                        break;
                    }
    
                    // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                    // Selector#wakeup. So we need to check task queue again before executing select operation.
                    // If we don't, the task might be pended until select operation was timed out.
                    // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
    
                    int selectedKeys = selector.select(timeoutMillis);
                    selectCnt ++;
    
                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                        // - Selected something,
                        // - waken up by user, or
                        // - the task queue has a pending task.
                        // - a scheduled task is ready for processing
                        break;
                    }
                    if (Thread.interrupted()) {
                        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                        // As this is most likely a bug in the handler of the user or it's client library we will
                        // also log it.
                        //
                        // See https://github.com/netty/netty/issues/2426
                        if (logger.isDebugEnabled()) {
                            logger.debug("Selector.select() returned prematurely because " +
                                    "Thread.currentThread().interrupt() was called. Use " +
                                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                        }
                        selectCnt = 1;
                        break;
                    }
    
                    long time = System.nanoTime();
                    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                        // timeoutMillis elapsed without anything selected.
                        selectCnt = 1;
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        // The code exists in an extra method to ensure the method is not too big to inline as this
                        // branch is not very likely to get hit very frequently.
                        selector = selectRebuildSelector(selectCnt);
                        selectCnt = 1;
                        break;
                    }
    
                    currentTimeNanos = time;
                }
    
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                }
            } catch (CancelledKeyException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
                // Harmless exception - log anyway
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87

    但需要注意的是,select方法是会阻塞线程的,当没有IO事件,但有其他任务需要执行时,需要唤醒线程
    唤醒是通过execute最后的if代码块来完成的

    // 有任务需要被执行时,唤醒阻塞的NIO线程
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
    
    • 1
    • 2
    • 3
    • 4

    NioEventLoop.wakeup唤醒被selector.select方法阻塞的NIO线程

    @Override
    protected void wakeup(boolean inEventLoop) {
        // 只有当其他线程给当前NIO线程提交任务时(如执行execute),才会被唤醒
        // 通过AtomicLong进行更新,保证每次只能有一个线程唤醒成功
        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            // 唤醒被selector.select方法阻塞的NIO线程
            selector.wakeup();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    SELECT分支

    run方法的switch语句有多条分支,具体执行分支的代码由strategy变量控制

    int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
    switch (strategy) {
    	...
    }
    
    • 1
    • 2
    • 3
    • 4

    strategy 的值 由 cacluateStrategy 方法确定

    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        // selectSupplier.get() 底层是 selector.selectNow();
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    该方法会根据hasTaks变量判断任务队列中是否有任务

    • 若有任务,则通过selectSupplier获得strategy的值
      • get方法会selectNow方法,顺便拿到IO事件
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        public int get() throws Exception {
            return NioEventLoop.this.selectNow();
        }
    };
    
    int selectNow() throws IOException {
        return this.selector.selectNow();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 若没有任务,就会进入SELECT分支

    也就说,当任务队列中没有任务时,才会进入SELECT分支,让NIO线程阻塞,而不是空转。若有任务,则会通过get方法调用selector.selectNow方法,顺便拿到IO事件

    Java NIO空轮询BUG

    Java NIO空轮询BUG也就是JavaNIO在Linux系统下的epoll空轮询问题
    在NioEventLoop中,因为run方法中存在一个死循环,需要通过selector.select方法来阻塞线程。但是select方法因为BUG,可能无法阻塞线程,导致循环一直执行,使得CPU负载升高

    @Override
    protected void run() {
        ...
        for(;;){
            ...
            // 可能发生空轮询,无法阻塞NIO线程
            strategy = select(curDeadlineNanos);  
            ...     
        
         	if(...) {
    			...
         	} else if (unexpectedSelectorWakeup(selectCnt) ){
                // 通过unexpectedSelectorWakeup方法中的rebuildSelector重建selector
                // 并将selectCnt重置为0
                selectCnt = 0;
            }
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    Netty中通过selectCnt变量来检测select方法是否发生空轮询BUG
    若发生空轮询BUG,那么selectCnt的值会增长是十分迅速。当selectCnt的值大于等于SELECTOR_AUTO_REBUILD_THRESHOLD(默认512)时,Netty则判断其出现了空轮询BUG,进行如下处理

    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        // The selector returned prematurely many times in a row.
        // Rebuild the selector to work around the problem.
        logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",selectCnt, selector);
        // 重建selector,将原selector的配置信息传给新selector
        // 再用新selector覆盖旧selector
        rebuildSelector();
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    通过rebuildSelector方法重建selector,将原selector的配置信息传给新selector,再用新selector覆盖旧selector。同时将selectCnt的值设置为0

    ioRatio

    NioEventLoop可以处理IO事件和其他任务。不同的操作所耗费的时间是不同的,想要控制NioEventLoop处理IO事件花费时间占执行所有操作的总时间的比例,需要通过ioRatio来控制
    NioEventLoop.run方法

    // 处理IO事件时间比例,默认为50%
    final int ioRatio = this.ioRatio;
    // 如果IO事件时间比例设置为100%
    if (ioRatio == 100) {
        try {
            processSelectedKeys();
        } finally {
            // Ensure we always run tasks.
            runAllTasks();
        }
    } else {
        final long ioStartTime = System.nanoTime();
        try {
            processSelectedKeys();
        } finally {
            // 计算出处理其他任务的事件
            // 超过设定的时间后,将会停止任务的执行,会在下一次循环中再继续执行
            // Ensure we always run tasks.
            final long ioTime = System.nanoTime() - ioStartTime;
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    处理事件

    IO事件是通过NioEventLoop.processSelectedKeys()方法处理的

    private void processSelectedKeys() {
        // 如果selectedKeys是基于数组的
        // 一般情况下都走这个分支
        if (selectedKeys != null) {
            // 处理各种IO事件
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    processSelectedKeysOptimized方法

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            // 拿到SelectionKeyec
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;
    
            // 获取SelectionKey上的附件,即NioServerSocketChannel
            final Object a = k.attachment();
    
            if (a instanceof AbstractNioChannel) {
                // 处理事件,传入附件NioServerSocketChannel
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
    
            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);
    
                selectAgain();
                i = -1;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    三、accept剖析

    NIO中处理Accept事件流程

    NIO中处理Accept事件主要有以下六步

    • selector.select()阻塞线程,直到事件发生
    • 遍历selectionKeys
    • 获取一个key,判断事件类型是否为Accept

    • 创建SocketChannel,设置为非阻塞
    • 将SocketChannel注册到selector中
    • 关注selectionKeys的read事件

    代码如下

    // 阻塞直到事件发生
    selector.select();
    
    Iterator<SelectionKey> iter = selector.selectionKeys().iterator();
    while (iter.hasNext()) {    
        // 拿到一个事件
        SelectionKey key = iter.next();
        
        // 如果是 accept 事件
        if (key.isAcceptable()) {
            
            // 执行accept,获得SocketChannel
            SocketChannel channel = serverSocketChannel.accept();
            channel.configureBlocking(false);
            
            // 将SocketChannel注册到selector中,并关注read事件
            channel.register(selector, SelectionKey.OP_READ);
        }
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    其中前三步,在NioEventLoop剖析中已经分析过了,所以接下来主要分析后三步
    发生Accept事件后,会执行NioEventLoop.run方法的如下if分支

    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    	unsafe.read();
    }
    
    • 1
    • 2
    • 3

    NioMessageUnsafe.read方法

    public void read() {
    
        ...
        
        try {
            try {
                do {
    				// doReadMessages中执行了accept获得了SocketChannel
                    // 并创建NioSocketChannel作为消息放入readBuf
                    // readBuf是一个ArrayList用来缓存消息
                    // private final List<Object> readBuf = new ArrayList<Object>();
                    int localRead = doReadMessages(readBuf);
                    
                    ...
                    
    				// localRead值为1,就一条消息,即接收一个客户端连接
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }
    
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                // 触发read事件,让pipeline上的handler处理
                // ServerBootstrapAcceptor.channelRead
                pipeline.fireChannelRead(readBuf.get(i));
            }
            
            ...
            
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    NioSocketChannel.doReadMessages方法
    该方法中处理accpet事件,获得SocketChannel,同时创建了NioSocketChannel,作为消息放在了readBuf中

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        // 处理accpet事件,获得SocketChannel
        SocketChannel ch = SocketUtils.accept(javaChannel());
    
        try {
            if (ch != null) {
                // 创建了NioSocketChannel,作为消息放在了readBuf中
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
           ...
        }
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    ServerBootstrapAcceptor.channelRead

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 这时的msg是NioSocketChannel
        final Channel child = (Channel) msg;
    
        // NioSocketChannel添加childHandler,即初始化器
        child.pipeline().addLast(childHandler);
    
        // 设置选项
        setChannelOptions(child, childOptions, logger);
    
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    
        try {
            // 注册 NioSocketChannel到nio worker线程,接下来的处理也移交至nio worker线程
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    通过AbstractUnsafe.register 方法,将SocketChannel注册到了Selector中,过程与启动流程中的Register过程类似

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        
        ...
    
        AbstractChannel.this.eventLoop = eventLoop;
    
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                // 这行代码完成的是nio boss -> nio worker线程的切换
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 真正的注册操作
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                ...
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    AbstractChannel.AbstractUnsafe.register0

    private void register0(ChannelPromise promise) {
        try {
            
            ...
                
            // 该方法将SocketChannel注册到Selector中
            doRegister();
            
            // 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tail
            pipeline.invokeHandlerAddedIfNeeded();
            // 执行后就是 head -> logging handler -> my handler -> tail
    
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            
            if (isActive()) {
                if (firstRegistration) {
                    // 触发pipeline上active事件
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    AbstractNioChannel.doRegister将SocketChannel注册到Selector中

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 将Selector注册到Selector中
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    HeadContext.channelActive

    public void channelActive(ChannelHandlerContext ctx) {
        ctx.fireChannelActive();
    	// 触发read(NioSocketChannel这里read只是为了触发channel的事件注册,还未涉及数据读取)
        readIfIsAutoRead();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    AbstractNioChannel.doBeginRead,通过该方法,SocketChannel关注了read事件

    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
    
        readPending = true;
    	// 这时候 interestOps是0
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // 关注read事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    四、read事件

    read事件的处理也是在

    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    	unsafe.read();
    }
    
    • 1
    • 2
    • 3

    分支中,通过unsafe.read()方法处理的,不过此处调用的方法在AbstractNioByteChannel.NioByteUnsafe类中

    @Override
    public final void read() {
        // 获得Channel的配置
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
    	// 根据配置创建ByteBufAllocator(池化非池化、直接非直接内存)
    	final ByteBufAllocator allocator = config.getAllocator();
        // 用来分配 byteBuf,确定单次读取大小
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);
    
        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                // 创建ByteBuf
                byteBuf = allocHandle.allocate(allocator);
                // 读取内容,放入ByteBUf中
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        readPending = false;
                    }
                    break;
                }
    
                allocHandle.incMessagesRead(1);
                readPending = false;
                // 触发read 事件,让pipeline上的handler处理
                // 这时是处理NioSocketChannel上的handler
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } 
            // 是否要继续循环
            while (allocHandle.continueReading());
    
            allocHandle.readComplete();
            // 触发 read complete事件
            pipeline.fireChannelReadComplete();
    
            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
             // Check if there is a readPending which was not processed yet.
             // This could be for two reasons:
             // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
             // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
             //
             // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle.continueReading(io.netty.util.UncheckedBooleanSupplier)

    public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
        return 
               // 一般为true
               config.isAutoRead() &&
               // respectMaybeMoreData默认为true
               // maybeMoreDataSupplier的逻辑是如果预期读取字节与实际读取字节相等,返回true
               (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
               // 小于最大次数,maxMessagePerRead默认16
               totalMessages < maxMessagePerRead &&
               // 实际读到了数据
               totalBytesRead > 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
  • 相关阅读:
    企业数字化进程的关键一步
    grafana配置钉钉告警模版(一)
    【ict云赛道备考】华为云介绍
    怎么提取一个python文件中所有得函数名称
    低代码是个用处不大的“玩具”?可别小看它的威力!
    Springboot 玩一玩代码混淆,防止反编译代码泄露
    MVC 、MVP、MVVM
    web前端要接触的技术领域与关键要素
    Python分享之多进程探索 (multiprocessing包)
    2024年山东省职业院校技能大赛中职组 “网络安全”赛项竞赛试题-B卷
  • 原文地址:https://blog.csdn.net/qq_41773026/article/details/125627918