• netty源码系列之-02_EventLoopGroup和EventLoop


    概述

    我们知道netty的启动是需要实力话两个EventLoopGroup

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    
    • 1
    • 2

    EventLoopGroup 组

    在这里插入图片描述
    EventLoopGroup 继承EventExecutorGroup , EventExecutorGroup 就是一个线程池,继承了线程包中的ScheduledExecutorService,我们都知道,改接口有定时执行的功能
    它本身继承了4个重要接口,以及它们各自的功能:

    • Iterable:迭代器接口,返回一个迭代器。说明已经具备了Group的管理或者容器功能。
    • Executor:具备了提交任务的能力。
    • ExecutorService:具备了线程池的能力。
    • ScheduledExecutorService:具备了执行调度任务的能力。
      这4种能力不多说,都是JDK本身提供的,我们杰西莱看一下,它本身定义了哪些方法呢?如下图:

    它优化了JDK提供的关于中断的方法:标记过时(shutdown,shutdownNow),并且新加了几个方法

    boolean isShuttingDown();
    Future<?> shutdownGracefully();
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    
    • 1
    • 2
    • 3

    增加了一个中断状态监听的方法

    Future<?> terminationFuture()。注意这个Future是netty的.
    
    • 1

    可以在这个future上面增加一些监听器(addListener),然后当中断的时候就会触发。
    增加了next方法。返回的是由它管理的EventExecutor。这个组的味道已经出现了。这个也是后面实现轮询内部成员的一个关键方法。
    这个接口的功能就是:线程池,管理一组EventExecutor,调度任务,中断监听。

    EventLoopGroup 核心方法

    public interface EventLoopGroup extends EventExecutorGroup {
        /**
         * Return the next {@link EventLoop} to use
         * next 方法是把指针指向下一个线程,因为netty基于NIO,NIO之所以非阻塞,是因为用到了ForkJoinPool这种线程池,这种线程池
         * 支持线程任务分发然后合并,任务盗取等任务,有兴趣的同学可以去专门了解下java的并非变成ForkJoinPool
         * next 就是本线程线程绝对该任务可以发给下一个线程处理,这个过程就是netty的事件循环组的意义
         */
        @Override
        EventLoop next();
    
        /**
         * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
         * will get notified once the registration was complete.
         */
        ChannelFuture register(Channel channel);
    
        /**
         * Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed
         * {@link ChannelFuture} will get notified once the registration was complete and also will get returned.
         */
        ChannelFuture register(ChannelPromise promise);
    
        /**
         * Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
         * will get notified once the registration was complete and also will get returned.
         *
         * @deprecated Use {@link #register(ChannelPromise)} instead.
         */
        @Deprecated
        ChannelFuture register(Channel channel, ChannelPromise promise);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    EventExecutorGroup

    在这里插入图片描述

    这个接口负责提供EventExecutor (事件执行器) ,用什么来提供,用next方法提供(指向下一个线程去执行),并且可以优雅的关闭.

    AbstractEventExecutorGroup

    这个类没有任何的实现,仅仅只是把所有的提交任务的方法做了一下默认实现:就是调用一些next()方法轮询出来一个Executor然后去执行,比如:

      @Override
        public void execute(Runnable command) {
            next().execute(command);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    其它的所有方法都没有实现,也确实挺抽象的。
    不过这个抽象类的作用还是非常大的,之前在分析组和成员关系的时候,组也具备了一些成员关系的功能,但是它的执行就是通过成员去完成的,而且这些功能本身也确实不需要组去关心的。而netty就是把这些功能放在了这个类里面,也放的特别合适,它没有实现任何有关于的组的管理功能,实际上就是交给子类去完成了,而子类在实现管理功能的时候,也就不需要关心这些功能了(提交任务),父类都已经实现了。

    MultithreadEventExecutorGroup

    听名字就可以听出来一些味道了:多线程事件执行器组。
    3个关键词:多线程,事件执行器,组。
    这个里面才是真正实现了组的管理功能,看一下它内部所有的属性和方法:

    	private final EventExecutor[] children;
        private final Set<EventExecutor> readonlyChildren;
        private final AtomicInteger terminatedChildren;
        private final Promise<?> terminationFuture;
        private final EventExecutorChooser chooser;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    先看一下内部属性吧:

    • EventExecutor[] children:固定大小的EventExecutor数组,构造器里面会根据大小全部初始化成员。
    • EventExecutorChooser chooser:成员轮询器。虽然有两种实现,但其实都是按照(0,1,2,3,…,n-1)的顺序来的。只不过当大小是2的幂的时候,采用了一下位的和(&)运算,会稍微快一些。这个就是next方法的实现。
    • terminationFuture:中断的一个Future,可以用来监听中断事件。
    • readonlyChildren:用一个不可写的集合来做迭代器用。
    • terminatedChildren:原子int,用来标记当前有多少个成员已经被中断了,当所有的都被中断的时候,就会触发中断事件。
      这个类比较核心的就是它的构造器了,至于其它的关于中断的方法,基本上也都是循环中断所有的成员,不是特别复杂。当然还有一个创建成员的抽象方法,不过也是和构造器相呼应的。

    关于它的构造器和一个抽象方法,我觉得也是比较有趣的点。

    • 它的所有的构造器都是protected的,说明不是对外开放的,由子类去调用的。

    • 最有趣的在于构造器的最后一个参数居然是:Object… args。在构造器里面,这种写法确实还是比较少见的。

    • 这个类仅有一个抽象方法:

    protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
    
    • 1

    就是在初始化成员的时候需要调用的方法,而且它的最后一个参数也是 Object… args。其实构造器的对象数组的入参和这个入参是相呼应的,就是同一个

    • 这个构造器当时在定义的时候,开发人员估计费了不少的心思啊,哈哈。

    • 那为什么要这样写呢?那肯定是为了复用,复用这些组的管理功能。它所管理的Executor到底需要哪些参数,以及如何来创建。它不关心,它也没法关心;所以留到了子类,同时它希望对于具体的子类而已,那个对象数组不能暴露给框架的使用人员,希望他们具体化,不能这么抽象的使用,所以构造器全部protected。

    DefaultEventExecutorGroup

    这个类非常简单,提供了3个构造器,实现了newChild方法,并且明确了成员就是DefaultEventExecutor。
    当我学习到这边的时候,就基本非常明确了,它就是一个线程池,我第一个想到的就是和ThreadPoolExecutor进行比较,之前的文章以及比较过了。但是在比较之前,想了一下,我好像还并不知道任务是怎么执行的,也就是EventExecutor的具体实现了。

    EventExecutor 成员

    事件执行器EventExecutor 事件执行器,也是一个借口,其中一个抽象类
    AbstractScheduledEventExecutor ,支持了scheuduling(线程的定时调度,或者定时执行)

    SingleThreadEventExecutor 继承AbstractScheduledEventExecutor ,实现OrderedEventExecutor接口,任务可以顺心执行,
    其实我们看源码发现OrderedEventExecutor 就是一个空的接口,但是怎么保证线程能按照顺序执行,‘
    private final Queue taskQueue; 定义了一个taskQueue,

    netty提供的一个默认实现DefaultEventExecutor ,继承了SingleThreadEventExecutor, 里面的任务执行方法run
    在这里插入图片描述
    它继承了EventExecutorGroup,说明它也拥有上面说的所有功能。
    重点关注一下,它新定义的方法吧:
    在这里插入图片描述
    它基本上新定义了3类接口:

    • parent():返回父节点。也反映了它被EventExecutorGroup管理的特性。
    • inEventLoop():判断当前线程是不是当前EventExecutor所关联的事件循环的线程。非常重要的方法,好多地方都会用到。因为在后续的提交任务的时候,有可能是事件循环的线程(这个就是提交的任务在执行过程当中又提交了新的任务),有可能是其它线程。然后可能需要做一些线程安全方面的工作。
    • 创建Promise,或者Future的方法。这个我没有用过,但是也了解了一些,简单说一下,可能不太对。这两种对象都代表的是异步执行的结果,前者相对于后者具备了写的功能,后者只可读。但是他们都具备结束然后事件通知的能力,那么谁来通知呢?就是当前EventExector所关联的线程去通知。也就是通过它创建的,都会用它的关联线程去执行通知任务。但是具体的应用场景还不太清楚。
      以后就是它比Group多出来的方法,一方面体现了它是成员角色,另一方面它也可以做额外的事情。从下面开始,也就没有接口定义了,都是具体的类了。
     @Override
        protected void run() {
            for (;;) {
                Runnable task = takeTask();
                if (task != null) {
                    task.run();
                    updateLastExecutionTime();
                }
    
                if (confirmShutdown()) {
                    break;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    执行方式,循环一直跑,只要taskTask()里面有任务,就拿出来执行,如果confirmShutdown()就跳出循环,这个地方大家一定理解是单线程,因为是run方法

    AbstractEventExecutor

    这个类一方面实现了EventExecutor,另一方也继承了JDK提供的AbstractExecutorService。后面这个类更多的是提供提交一些任务的默认实现,也没有做具体的业务实现。
    AbstractEventExecutor它本身也没有做太多事情,只是把接口和抽象类整合在了一起。稍微梳理一下吧:
    parent:定义了父对象。

    selfCollection:返回一个属于自己的迭代器。

    promise,future的方法都做了实现,如下:
    关于定时任务的,都直接抛了异常,不支持。

    提供了一个EventExecutorGroup的一个构造器。

    AbstractScheduledEventExecutor

    看这个名字基本也能猜出来,它实现了调度任务:
    内部有一个优先级队列,最早执行的会放在队首,每次提交任务的时候会进行调整。它这个不是线程安全的队列。因此它在添加调度任务的时候,如果不是事件循环线程的话,会提交一个新的普通任务取提交任务,保证线程安全:

    private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
            if (inEventLoop()) {
                scheduleFromEventLoop(task);
            } else {
                final long deadlineNanos = task.deadlineNanos();
                // task will add itself to scheduled task queue when run if not expired
                if (beforeScheduledTaskSubmitted(deadlineNanos)) {
                    execute(task);
                } else {
                    lazyExecute(task);
                    // Second hook after scheduling to facilitate race-avoidance
                    if (afterScheduledTaskSubmitted(deadlineNanos)) {
                        execute(WAKEUP_TASK);
                    }
                }
            }
    
            return task;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    所有被提交的调度任务都会被封装成ScheduledFutureTask,这个对象里面有几个关键点:

    当这个类被内存加载的时候,会生成当前时间的一个纳秒时间戳。以后所有的时间计算都会以它作为开始时间的标准,比如获取当前纳米戳=获取当前纳秒时间戳的然后减去开始时间就是当前时间:

    private static final long START_TIME = System.nanoTime();
    
        static long nanoTime() {
            return System.nanoTime() - START_TIME;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • deadlineNanos:这个任务应该被执行的纳秒时间戳。都是通过比较这个属性值来判断当前任务是不是到时间去执行了。

    • periodNanos:调度任务的执行周期。为0的话,代表只执行一次。否则,就认为是周期不断执行的任务,间隔就是periodNanos。

    • 关键在于它的run方法,解决了如何执行周期任务。当一个周期任务执行完了,它会把periodNanos加到deadlineNanos上面取,作为新的执行时间,然后重新加入队列。然后就可以再执行了:

     @Override
        public void run() {
            assert executor().inEventLoop();
            try {
                if (delayNanos() > 0L) {
                    // Not yet expired, need to add or remove from queue
                    if (isCancelled()) {
                        scheduledExecutor().scheduledTaskQueue().removeTyped(this);
                    } else {
                        scheduledExecutor().scheduleFromEventLoop(this);
                    }
                    return;
                }
                if (periodNanos == 0) {
                    if (setUncancellableInternal()) {
                        V result = runTask();
                        setSuccessInternal(result);
                    }
                } else {
                    // check if is done as it may was cancelled
                    if (!isCancelled()) {
                        runTask();
                        if (!executor().isShutdown()) {
                            if (periodNanos > 0) {
                                deadlineNanos += periodNanos;
                            } else {
                                deadlineNanos = nanoTime() - periodNanos;
                            }
                            if (!isCancelled()) {
                                scheduledExecutor().scheduledTaskQueue().add(this);
                            }
                        }
                    }
                }
            } catch (Throwable cause) {
                setFailureInternal(cause);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 同时它也提供了一些取出到了执行时间任务的一些方法,供子类使用。

    SingleThreadEventExecutor

    单线程事件执行器,意思以及很明确了,用一个线程去执行所有的事件。
    这个类的属性和方法有点多,生成的图不好看,IDEA也不能删除其中几个。那就没有图了,调几个重要的:

    private final Queue<Runnable> taskQueue;
    private volatile Thread thread;
    private final boolean addTaskWakesUp;
    private final int maxPendingTasks;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    简单说一下这几个属性吧:

    • taskQueue:任务队列,所有提交的任务会执行扔到这个队列里面。注意它的定义并不是阻塞队列。但是它创建的时候,提供的默认实现就是阻塞队列,并且它提供的一个内部方法takeTask(),取任务的时候还强制要求必须是阻塞队列,否则就抛异常。刚开始还挺郁闷的,想不明白。最后发现创建队列的方法被NIOEventLoop给重写了,它那边提供的是一个无锁高性能队列MPSC(多生茶这,单消费者,这就是它的消费模型)队列,这个并不是阻塞队列。这或许解释得通?
    • thread:事件循环的线程。会在线程启动以后赋值这个变量,它是用Executor启动的,而不是直接一个线程。
    • addTaskWakesUp:如果为true,意味着:当且仅当执行addTask(Runnable)方法的时候,会唤醒执行器的线程。我对这个变量没有搞太明白,它这边提供的构造器默认是true,而EventLoop那边变成了false,而且我很明确它为false的时候提交一个新任务进来才可以唤醒正在阻塞轮询的线程。它说明中的addTask方法,也是仅仅是提交任务的时候会去触发:
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
    
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
    
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    重点可以看一下后面两行,它必须为true,才会去执行wakeUp方法。所以对于它的描述,实现是看不明白。不解释了,有明白的可以分享一下。

    • maxPendingTasks:队列的最大容量。
      从功能上来讲,这个类主要做了这么几件事情:

    • 任务队的创建。

    • 循环线程的启动,当提交第一个任务的时候就会启动线程:

    private void startThread() {
            if (state == ST_NOT_STARTED) {
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    try {
                        doStartThread();
                    } catch (Throwable cause) {
                        STATE_UPDATER.set(this, ST_NOT_STARTED);
                        PlatformDependent.throwException(cause);
                    }
                }
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    }
                   //其它的省略一下
               }
            });
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    启动以后,它重点就做了一个事情,执行run方法。这是个抽象方法。也就是真正的循环体这边也没有实现。

    任务的提交。实现了execute方法,这也是非常关键的方法。
    提供了一些从队列里面取任务的方法,供子类用,毕竟它定义队列成private了,比如:

    • Runnable pollTask()
    • Runnable pollTaskFrom(Queue taskQueue)
    • Runnable takeTask()
    • boolean fetchFromScheduledTaskQueue()
    • Runnable peekTask()
      还提供了一些执行任务的方法,比如:
    • boolean runAllTasks()
    • runAllTasksFrom(Queue taskQueue)
    • runAllTasks(long timeoutNanos)
      它的职责就这些,该做的都做了,但是我要怎么循环取任务呀,好像也没说。这就剩下最关键的一个方法run了。它也是EventLoop最大的区别了。

    DefaultEventExecutor

    非常简单不分析了

    EventLoop的运行机制

    在这里插入图片描述

    1. EventExecutorGroup里面包含了n个EventExecutor,n需要在初始化的时候就指定。
    2. 在EventExecutorGroup提交的任务,它都会选择组内一个EventExecutor(顺序轮询选择)去执行,所以运行机制的重点是EventExecutor。
    3. EventExecutor内部会和一个线程进行关联,当提交第一个任务的时候,线程启动并且不停运行,除非外界执行关闭操作。
    4. EventExecutor内部有两个队列:taskQueue和scheduleTaskQueue,前者是一个阻塞队列,存储提交的普通task(用execute或者submit方法提交的);后者是一个优先级队列(线程不安全),存储的是调度任务(用schedule方法提交的),它会把最早应该执行的任务放在队首,保证peek出来的一定是队列优先级最高的。
    5. 提交普通task的时候,直接加进队列;提交定时任务的时候,当前线程是关联线程的话,加到调度队列中,否则提交一个普通task,task的内容就是:把调度任务加入调度队列中。
    6. 关联的线程一直在循环做一件事情:取任务,然后执行。
    7. 取任务的过程,图中应该比较清晰,它的具体代码实现是在类SingleThreadEventExecutor的takeTask()方法,可以结合在一起看。

    组和成员之间的关系有如下特点:

    • 组拥有成员的一部分功能
    1. 其中一些功能的具体实现需要依靠成员去完成,比如提交任务
    2. 其中一些功能与成员表示的含义层次不同,比如关闭等方法。
    • 成员拥有组的所有功能
    1. 成员比组多一些额外的功能
    2. 基于如上特点,才有了EventExecutorGroup与EventExecutor的继承关系。
    3. 当组的功能大于成员,并且放在成员上特别不合适的时候(比如类似于next和iterator这些的,成员方法实现这些方法的时候,直接返回的就是本身),继承关系是不是就应该反过来了,或许吧,还没有见过。

    和线程池一个对比

    在这里插入图片描述
    线程池就不多说大家比较熟悉,可以查看这个线程池文章《08_线程池,居然帮助帮朋友面试过了》
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    EventLoopGroup和EventLoop

    在这里插入图片描述

    NIOEventLoop

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    创建过程

    在这里插入图片描述
    图看不清看下面的连接:
    https://www.processon.com/view/link/62b698da5653bb10419639fb

  • 相关阅读:
    分治:循环比赛
    Miniconda简单操作说明
    探究 Meme 的金融与社交属性
    记录7种常见字符编码
    力扣371周赛
    多列 count(distinct)改写优化
    Kafka Rebanlace次数过高问题
    全流程R语言Meta分析核心技术
    XXL-JOB逻辑自测及执行参数配置踩坑
    前端性能优化
  • 原文地址:https://blog.csdn.net/wufagang/article/details/125392958