• Netty 之 NioEventLoop 源码阅读


    1. 概述

    EventExecutorGroup 继承了 ScheduledExecutorService、AbstractExecutorService、Iterable;
    根据这些名字,大概知道可以提供这些功能 :

    • 定时调度线程池定时执行任务
    • 线程池异步提交执行任务
    • 迭代器

    NioEventLoop 继承了 SingleThreadEventLoop,继承了SingleThreadEventExecutor, 是一个单线程事件处理器,单线程事件调度线程池;常用方法:

    • NioEventLoop#run 接受处理selector 上的I/O事件,处理普通任务、定时任务
    • 注册channel:io.netty.channel.SingleThreadEventLoop#register
    • 执行任务:io.netty.util.concurrent.SingleThreadEventExecutor#execute
    • 每个EventLoop都有自己的Selector: private Selector selector; private Selector unwrappedSelector;

    image.png

    2. EventExecutorGroup实例

    import io.netty.channel.DefaultEventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.util.concurrent.EventExecutor;
    
    import java.util.concurrent.TimeUnit;
    
    public class EventLoopGroupStudy {
        public static void main(String[] args) {
    
            DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
            System.out.println(group.next());
            System.out.println(group.next());
            System.out.println(group.next());
    
            DefaultEventLoopGroup group1 = new DefaultEventLoopGroup(2);
            for (EventExecutor eventLoop : group1) {
                System.out.println(eventLoop);
            }
    
            NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);
            nioWorkers.execute(()->{
                System.out.println("normal task...");
            });
    
            nioWorkers.scheduleAtFixedRate(() -> {
                System.out.println("scheduleAtFixedRate running...");
            }, 0, 1, TimeUnit.SECONDS);
        }
    }
    
    

    运行截图:
    image.png

    在这里插入图片描述

    3. 源码

    3.1 构造函数

    3.1.1 DefaultEventLoopGroup 构造函数

    • 可以指定 线程数量 nThreads、线程工厂 threadFactory
    • children = new EventExecutor[nThreads]; children[i] = newChild(executor, args); -> new NioEventLoop
    • 没有指定 线程数量 nThreads,默认 NettyRuntime.availableProcessors() * 2 ,cpu核心乘2的线程数,或者 io.netty.eventLoopThreads 配置的数量,最小是1
    public DefaultEventLoopGroup(int nThreads) {
        this(nThreads, (ThreadFactory) null);
    }
    
    public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        super(nThreads, threadFactory);
    }
    
    //  DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
    //			"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }
    
    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
    }
    
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
    
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
    
        children = new EventExecutor[nThreads];
    
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
    
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
    
        chooser = chooserFactory.newChooser(children);
    
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
    
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
    

    3.1.2 NioEventLoop 构造函数

    • newTaskQueue 创建任务队列 Math.max(16, SystemPropertyUtil.getInt(“io.netty.eventLoop.maxPendingTasks”, Integer.MAX_VALUE));
    • 创建 NioEventLoop 创建对应的 selector: openSelector()
    • args 从创建 MultithreadEventExecutorGroup 的时候带进来
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                              ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }
    
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        // 创建 selector
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
    

    3.2 run 方法

    • 轮询 I/O 事件: select(wakenUp.getAndSet(false));

    轮询 Selector 选择器中已经注册的所有 Channel 的 I/O 事件

    • 处理 I/O 事件: processSelectedKeys(); 处理已经准备就绪的 I/O 事件;
    • 处理完 I/O 事件,再处理异步任务队列: runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

    ioRatio 参数用于调整 I/O 事件处理和任务处理的时间比例。

    在这里插入图片描述

    @Override
    protected void run() {
        for (;;) {
            try {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
    
                        case SelectStrategy.BUSY_WAIT:
                            
                        case SelectStrategy.SELECT:
                            // 轮询 I/O 事件: select(wakenUp.getAndSet(false));
                            // 轮询 Selector 选择器中已经注册的所有 Channel 的 I/O 事件
                            select(wakenUp.getAndSet(false));
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                    }
                } catch (IOException e) {
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }
    
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        // 处理 I/O 事件: processSelectedKeys(); 处理已经准备就绪的 I/O 事件;
                        processSelectedKeys();
                    } finally {
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // 处理完 I/O 事件,再处理异步任务队列: runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 
                        // ioRatio 参数用于调整 I/O 事件处理和任务处理的时间比例。
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    

    3.2.1 SelectStrategy

    SelectStrategy 定义:
    int SELECT = -1;
    int CONTINUE = -2;
    int BUSY_WAIT = -3;

    • 没有任务返回 SelectStrategy.SELECT
    • selectNowSupplier selectNow() 异常返回-1 也是 SelectStrategy.SELECT
    • selectNowSupplier selectNow() 异常返回0 或者 数量,走到 default
    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
        case SelectStrategy.CONTINUE:
            continue;
        case SelectStrategy.BUSY_WAIT:
        case SelectStrategy.SELECT:
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
        default:
    }
    
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
    
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };
    

    3.2.2 select

    • delayNanos(currentTimeNanos) 没有调度任务返回1s,有调度任务返回调度任务的待执行时间
      • 0.5ms内有定时任务需要执行,退出无限循环 : long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
      • Netty 的任务队列包括普通任务、定时任务以及尾部任务,hasTask() 判断的是普通任务队列和尾部队列是否为空,而 delayNanos(currentTimeNanos) 方法获取的是定时任务的延迟时间。
    • 有待执行任务,wakenUp设置为true,selectNow然后结束
      • hasTasks() && wakenUp.compareAndSet(false, true)
    • 阻塞等待获取I/O事件
      • int selectedKeys = selector.select(timeoutMillis);
      • select次数 selectCnt++
    • selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks() 都停止 select
      • selectedKeys != 0:有io事件待处理
      • 参数oldWakenUp为true
      • wakenUp为true
      • hasTasks 有待执行任务
      • hasScheduledTasks 有待调度执行的任务
    • Thread.interrupted(), 线程被中断,break
    • selector.select(timeoutMillis) 真正等待时间 + 程序运行几毫秒 >= timeoutMillis 预计等待时间,说明selector没有问题,所以selectCnt重置为1
      • time [当前时间 = 开始时间 + 真正等待时间 + 程序运行几毫秒(忽略)] - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) [预计等待时间] >= currentTimeNanos [开始时间]

    ==> 真正等待时间 + 程序运行事件几毫秒(忽略) >= 预计等待时间

    • selectCnt 超过一定次数,可能触发了 epoll 空轮询 Bug,重新构建 selector
      • SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD
      • 重新构建 selector,selector = selectRebuildSelector(selectCnt);
        private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                int selectCnt = 0;
                long currentTimeNanos = System.nanoTime();
                // delayNanos(currentTimeNanos) 没有调度任务返回1s,有调度任务返回调度任务的待执行时间
                // Netty 的任务队列包括普通任务、定时任务以及尾部任务,hasTask() 判断的是普通任务队列和尾部队列是否为空,
                // 而 delayNanos(currentTimeNanos) 方法获取的是定时任务的延迟时间
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    
                for (;;) {
                    // 0.5ms内有定时任务需要执行,退出无限循环
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                    if (timeoutMillis <= 0) {
                        if (selectCnt == 0) {
                            selector.selectNow();
                            selectCnt = 1;
                        }
                        break;
                    }
    
                    // 2. 有待执行任务,wakenUp设置为true,selectNow结束
                    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
    
                    // 3. 阻塞等待获取I/O事件
                    int selectedKeys = selector.select(timeoutMillis);
                    // select次数
                    selectCnt ++;
    
                    // selectedKeys != 0:有i/o事件待处理
                    // select方法调用前wakenUp为true:selector 被唤醒
                    // wakenUp为true:selector 被唤醒
                    // hasTasks:有待执行任务
                    // hasScheduledTasks:有待调度执行的任务
                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                        // - Selected something,
                        // - waken up by user, or
                        // - the task queue has a pending task.
                        // - a scheduled task is ready for processing
                        break;
                    }
    
                    // 线程被中断
                    if (Thread.interrupted()) {
                        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                        // As this is most likely a bug in the handler of the user or it's client library we will
                        // also log it.
                        //
                        // See https://github.com/netty/netty/issues/2426
                        if (logger.isDebugEnabled()) {
                            logger.debug("Selector.select() returned prematurely because " +
                                    "Thread.currentThread().interrupt() was called. Use " +
                                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                        }
                        selectCnt = 1;
                        break;
                    }
    
                    long time = System.nanoTime();
                    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                        // timeoutMillis elapsed without anything selected.
                        selectCnt = 1;
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        // 解决 JDK epoll 空轮训bug,selectCnt > SELECTOR_AUTO_REBUILD_THRESHOLD, 重新构建Selector
                        // The code exists in an extra method to ensure the method is not too big to inline as this
                        // branch is not very likely to get hit very frequently.
                        selector = selectRebuildSelector(selectCnt);
                        selectCnt = 1;
                        break;
                    }
    
                    currentTimeNanos = time;
                }
    
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                }
            } catch (CancelledKeyException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
                // Harmless exception - log anyway
            }
        }
    
    

    3.2.3 processSelectedKeys

    处理已经就绪的 SelectionKey
    Netty 优化过的 selectedKeys 是 SelectedSelectionKeySet 类型,而正常逻辑使用的是 JDK HashSet 类型

        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    

    3.2.3.1 processSelectedKeysOptimized

    用的 SelectedSelectionKeySet selectedKeys
    SelectedSelectionKeySet 内部使用的是 SelectionKey 数组,所以 processSelectedKeysOptimized 可以直接通过遍历数组取出 I/O 事件,相比 JDK HashSet 的遍历效率更高

    private SelectedSelectionKeySet selectedKeys;
    
    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;
    
            final Object a = k.attachment();
    
            if (a instanceof AbstractNioChannel) {
                // I/O事件由 Netty 负责处理
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                // 用户自定义任务
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
    
            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);
    
                selectAgain();
                i = -1;
            }
        }
    }
    

    3.2.3.2 processSelectedKeysPlain
    • 处理I/O事件
      • processSelectedKey(k, (AbstractNioChannel) a);
    • 处理NioTask
      • NioTask task = (NioTask) a;
      • processSelectedKey(k, task);
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }
    
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();
    
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
    
            if (!i.hasNext()) {
                break;
            }
    
            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();
    
                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }
    
    3.2.3.3 processSelectedKey

    NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, AbstractNioChannel)

    • k.isValid() 检查key是否有效
    • ((readyOps & SelectionKey.OP_CONNECT) != 0) :unsafe.finishConnect(); 接受连接
    • ((readyOps & SelectionKey.OP_WRITE) != 0): ch.unsafe().forceFlush(); 发送数据到客户端
    • ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0):unsafe.read(); 读取客户端数据
        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    // If the channel implementation throws an exception because there is no event loop, we ignore this
                    // because we are only trying to determine if ch is registered to this event loop and thus has authority
                    // to close ch.
                    return;
                }
                // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
                // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
                // still healthy and should not be closed.
                // See https://github.com/netty/netty/issues/5125
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
                // the NIO JDK channel implementation may throw a NotYetConnectedException.
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
    
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    

    3.2.3.4 processSelectedKey NioTask

    NioTask 是用户自定义的 task

        private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
            int state = 0;
            try {
                task.channelReady(k.channel(), k);
                state = 1;
            } catch (Exception e) {
                k.cancel();
                invokeChannelUnregistered(task, k, e);
                state = 2;
            } finally {
                switch (state) {
                case 0:
                    k.cancel();
                    invokeChannelUnregistered(task, k, null);
                    break;
                case 1:
                    if (!k.isValid()) { // Cancelled by channelReady()
                        invokeChannelUnregistered(task, k, null);
                    }
                    break;
                }
            }
        }
    
    

    3.3 execute

    • addTask(task); 添加任务到 mpsc无锁队列
    • inEventLoop 如果为 false 表示由其它线程来调用 execute,启动线程,STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED),CAS保证不会重复启动
    • wakeup 唤醒 selector 的 select 阻塞
        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            boolean inEventLoop = inEventLoop();
            addTask(task);
            if (!inEventLoop) {
                startThread();
                if (isShutdown()) {
                    boolean reject = false;
                    try {
                        if (removeTask(task)) {
                            reject = true;
                        }
                    } catch (UnsupportedOperationException e) {
                        // The task queue does not support removal so the best thing we can do is to just move on and
                        // hope we will be able to pick-up the task before its completely terminated.
                        // In worst case we will log on termination.
                    }
                    if (reject) {
                        reject();
                    }
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
    
        protected void addTask(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
            if (!offerTask(task)) {
                reject(task);
            }
        }
    
       final boolean offerTask(Runnable task) {
            if (isShutdown()) {
                reject();
            }
            return taskQueue.offer(task);
        }
    
    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
    

    io.netty.channel.nio.NioEventLoop#wakeup

      protected void wakeup(boolean inEventLoop) {
            if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
                selector.wakeup();
            }
        }
    

    3.4 schedule

    AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask)
    scheduledTaskQueue

        <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
            if (inEventLoop()) {
                scheduledTaskQueue().add(task);
            } else {
                execute(new Runnable() {
                    @Override
                    public void run() {
                        scheduledTaskQueue().add(task);
                    }
                });
            }
    
            return task;
        }
    
        PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
            if (scheduledTaskQueue == null) {
                scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
                        SCHEDULED_FUTURE_TASK_COMPARATOR,
                        // Use same initial capacity as java.util.PriorityQueue
                        11);
            }
            return scheduledTaskQueue;
        }
    

    3.5 NioEventLoop register

    • io.netty.channel.nio.NioEventLoop#register
    • io.netty.channel.nio.NioEventLoop#register0
    • java.nio.channels.spi.AbstractSelectableChannel#register 调用nio进行注册
      public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
            if (ch == null) {
                throw new NullPointerException("ch");
            }
            if (interestOps == 0) {
                throw new IllegalArgumentException("interestOps must be non-zero.");
            }
            if ((interestOps & ~ch.validOps()) != 0) {
                throw new IllegalArgumentException(
                        "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
            }
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            if (isShutdown()) {
                throw new IllegalStateException("event loop shut down");
            }
    
            if (inEventLoop()) {
                register0(ch, interestOps, task);
            } else {
                try {
                    // Offload to the EventLoop as otherwise java.nio.channels.spi.AbstractSelectableChannel.register
                    // may block for a long time while trying to obtain an internal lock that may be hold while selecting.
                    submit(new Runnable() {
                        @Override
                        public void run() {
                            register0(ch, interestOps, task);
                        }
                    }).sync();
                } catch (InterruptedException ignore) {
                    // Even if interrupted we did schedule it so just mark the Thread as interrupted.
                    Thread.currentThread().interrupt();
                }
            }
        }
    
        private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
            try {
                ch.register(unwrappedSelector, interestOps, task);
            } catch (Exception e) {
                throw new EventLoopException("failed to register a channel", e);
            }
        }
    
        public final SelectionKey register(Selector sel, int ops,
                                           Object att)
            throws ClosedChannelException
        {
            synchronized (regLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                if ((ops & ~validOps()) != 0)
                    throw new IllegalArgumentException();
                if (blocking)
                    throw new IllegalBlockingModeException();
                SelectionKey k = findKey(sel);
                if (k != null) {
                    k.interestOps(ops);
                    k.attach(att);
                }
                if (k == null) {
                    // New registration
                    synchronized (keyLock) {
                        if (!isOpen())
                            throw new ClosedChannelException();
                        k = ((AbstractSelector)sel).register(this, ops, att);
                        addKey(k);
                    }
                }
                return k;
            }
        }
    

    3.6 runAllTasks

    • fetchFromScheduledTaskQueue : scheduledTaskQueue 移动到 taskQueue
    • pollTask() : taskQueue 取出任务
    • afterRunningAllTasks : taskQueue 为空,执行 tailTasks
    • safeExecute:try catch 的 执行 task,有异常不会终止
    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    
    protected boolean runAllTasks(long timeoutNanos) {
            fetchFromScheduledTaskQueue();
            Runnable task = pollTask();
            if (task == null) {
                afterRunningAllTasks();
                return false;
            }
    
            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                safeExecute(task);
    
                runTasks ++;
    
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            afterRunningAllTasks();
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
    

    3.7 wakenUp

    private final AtomicBoolean wakenUp = new AtomicBoolean();
    


    进入 select ,设置 wakenUp 为 false

    case SelectStrategy.SELECT:
        select(wakenUp.getAndSet(false));
    


    io.netty.channel.nio.NioEventLoop#wakeup

    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            selector.wakeup(); 
        }
    }
    

    io.netty.util.concurrent.SingleThreadEventExecutor#execute
    提交任务会 wakeup(inEventLoop);

    public void execute(Runnable task) {
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop); 
        }
    }
    
  • 相关阅读:
    国产高云FPGA开发软件Gowin的下载、安装、Licence共享,按照我的方案保证立马能用,不能用你铲我耳屎
    VFP用Foxjson玩转JSON,超简单的教程
    猿创征文|【国产数据库实战】一文学会应用SqlSugar访问及操作人大金仓数据库
    入行10年后,我总结了这份FPGA学习路线
    mysql创建函数实现递归查下级
    如何下载GitHub上的代码
    跳槽至今0 offer的大冤种,问题到底出在哪儿?
    信创之国产浪潮电脑+统信UOS操作系统体验2:安装visual studio code和cmake搭建C++开发环镜
    记一次 MySQL timestamp 精度问题的排查 → 过程有点曲折
    如何解决工业相机达不到标称帧率
  • 原文地址:https://blog.csdn.net/qq_38737992/article/details/140087213