• Netty 学习(六):创建 NioEventLoopGroup 的核心源码说明


    Netty 学习(六):创建 NioEventLoopGroup 的核心源码说明

    作者: Grey

    原文地址:

    博客园:Netty 学习(六):创建 NioEventLoopGroup 的核心源码说明

    CSDN:Netty 学习(六):创建 NioEventLoopGroup 的核心源码说明

    基于 JDK 的 API 自己实现 NIO 编程,需要一个线程池来不断监听端口。接收到新连接之后,这条连接上数据的读写会在另外一个线程池中进行。

    在 Netty 实现的服务端中, 有如下经典代码

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap b = new ServerBootstrap();
    // 设置服务端的线程模型。
    // bossGroup 负责不断接收新的连接,将新的连接交给 workerGroup 来处理。 
    b.group(bossGroup, workerGroup)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    其中 bossGroup 对应的就是监听端口的线程池,在绑定一个端口的情况下,这个线程池里只有一个线程;workerGroup 对应的是连接的数据读写的线程。

    通过 debug 并设置断点的方式,我们来查看下创建 NioEventLoopGroup 的核心过程,

    在没有指定线程数的情况下new NioEventLoopGroup()会调用如下构造方法

        public NioEventLoopGroup() {
            this(0);
        }
    
    • 1
    • 2
    • 3

    即传入 0,然后一路跟下去,发现调用了MultithreadEventLoopGroup的如下逻辑

        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        }
    
    • 1
    • 2
    • 3

    由于我们传入的nThreads == 0,所以获取DEFAULT_EVENT_LOOP_THREADS的值,在MultithreadEventLoopGroup中,DEFAULT_EVENT_LOOP_THREADS的初始化逻辑如下

    private static final int DEFAULT_EVENT_LOOP_THREADS;
    
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    nThreads == 0的情况下,那么 NioEventLoopGroup 的默认线程的个数为 CPU 的核数乘以 2,即:NettyRuntime.availableProcessors() * 2

    继续跟下去,可以看到 NioEventLoopGroup 调用了如下的构造方法,其核心代码如下

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
     ……
     // 创建ThreadPerTaskExecutor:ThreadPerTaskExecutor表示每次调用execute()方法的时候,都会创建一个线程。
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    ……
    // 2.创建NioEventLoop:NioEventLoop对应线程池里线程的概念,这里其实就是用一个for循环创建的。
            children = new EventExecutor[nThreads];
    ……
            for (int i = 0; i < nThreads; i ++) {
                ……
                children[i] = newChild(executor, args);
                ……
            }
    
    // 3.创建线程选择器:线程选择器的作用是确定每次如何从线程池中选择一个线程,也就是每次如何从NioEventLoopGroup中选择一个NioEventLoop。
            chooser = chooserFactory.newChooser(children);
    
    ……
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    这个构造方法包括了三个内容

    1. 创建 ThreadPerTaskExecutor:ThreadPerTaskExecutor 主要是用来创建线程

    2. 创建 NioEventLoop:NioEventLoop 对应线程池里线程的概念。

    3. 创建线程选择器:线程选择器的作用是确定每次如何从线程池中选择一个线程,也就是每次如何从 NioEventLoopGroup 中选择一个 NioEventLoop。

    首先,我们看 ThreadPerTaskExecutor 如何创建线程,核心代码如下

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    这里的 threadFactory 就是前面传入的newDefaultThreadFactory(),这个方法定义了默认线程的一些基本信息,一路追踪到DefaultThreadFactory

        public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
            ObjectUtil.checkNotNull(poolName, "poolName");
    
            if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
                throw new IllegalArgumentException(
                        "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
            }
    
            prefix = poolName + '-' + poolId.incrementAndGet() + '-';
            this.daemon = daemon;
            this.priority = priority;
            this.threadGroup = threadGroup;
        }
    
    // 创建线程,将 JDK 的 Runnable 包装成 FastThreadLocalRunnable
            @Override
        public Thread newThread(Runnable r) {
            Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
            try {
                if (t.isDaemon() != daemon) {
                    t.setDaemon(daemon);
                }
    
                if (t.getPriority() != priority) {
                    t.setPriority(priority);
                }
            } catch (Exception ignored) {
                // Doesn't matter even if failed to set.
            }
            return t;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    可以看到 Netty 的线程实体是由 ThreadPerTaskExecutor 创建的,ThreadPerTaskExecutor 每次执行 execute 的时候都会创建一个 FastThreadLocalThread 的线程实体。

    接下来是创建 NioEventLoop,Netty 使用 for 循环来创建 nThreads 个 NioEventLoop,通过前面的分析,我们可能已经猜到,一个NioEventLoop对应一个线程实体,即 Netty 自己封装的 FastThreadLocalThread。

    来到 NioEventLoop 的构造方法

        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                     EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
            super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
                    rejectedExecutionHandler);
           ......
            final SelectorTuple selectorTuple = openSelector();
            ......
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    即创建了一个 Selector,Selector 是 NIO 编程里最核心的概念,一个 Selector 可以将多个连接绑定在一起,负责监听这些连接的读写事件,即多路复用。

    继续往上调用构造方法

        protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                            boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                            RejectedExecutionHandler rejectedHandler) {
            ......
            this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
            ......
        }
    
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    NioEventLoop 重写了 taskQueue 的创建逻辑

        private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
            // This event loop never calls takeTask()
            return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                    : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
        }
    
        private static Queue<Runnable> newTaskQueue(
                EventLoopTaskQueueFactory queueFactory) {
            if (queueFactory == null) {
                return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
            }
            return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    即创建一个 MPSC 队列,

    MPSC 队列,Selector,NioEventLoop,这三者均为一对一关系。

    接下来是创建线程选择器,

    chooser = chooserFactory.newChooser(children);
    
    • 1

    这里的选择器是

        protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
            this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
        }
    
    • 1
    • 2
    • 3

    中的DefaultEventExecutorChooserFactory.INSTANCE,进入

        private static boolean isPowerOfTwo(int val) {
            return (val & -val) == val;
        }
        @Override
        public EventExecutorChooser newChooser(EventExecutor[] executors) {
            if (isPowerOfTwo(executors.length)) {
                return new PowerOfTwoEventExecutorChooser(executors);
            } else {
                return new GenericEventExecutorChooser(executors);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    Netty 通过判断 NioEventLoopGroup 中的 NioEventLoop 是否是2的幂来创建不同的线程选择器,不管是哪一种选择器,最终效果都是从第一个 NioEvenLoop 遍历到最后一个NioEventLoop,再从第一个开始,如此循环。GenericEventExecutorChooser 通过简单的累加取模来实现循环的逻辑,而 PowerOfTowEventExecutorChooser 是通过位运算实现的。

        private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        ......
            @Override
            public EventExecutor next() {
                return executors[idx.getAndIncrement() & executors.length - 1];
            }
        ......
        }
    
        private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        ......
            @Override
            public EventExecutor next() {
                return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
            }
        ......
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    最后总结一下,NioEventLoopGroup 的创建核心就三步

    1. 创建ThreadPerTaskExecutor;

    2. 创建NioEventLoop;

    3. 创建线程选择器。

    完整代码见:hello-netty

    本文所有图例见:processon: Netty学习笔记

    更多内容见:Netty专栏

    参考资料

    跟闪电侠学 Netty:Netty 即时聊天实战与底层原理

    深度解析Netty源码

  • 相关阅读:
    JS力扣第九题-回文数
    leetcode算法每天一题 020: 有效的括号(c++ stack<> queue<>的简单使用 )
    解决Spring Boot应用在Kubernetes上健康检查接口返回OUT_OF_SERVICE的问题
    AndroidStudio编译错误‘android.injected.build.density‘ is deprecated
    Kotlin 开发Android app(六):Kotlin 中的空判断 问号和感叹号
    简单的Web前端数据字典实现,小型项目必备
    Nginx 简介、安装与配置文件详解
    Docker基础
    【C/C++】malloc/free 和 new/delete
    了解计算机
  • 原文地址:https://blog.csdn.net/hotonyhui/article/details/127134936