• 全网最透彻的Netty原理讲解 一


    文章是基于netty-4.1.95.Final版本讲解,更多是解析源码,挖掘设计思路,学习代码灵活运用。
    如果你阅读完本文,你会发现很多细节让人为之拍案叫绝~~
    本文共五万多字为第一篇Netty文章(后续还有哦),全网最详细的Netty源码解析

    1、Reactor模型

    1.1、异步

    官网地址:https://netty.io/
    Netty的描述:

    Netty is an asynchronous event-driven network application framework
    for rapid development of maintainable high performance protocol servers & clients.

    关键字:asynchronous、event-driven
    IO模式分为:同步阻塞、同步非阻塞、异步阻塞、异步非阻塞

    • 同步阻塞:双方发消息时,你给对方发完消息,一直等待回复
    • 同步非阻塞:给对方发完消息,做自己的事情,时不时看一下消息回复了么
    • 异步阻塞:发完消息后,一直等待,回复的消息会主动通知到我,进行查看
    • 异步非阻塞:发完消息后,做自己的事情,回复的消息主动通知到我,然后进行查看

    一般情况下异步操作都是异步非阻塞模式。在开发程序时,主线程按顺序处理操作,一般只有当前流程执行完,下一个才能执行,这种都是同步操作。那么如何改成异步?把每个流程改成回调即可。主线程将回调注册完成后继续处理自己的事情,而回调操作在特定事件完成后被调用,此时需要用一个其他线程来完成,那么线程池就非常重要。
    在Java中有哪些回调类?

    • Runnable:无返回值
    • Callable:有返回值
    • Future:有返回值。可以查询任务状态、取消任务、等待任务完成

    Netty使用Future类在任务完成时做回调操作,如果回调的方法不止一个怎么办?在juc的Future基础上扩展,增加监听器的功能。当任务完成时回调一组监听器即可。这就是Promise类的作用。
    简单看一下Promise类的方法:

    • setSuccess/setFailure:任务成功或失败时调用
    • addListener/removeListener:添加或删除监听器

    image.png

    Promise使用了什么设计模式?观察者模式。通过注册回调函数(Listener)来监听事件。当事件完成时,Promise会通知所有注册的Listener

    Java标准库的CompletableFuture类跟Promise类似,都有注册回调,然后在任务完成时通知的功能。

    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    public class Test1 {
        public static void main(String[] args) {
            // 为何不使用这种方式创建线程池?因为创建LinkedBlockingQueue工作队列时没有传入capacity,默认是Integer.MAX_VALUE,
            // 那么队列可以认为是没有边界的,在资源受限的情况下不可取,所以可以直接自己修改
            // ExecutorService es = Executors.newFixedThreadPool(100);
            // 核心线程数:保持活跃的线程数,即使线程是空闲的,也不会被回收,除非设置了allowCoreThreadTimeout为true
            // 最大线程数:线程池允许创建的最大线程数,当工作队列已满且活动线程数达到最大线程数时,新任务会根据拒绝策略去阻塞或拒绝
            // 线程空闲时间:超过这个空闲时间后,空闲线程会被销毁,直到线程数等于核心线程数
            // 工作队列:暂时存储未执行的任务。有多个队列可以使用,LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue等
            // 线程工厂:可以自定义线程的创建方式,例如给线程命名(实际在业务中非常重要,寻找问题的得力帮手)等
            // 拒绝策略:当线程池和工作队列都已满时,决定如何处理新提交的任务。CallerRunsPolicy(让调用线程池execute的线程去执行)、AbortPolicy(抛出异常)、DiscardPolicy(直接拒绝)、DiscardOldestPolicy(拒绝等待时间最久的任务)
            ExecutorService executorService = new ThreadPoolExecutor(5, 20,
                    60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(100),
                    new ThreadFactory() {
                        final AtomicInteger atomicInteger = new AtomicInteger();
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r);
                            thread.setName("业务线程-" + atomicInteger.incrementAndGet());
                            return thread;
                        }
                    },
                    new ThreadPoolExecutor.CallerRunsPolicy());
            // CompletableFuture异步回调类,支持链式调用
            // supplyAsync: 有返回值异步任务,可以采用定制的线程池,默认采用ForkJoinPool线程池
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName() + ":supplyAsync方法");
                return "hello";
            }, executorService);
            // thenAccept: 在上一个任务完成后执行,可以处理其结果,执行给定的consumer,无返回值
            // thenRun: 在上一个任务完成后执行,不处理其结果,执行给定的Runnable,无返回值
            // join: 等待上一个任务执行完成,获取最终的结果值
            completableFuture.thenAccept(str -> System.out.println(Thread.currentThread().getName() + ":" + str))
                    .thenRun(() -> System.out.println(Thread.currentThread().getName() + ":exit")).join();
            // 打印结果:
            // 业务线程-1:supplyAsync方法
            // 业务线程-1:hello
            // 业务线程-1:exit
            // main:主线程结束
            System.out.println(Thread.currentThread().getName() + ":主线程结束");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    thenAccept:注册Consumer任务
    thenRun:注册Runnable任务

    CompletableFuture支持很多函数式编程的接口类,功能更加丰富,为何Netty不使用此类作为回调类?

    • 虽然CompletableFuture功能丰富,但对netty来说实现复杂、功能冗余
    • Promise支持定制化需求。Netty需要支持更加细粒度和定制的需求,比如setSuccess/setFailure
    • Promise支持功能扩展和优化。Promise可以满足网络编程中复杂的需求。比如DefaultPromise支持任何异步操作,而DefaultChannelPromise只支持channel相关的异步操作,属于DefaultPromise子类

    1.2、事件驱动

    Netty给出一些例子,我们看个简单的:
    image.png

    1.2.1、NioEventLoopGroup

    创建两个对象:bossGroup和workerGroup。类是NioEventLoopGroup,我们就从这个类入手,深入了解。
    NioEventLoopGroup的继承树
    从继承树上可以看到NioEventLoopGroup继承了两条路线:EventExecutor和EventLoop

    EventExecutorGroup:事件执行器组。执行器是线程或线程池。

    • 继承ScheduledExecutorService,拥有延迟执行或周期性执行任务的方法。
    • Iterable,group是有一个或多个EventExecutor,继承Iterable拥有for-each功能

    image.png
    EventExecutor:继承EventExecutorGroup。一个人也可以是一个组,一个事件执行器也可以是一个组
    image.png
    AbstractEventExecutorGroup:继承EventExecutorGroup。使用模板方法设计模式,定义算法的骨架,将某些步骤延迟到子类实现。
    image.pngimage.png
    MultithreadEventExecutorGroup:多线程事件执行器组。使用多个工作线程(EventExecutor)来执行任务和处理事件
    image.png
    image.png
    既然有多线程的,有没有单线程的?
    SingleThreadEventExecutor:继承AbstractScheduledEventExecutor,支持延迟任务和周期性任务。实现OrderedEventExecutor,表示该执行器是有顺序的,确保相同的任务或事件按照提交的顺序执行。
    image.png
    小结:事件执行器是一个处理任务或事件的组件,支持多线程和单线程模式,分别实现EventExecutorGroup和EventExecutor

    EventLoopGroup:事件循环组。继承EventExecutorGroup,表示要使用多线程模式。提供注册Channel和ChannelPromise的功能。next方法重写EventExecutorGroup的方法,用于获取下一个可用的EventLoop对象。
    image.png
    image.png
    EventLoop:事件循环。

    • 继承EventLoopGroup,一个人也是一个组。继承自OrderedEventExecutor,表示它是有序执行任务的事件执行器。
    • parent方法返回EventLoopGroup对象,表示当前EventLoop的父级事件是EventLoopGroup。这个方法上级是EventExecutor中的,其返回值是EventExecutorGroup。

    image.png
    小结:事件循环器是将事件注册,按一定顺序获取事件,交由事件执行器循环处理的。EventLoop使用单线程模式,EventLoopGroup使用多线程模式。

    类之间的关系如下,EventLoop使用EventExecutor执行事件,EventLoopGroup由EventExecutorGroup执行一组事件。
    执行器组与执行器的关系

    NioEventLoop继承树的关键类
    关键继承的类:SingleThreadEventLoop、SingleThreadEventExecutor、EventLoop
    SingleThreadEventLoop:继承SingleThreadEventExecutor,拥有单线程执行任务的功能
    image.png
    NioEventLoop:从命名来看是Nio + EventLoop。EventLoop是管理(注册)事件(Channel)的。Nio是Java标准库中的New IO,使用Selector + ByteBuffer + Channel的架构实现高性能网络编程,由Selector管理Channel。那么NioEventLoop也要集成Selector,管理自己的channel对象。
    image.png

    • Selector是NIO中的选择器,管理注册的事件,底层原理是epoll。selectedKeys是epoll返回可用事件的集合。

    image.png
    NioEventLoopGroup:继承MultithreadEventLoopGroup,拥有多线程执行的功能。

    小结:NioEventLoopGroup管理一组NioEventLoop,交由事件执行器组(EventExecutorGroup)处理。NioEventLoop由selector管理事件,交由事件执行器(EventExecutor)处理。
    EventLoop处理逻辑

    1、NioEventLoopGroup创建过程

    bossGroup的线程数为1,workerGroup使用的是空的构造方法,线程数为0
    image.png
    image.png

    SelectorProvider顾名思义是selector提供者,如果你是Windows系统provider变成WEPollSelectorProvider。要从Hotspot源码中寻找到Linux包,创建的是EPollSelectorProvider对象。最后会创建EPollSelectorImpl(Selector子类)对象返回。

    注:epoll是非常重要的功能,在后面会详细解释,现在只要记住Linux肯定会使用epoll参与网络事件
    SelectorProvider使用了单例模式,Holder.INSTANCE静态final变量确保只有一个SelectorProvider对象存在

    image.png
    image.png

    DefaultSelectStrategyFactory是默认的SelectStrategyFactory的子类,从类名来看用于select决策的工厂类,会决定是否调用selector.select的类。
    RejectedExecutionHandlers.reject():拒绝执行的处理器。默认是抛出RejectedExecutionException异常。
    image.png
    image.png

    如果nThreads为0,使用默认的线程数为 CPU的核心数 * 2。为啥要使用这个值?
    程序的性能瓶颈是网络、磁盘、CPU等等,可以归类为两种情况:IO密集型、CPU密集型

    • IO密集型:频繁进行IO操作,比如网络传输、磁盘读写等。
    • CPU密集型:频繁进行计算操作,比如数据处理、图像处理、加密解密等。

    两种性能瓶颈的解决方法:

    • IO密集型:合理使用线程池,使用合适的线程数。比如nThread默认为这个数,因为CPU的一个核心同一时间只能运行一个线程,考虑可能有线程会睡眠等待事件完成,另一个线程正好在此期间运行,CPU也不会空闲,所以CPU的核心数*2的线程数比较适合IO密集型操作。若线程数太多会导致CPU频繁切换,延迟增加。如果任务太多就在阻塞队列中排队,队列也满了就采用拒绝策略。
    • CPU密集型:线程数是CPU的核心数即可。每个核心运行一个线程,全力执行计算操作,线程数多了就会将时间浪费在线程切换上。

    那么Netty用于网络编程中,明显属于IO密集型。
    调用super来到父类:MultithreadEventLoopGroup
    image.png

    DefaultEventExecutorChooserFactory是默认的选择器工厂类
    image.png
    为何要判断EventExecutor的数量是否是二次幂?

    • 如果数量为8,二进制是 1000,减一是0111,与操作可以得到0~7的数。
    • 若不是二次幂,假设数量为7,二进制是0111,减一是0110,与操作只能得到0、2、4、6的数,无法得到1、3、5的数
    • 若不是二次幂,就使用通用的取模方式(%)
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        // 执行器数量是否是二次幂
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    
    // PowerOfTwoEventExecutorChooser
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
    
    // GenericEventExecutorChooser
    public EventExecutor next() {
        return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    继续调用super来到父类:MultithreadEventExecutorGroup。

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
        // nThreads不能小于0
        checkPositive(nThreads, "nThreads");
    
        // 从目前链路来看,executor为空
        if (executor == null) {
            // ThreadPerTaskExecutor是给每个任务创建本地线程,不使用线程池
            // DefaultThreadFactory是默认的线程工厂,newThread方法创建Thread对象(FastThreadLocalThread对象)
            // 此方法没有限制线程数,可能是因为线程用完即弃,每次任务都需要创建新线程执行。
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
    
        // children是group中的执行器数组
        // 每个执行器只使用一个线程,保证TLS机制,线程安全且高效
        children = new EventExecutor[nThreads];
    
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 创建执行器对象。newChild是抽象方法,必须由子类实现。此处使用了模板设计模式
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // success为false,说明创建对象失败
                if (!success) {
                    // 将已经创建的执行器依次关闭
                    for (int j = 0; j < i; j ++) {
                        // 优雅地关闭执行器。就是先将任务执行完,再关闭
                        children[j].shutdownGracefully();
                    }
    
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            // 等待执行器关闭
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            // 如果被中断,就设置中断标志位,退出循环
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
    
        // 根据执行器数量决定使用哪种选择器
        chooser = chooserFactory.newChooser(children);
    
        // 监听执行器停止的监听器对象
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                // 当所有执行器结束后,将回调对象terminationFuture设置为success,然后会唤醒所有挂在此Future后的监听器
                if (terminatedChildren.incrementAndGet() == children.length) {
                    // terminationFuture属于当前执行器组的
                    terminationFuture.setSuccess(null);
                }
            }
        };
    
        // 给每个执行器添加监听器
        for (EventExecutor e: children) {
            // e.terminationFuture()是属于各自执行器的terminationFuture
            e.terminationFuture().addListener(terminationListener);
        }
    
        // 将执行器数组改成只读不可修改的Set集合。
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    这个构造方法初始化一些操作:创建执行器、获取chooser、添加监听器等
    小结:EventExecutorGroup管理一组EventExecutor,每个EventExecutor独享一个线程,通过chooser选择EventExecutor执行任务。

    2、EventExecutor创建过程

    在上面可以看到newChild是创建EventExecutor的核心方法,在MultithreadEventExecutorGroup中是抽象方法,就要由子类实现。此处使用了模板设计模式。
    image.png
    子类是NioEventLoopGroup,看下newChild实现
    image.png
    创建NioEventLoop:初始化参数

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                     EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
        // 调用父类构造器,传入必要参数
        // newTaskQueue:生成 无界的多生产者-单消费者队列(MPSC)
        super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        // 调用provider.openSelector()获取selector对象,若是Linux系统,就是EPollSelectorImpl对象。
        final SelectorTuple selectorTuple = openSelector();
        // 两个对象是一样的
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    // parent:NioEventLoopGroup对象
    // executor:ThreadPerTaskExecutor对象,每个任务起一个线程
    // addTaskWakesUp:此变量的意思是添加任务时唤醒EventLoop,但根据上下文意思相反。true不唤醒,false唤醒
    // taskQueue:任务队列,存放等待执行的任务
    // tailTaskQueue:尾部任务队列,在事件处理之后、准备下一次轮询之前的任务。通常是周期性或延迟的任务
    // rejectedExecutionHandler:如果任务添加到任务队列失败,就会采取拒绝操作。默认是RejectedExecutionHandlers.reject()拒绝策略
    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
                                        RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
        tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                            boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                            RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        // 默认等待的任务数为16
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        // 当前EventExecutor设置到当前Thread的本地变量中(TLS),此处在后面可以详细说一下
        this.executor = ThreadExecutorMap.apply(executor, this);
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    // EventExecutor的父类是EventExecutorGroup。从上下文来看parent是NioEventLoopGroup对象。
    protected AbstractEventExecutor(EventExecutorGroup parent) {
        this.parent = parent;
    }
    
    • 1
    • 2
    • 3
    • 4

    小结:NioEventLoop创建过程中,主要做了变量初始化
    添加chooser和Thread

    3、EventExecutor和Thread绑定过程

    很多人会对这段代码有点疑问,不知道啥意思。实际这里是使用TLS机制将执行器与线程进行绑定。
    image.png
    Netty使用FastThreadLocal来代替Java原生的ThreadLocal,为啥要这样做呢?
    在Java线程中,都有一个ThreadLocalMap实例变量(如果不使用ThreadLocal,不会创建map变量)。ThreadLocal只有一个变量,设置TLS数据时,给每个线程创建ThreadLocalMap变量。该map使用线性探测的方式解决hash冲突的问题,如果没有空闲的位置,就不断往后尝试,找到空闲的位置,插入entry。那么在发生hash冲突时就会影响效率。
    FastThreadLocal解决了hash冲突的问题,下面根据源码看看如何解决的

    返回新的Executor,将eventExecutor映射到当前线程中。

    // executor:ThreadPerTaskExecutor对象,是Java基础库的Executor接口的实现类
    // eventExecutor:SingleThreadEventExecutor对象
    public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
        ObjectUtil.checkNotNull(executor, "executor");
        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
        // 创建新的执行器
        return new Executor() {
            @Override
            public void execute(final Runnable command) {
                // execute会创建新线程来执行任务
                // apply:eventExecutor绑定到当前线程的本地变量中。此方法要返回一个Runnable对象。
                executor.execute(apply(command, eventExecutor));
            }
        };
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
        ObjectUtil.checkNotNull(command, "command");
        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
        // 返回新的Runnable对象
        return new Runnable() {
            @Override
            public void run() {
                // 设置eventExecutor到当前线程的本地变量中
                setCurrentEventExecutor(eventExecutor);
                try {
                    // 运行command任务
                    command.run();
                } finally {
                    // 将线程的本地变量情况,help GC
                    setCurrentEventExecutor(null);
                }
            }
        };
    }
    
    private static void setCurrentEventExecutor(EventExecutor executor) {
    	// mappings:FastThreadLocal对象,直接使用static final修饰。此变量会被多线程共享。
        mappings.set(executor);
    }
    
    // 泛型是EventExecutor,线程本地变量存放的是EventExecutor对象
    private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    FastThreadLocal是ThreadLocal的变种,具有更高的性能。作用也类似,ThreadLocal也是全局变量,只不过Thread有ThreadLocalMap的变量副本,ThreadLocal就是管理这些变量增删改查的作用。那么FastThreadLocal也是全局变量,管理线程本地变量。所以mappings变量是全局变量。

    public final void set(V value) {
    	// UNSET是默认值,如果不是这个值就添加,否则就删除
        if (value != InternalThreadLocalMap.UNSET) {
        	// 若当前线程是FastThreadLocalThread对象,就返回自定义高性能的map,反之就返回ThreadLocal中的变量
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            // 将value设置到threadLocalMap中。
            setKnownNotUnset(threadLocalMap, value);
        } else {
            remove();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    InternalThreadLocalMap是代替ThreadLocalMap的。若线程属于FastThreadLocalThread就返回快的map,否则就从ThreadLocal中获取慢的map。

    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            // 获取快的map
            return fastGet((FastThreadLocalThread) thread);
        } else {
            // 获取慢的map
            return slowGet();
        }
    }
    
    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    	// 从FastThreadLocalThread中获取threadLocalMap变量,跟Thread一样,有threadLocalMap变量
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
    	// 若为空,就创建InternalThreadLocalMap对象传入
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }
    
    // 将InternalThreadLocalMap放到ThreadLocalMap中
    private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =
                new ThreadLocal<InternalThreadLocalMap>();
    
    private static InternalThreadLocalMap slowGet() {
        // 如果ThreadLocalMap没有InternalThreadLocalMap对象,就设置新的对象
        InternalThreadLocalMap ret = slowThreadLocalMap.get();
        if (ret == null) {
            ret = new InternalThreadLocalMap();
            slowThreadLocalMap.set(ret);
        }
        return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    setKnownNotUnset方法:将值设置到FastThreadLocalMap的indexedVariables数组中

    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
        // 在indexedVariables数组的index位置,设置value
        if (threadLocalMap.setIndexedVariable(index, value)) {
            // 将当前FastThreadLocal添加到remove列表
            addToVariablesToRemove(threadLocalMap, this);
        }
    }
    
    private final int index;
    
    public FastThreadLocal() {
        // index是InternalThreadLocalMap中的nextIndex的数值,每调用一次nextVariableIndex,数值就加1
        index = InternalThreadLocalMap.nextVariableIndex();
    }
    
    private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
        // VARIABLES_TO_REMOVE_INDEX:数组下一个空闲位置,由于是常量,此位置保持不变
        // 从indexedVariables数组中获取index位置的元素
        Object v = threadLocalMap.indexedVariable(VARIABLES_TO_REMOVE_INDEX);
        // remove列表,set集合用于去重
        Set<FastThreadLocal<?>> variablesToRemove;
        if (v == InternalThreadLocalMap.UNSET || v == null) {
            // 若元素为空,设置新的对象。
            variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
            threadLocalMap.setIndexedVariable(VARIABLES_TO_REMOVE_INDEX, variablesToRemove);
        } else {
            // 若元素不为空,强转为Set>类
            variablesToRemove = (Set<FastThreadLocal<?>>) v;
        }
    	// 往列表中添加FastThreadLocal对象
        variablesToRemove.add(variable);
    }
    
    // 获取数组下一个空闲位置
    public static final int VARIABLES_TO_REMOVE_INDEX = nextVariableIndex();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    小结:

    • 每一个FastThreadLocal实例创建时,分配一个下标index;分配index使用AtomicInteger实现,每个FastThreadLocal都能获取到一个不重复的下标。当调用ftl.get()方法获取值时,直接从数组获取返回,如return array[index]
    • FastThreadLocal是ThreadLocal的变种,FastThreadLocalThread又是Thread的子类,采用了相同的处理逻辑。当通过ThreadExecutorMap添加eventExecutor时每个线程只会占用一个空闲位置,将eventExecutor放到FastThreadLocalThread的threadLocalMap局部变量(InternalThreadLocalMap类)中。

    Netty高性能ThreadLocal

    1.2.2、AbstractBootstrap

    我们还没了解bossGroup和workerGroup是干啥的,就通过ServerBootstrap深入了解。

    AbstractBootstrap:这是个泛型类定义。泛型B继承AbstractBootstrap,B是AbstractBootstrap的子类。泛型C是Channel的子类
    image.png
    作为Netty的引导基类,提供了一些通用的方法和属性,可以用于配置和启动网络程序。

    • localAddress:配置IP和port的方法
    • register:将channel注册到selector中的方法
    • bind:绑定IP和端口号

    image.png

    • group:EventLoopGroup对象,对ServerBootstrap来说此变量是bossGroup
    • localAddress:SocketAddress对象,存放IP和port的
    • handler:处理器,在事件处理过程中执行的操作

    image.png
    AbstractBootstrap支持链式调用,怎么实现的呢?很简单,方法返回当前对象即可。
    image.png
    image.png
    为何AbstractBootstrap定义了bind方法,这不是ServerBootstrap才有的吗?实际上Bootstrap也即客户端也有bind操作。如果调用bind方法,客户端会指定端口号,否则由内核指定端口号。客户端也不常用。

    ServerBootstrap是对服务端的配置和启动。服务端有bind、listen和accept操作,分别是绑定IP和端口号、监听端口号、接收客户端连接。bind已经在AbstractBootstrap实现,listen实际已经在nio的bind方法中调用,那么只剩下accept操作。
    创建内部类:ServerBootstrapAcceptor。用于处理accept操作。当然此类实现ChannelInboundHandlerAdapter,表示accept是入站操作,调用channelRead给新连接做一些统一处理
    image.png

    • 给新连接配置一些属性、选项、处理器。都是child开头的变量,而在例子中有childHandler方法,说明child是客户端的channel
    • 将新连接注册到childGroup进行处理。
    • 将新连接连接注册到selector,此时并不会注册感兴趣的事件,而是在之后通过interestOps将事件注册。
    • 添加的监听器是Channel的第一个监听器,作用在于如果新连接注册失败,就关闭客户端连接。

    image.png
    group方法:

    • bossGroup是parentGroup,workerGroup是childGroup。bossGroup是负责服务端事件(accept)。workerGroup是负责客户端事件。

    image.png
    image.png
    channel方法:

    • 包装一个channelFactory,使用工厂模式,ChannelFactory是个接口类,定义newChannel的方法,在创建channel对象时调用此方法即可。ReflectiveChannelFactory就是使用反射创建对象。

    image.png
    handler和childHandler方法:

    • handler是服务端的处理器,childHandler是连接的客户端的处理器。在初始化时会被加到pipeline中。

    小结:ServerBootstrap是对服务端以及新连接的客户端进行配置。Bootstrap就是对客户端配置以及启动。每个channel都有一个pipeline,pipeline有多个处理器,在事件完成时被按顺序调用。
    添加流水线

    1.2.3、事件处理过程

    我们说的事件有哪些呢?

    • 服务端:bind、accept
    • 客户端:bind、connect、read、write

    我们就以server端的bind切入,关于Java标准库中的知识点在Channel篇中详解。

    bind方法支持多种传参方式,最终都是IP + port的方式,然后封装成InetScoketAddress对象。
    image.png

    doBind方法

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // 初始化channel,并将channel注册到selector,返回Future对象。Future包含channel以及事件完成与否。
        final ChannelFuture regFuture = initAndRegister();
        // 从Future获取channel对象
        final Channel channel = regFuture.channel();
        // 是否有异常,如果有就立即返回。
        if (regFuture.cause() != null) {
            return regFuture;
        }
    
        // 注册操作是否完成,成功或失败都算完成
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            // 这里我们知道注册已经完成,创建新的Promise,用于下一步操作的回调
            ChannelPromise promise = channel.newPromise();
            // 注册成功就注册到selector,否则就获取异常对象。
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else { // 表示注册未完成
            // Registration future is almost always fulfilled already, but just in case it's not.
            // 等待注册的promise
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            // 为regFuture注册监听器。当注册完成时会调用监听器
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    // 是否注册失败
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
    
                        // 注册成功,执行bind操作
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            // 返回bind操作的promise
            return promise;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    initAndRegister方法

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 通过channel工厂创建对象,在之前看过这个工厂类是ReflectiveChannelFactory(通过反射创建对象)
            channel = channelFactory.newChannel();
            // 此方法在AbstractBootstrap中是抽象方法,由子类实现。此处使用了模板方法模式。
            // 对channel做初始化操作,比如在pipeline中添加设置的处理器
            init(channel);
        } catch (Throwable t) {
            // 出现异常,要将channel关闭
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                // promise设置为failure
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            // 如果channel创建失败,就给一个FailedChannel对象
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
    
        // config():由于我们是从ServerBootstrap进来的,所以是ServerBootstrapConfig对象
        // group():bossGroup
        // 将channel注册到selector,也会将EventLoop注册到channel中。
        ChannelFuture regFuture = config().group().register(channel);
        // 如果注册失败,将channel关闭
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
    
        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.
    
        return regFuture;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    doBind0方法:doBind是逻辑入口点,而doBind0是具体实现

    private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        // 将绑定任务添加到channel绑定的EventLoop的任务队列(taskQueue)中,不会直接执行
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                // 注册操作是否成功
                if (regFuture.isSuccess()) {
                    // 若成功,就执行绑定操作,在返回的Future中添加监听器。这个监听器判断是否成功,失败就关闭channel
                    // 既然将channel绑定到指定的EventLoop中,在执行bind操作时也要考虑当前线程是不是绑定的EventLoop的线程
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    // 失败的话就传递异常对象
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    register方法:config().group().register(channel)

    • config():由于我们是从ServerBootstrap进来的,所以是ServerBootstrapConfig对象
    • group():bossGroup
    • register:由于是NioEventLoopGroup,定位到MultithreadEventLoopGroup类,next()调用register,此方法在MultithreadEventExecutorGroup中实现:chooser.next()
    • chooser:默认是DefaultEventExecutorChooserFactory对象,采用轮询方式选择EventExecutor

    现在需要知道EventExecutor是哪个类?在NioEventLoopGroup创建时,newChild是创建EventLoop的核心方法
    NioEventLoopGroup ==> MultithreadEventExecutorGroup
    NioEventLoop ==> SingleThreadEventExecutor
    结论:EventExecutor是SingleThreadEventExecutor类
    register是EventLoopGroup定义的方法,SingleThreadEventLoop同时继承EventLoopGroup和SingleThreadEventExecutor,方法实现就在这个类中
    为啥Channel要有Unsafe类?在Java标准库中也有Unsafe类,涉及CAS、屏障之类的native方法,要与JVM交互,设计人员认为开发者不懂JVM和内核底层,无法正确使用这些功能,遂将此类命名为Unsafe,表示此类是不安全的,不要盲目使用。那么Netty设计思路也是一样,Channel最终要与Java标准库的Channel交互,他们认为Java库是不安全的,不要盲目使用。所以Netty的Unsafe类是与Java的Nio进行交互的。

    public ChannelFuture register(Channel channel) {
    	// DefaultChannelPromise顶级父类接口是ChannelFuture,相比于Future增加了channel方法,返回绑定的channel
    	// 专属于Channel异步结果操作。promise是register结果
        return register(new DefaultChannelPromise(channel, this));
    }
    
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // 根据上下文,channel是NioServerSocketChannel
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    由于register、bind、read、write是客户端和服务端通用的事件,所以他们会在父类实现。那么register方法在AbstractUnsafe类中实现的。AbstractUnsafe是AbstractChannel的内部类。
    在注册事件的前后做一些额外的操作

    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(eventLoop, "eventLoop");
        // 是否已经注册。若已注册,设置异常信息返回
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        // eventLoop是否属于NioEventLoop类。若不是,设置异常信息返回
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }
    
        // this.eventLoop是AbstractChannel中volatile修饰的属性,保证属性在读写时都是最新值。
        // 由于此时是store操作,JVM会加上store load屏障,既保证写缓存立即更新到内存,又保证读缓存从内存读取新值
        // 注意eventLoop是此时被赋值,后续会通过channel.eventLoop()获取到此对象
        AbstractChannel.this.eventLoop = eventLoop;
    
        // eventLoop父类是SingleThreadEventExecutor,有volatile修饰的thread属性,存放第一次运行时的线程.
        // inEventLoop是判断当前线程与EventExecutor映射的线程是否同一个。若是,就直接调用register0执行注册功能,
        // 否则放到eventLoop的任务队列中,等待被调度执行。
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                // 将任务添加到eventLoop专属的任务队列
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) { // 出现异常,关闭Channel
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                // 关闭channel
                closeForcibly();
                // closeFuture设置为success,处理已注册的监听器
                closeFuture.setClosed();
                // 将register的promise设置为failure
                safeSetFailure(promise, t);
            }
        }
    }
    
    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
        	// 采用模板设计模式,doRegister是抽象方法,由子类实现
            doRegister();
            neverRegistered = false;
            registered = true;
    
            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            // 在通知promise之前,触发执行ChannelHandler的handlerAdded方法
        	pipeline.invokeHandlerAddedIfNeeded();
    
            // 将ChannelPromise标记为成功,顺便通知已经注册的监听器
            safeSetSuccess(promise);
            // 触发通道注册完成事件,执行ChannelInboundInvoker.fireChannelRegistered方法
            pipeline.fireChannelRegistered();
        
        	// Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
        
            // channel是否活跃。
        	// NioServerSocketChannel是确保channel开启,且已经绑定端口
            // NioSocketChannel是确保channel开启,且已经与服务器建立连接
            // 若从bind之前来看,此处是false。bind之后就是true了
        	if (isActive()) {
                // 是否是第一次注册
                if (firstRegistration) {
                    // 若通道是第一次注册,触发通道激活事件,执行ChannelInboundInvoker.fireChannelActive方法
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    // 若通道不是第一次注册,并且之前已经设置自动读取的选项(config.setAutoRead(true),默认autoRead=1),
                    // 则开始读取操作且执行入站操作。
                    // server的read事件是OP_ACCEPT,client的read事件是OP_READ
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101

    doRegister方法由子类实现,既然都是Nio的channel,而且server和client都有注册功能,所以此方法在他们共同的父类AbstractNioChannel中被实现

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 将当前保存的Java的channel对象注册到selector中,但不注册感兴趣的事件。为何这样设计?注册channel和事件都要额外的处理
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {// 异常说明因特定情况取消该channel的监听
                if (!selected) { // 只执行一次selectNow
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow(); // 执行selectNow,不等待执行从epoll返回可用事件
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    register过程已经详解,现在了解bind过程
    由于bind是server和client共有的功能,所以此方法实现在父类
    为何事件要在pipeline中操作?pipeline可以注册handler,handler有netty原生的,也可以由开发者自定义。在操作事件的过程中,可以触发handler。那么pipeline提供高可扩展性,并且能够灵活定制事件处理流程。

    // localAddress:bind肯定是绑定本地IP和端口。
    // promise:当注册channel完成时,就是DefaultChannelPromise。若未注册完成,就是PendingRegistrationPromise
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        // 在pipeline中处理事件
        return pipeline.bind(localAddress, promise);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    pipeline在AbstractChannel的构造器中创建,默认是DefaultChannelPipeline类
    在pipeline中有两个特殊的上下文(context):HeadContext和TailContext。处于管道的头部和尾部,分别处理入站和出站的操作。他们父类都是AbstractChannelHandlerContext
    出站入站怎么定义的?消息从应用程序发送到网络的叫出站事件,反之消息从网络发送到应用程序的叫做入站事件
    server:

    • 入站:accept、read
    • 出站:bind、write

    client:

    • 入站:read
    • 出站:bind、connect、write
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        // bind是出站事件,自然由tail调用
        return tail.bind(localAddress, promise);
    }
    
    • 1
    • 2
    • 3
    • 4

    AbstractChannelHandlerContext实现这个bind方法

    • 从tail往前遍历到head,找到没有被@Skip注解修饰bind方法的handlerContext类
    • 当前handlerContext是否执行ChannelHandler的handlerAdded方法
    • 执行自定义的ChannelOutboundHandler的bind方法
    • 总体构成递归操作,直到遍历到headContext,执行bind方法

    headContext的bind是实际的操作,之前的handler是对bind的补充。

    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(localAddress, "localAddress");
    	// promise是否被取消
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }
    
        // 1、出站事件,由tail往head遍历
        // 2、找到没有被@Skip注解修饰的handlerContext的bind方法
        final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
        // 获取处理器中的事件执行器
        EventExecutor executor = next.executor();
        // 当前线程是否与事件执行器中的线程一致
        if (executor.inEventLoop()) {
            // 若一样,直接执行invokeBind方法
            next.invokeBind(localAddress, promise);
        } else {
            // 否则放入执行器的任务队列中,等待被调度执行
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null, false);
        }
        return promise;
    }
    
    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        // 是否执行了ChannelHandler的handlerAdded(ChannelHandlerContext)方法
        if (invokeHandler()) {
            try {
                // DON'T CHANGE
                // Duplex handlers implements both out/in interfaces causing a scalability issue
                // see https://bugs.openjdk.org/browse/JDK-8180450
                // 获取当前Context中的handler
                final ChannelHandler handler = handler();
                // 用局部变量接收全局变量,防止head在操作时发生改变,相当于是一个变量副本
                final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                // 是否等于headContext
                if (handler == headContext) {
                    // 执行headContext的bind方法。若从tail找到head,就会执行head的bind方法。实际的绑定操作在headContext中完成,之前都是对bind的补充。
                    headContext.bind(this, localAddress, promise);
                } else if (handler instanceof ChannelDuplexHandler) {
                    // ChannelDuplexHandler既实现出站又实现入站
                    ((ChannelDuplexHandler) handler).bind(this, localAddress, promise);
                } else {
                    // 普通的出站handler
                    ((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
                }
            } catch (Throwable t) {
                // promise设置异常信息
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            // 未执行handlerAdded方法
            // 这又会回到之前的bind方法中,为啥要这样做呢?总体是递归的过程,目的是执行完符合条件的handler
            bind(localAddress, promise);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    目前没有自定义的handler实现bind方法,所以直接查看headContext的源码

    public void bind(
                    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
        // bind方法是客户端和服务端共有的事件,所以在父类实现
        unsafe.bind(localAddress, promise);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    unsafe的父类是AbstractUnsafe抽象类,是AbstractChannel的内部类。由于子类的实现细节不同,使用模板设计模式先定义好执行步骤,实际的绑定操作doBind方法由子类实现。

    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        // 确保当前线程属于当前的eventLoop
        assertEventLoop();
    
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
    
        // See: https://github.com/netty/netty/issues/576
        if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
            localAddress instanceof InetSocketAddress &&
            !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
            !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
            // Warn a user about the fact that a non-root user can't receive a
            // broadcast packet on *nix if the socket is bound on non-wildcard address.
            logger.warn(
                    "A non-root user can't receive a broadcast packet if the socket " +
                    "is not bound to a wildcard address; binding to a non-wildcard " +
                    "address (" + localAddress + ") anyway as requested.");
        }
    
        // isActive由子类实现,怎样才算激活?
        // NioServerSocketChannel:channel开启,并且已经绑定端口
        // NioSocketChannel:channel开启,并且已经与服务器建立连接
        boolean wasActive = isActive();
        try {
            // 子类实现绑定细节
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
    
        // 第一次激活要触发响应的操作,调用ChannelInboundHandler.channelActive方法
        if (!wasActive && isActive()) {
            // 放入执行器的任务队列,等待被调度执行
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    // 此方法必须在第一次激活时调用
                    pipeline.fireChannelActive();
                }
            });
        }
    
        // 由于没有放入eventLoop的任务队列,bind操作又是阻塞操作,所以此处直接设置promise为success,并触发监听器
        safeSetSuccess(promise);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    doBind方法就是给socket绑定IP和端口号,详细的内容在后续解析。这里着重看下fireChannelActive实现过程

    public final ChannelPipeline fireChannelActive() {
    	// 调用handlerContext的invokeChannelActive静态方法,传入headContext,很明显是一个入站方法
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    // 为啥变量名是next?因为此方法与其他一些方法组成递归操作
    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        // 是否在事件循环中?
        if (executor.inEventLoop()) {
            // 若是,就直接执行方法
            next.invokeChannelActive();
        } else {
            // 否则放入任务队列等待执行
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }
    
    private void invokeChannelActive() {
        // 已经执行handlerAdded方法
        if (invokeHandler()) {
            try {
                // DON'T CHANGE
                // Duplex handlers implements both out/in interfaces causing a scalability issue
                // see https://bugs.openjdk.org/browse/JDK-8180450
                final ChannelHandler handler = handler();
                final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                if (handler == headContext) {
                    // 若handler为headContext,执行此方法
                    headContext.channelActive(this);
                } else if (handler instanceof ChannelDuplexHandler) {
                    // 双工通信的处理器
                    ((ChannelDuplexHandler) handler).channelActive(this);
                } else {
                    // 入站处理器
                    ((ChannelInboundHandler) handler).channelActive(this);
                }
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            // 找到合适的处理器来执行
            fireChannelActive();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    还是跟之前的一样,此方法也由headContext实现
    ctx.fireChannelActive会将事件传递给下一个ChannelInboundHandler进行处理,若下一个处理器没有继续往下传递,那么此方法只会传递一次。否则会一直传递下去

    public void channelActive(ChannelHandlerContext ctx) {
        // 由于一开始传入的是head,在DefaultChannelPipeline的构造器中,所以从head的下一个找到合适的处理器执行。
        ctx.fireChannelActive();
    	// 若已经配置autoRead为true,则执行读取操作
        readIfIsAutoRead();
    }
    
    private void readIfIsAutoRead() {
        // 默认配置为true
        if (channel.config().isAutoRead()) {
            // 服务端的read是accept事件,客户端的read是read事件
            channel.read();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    根据上下文,我们还在server的bind流程中,那么read方法要在server的channel中查看源码。又由于server和client都有read操作,所以此方法的算法逻辑还是在父类AbstractChannel中实现。

    public Channel read() {
        // read操作在pipeline中完成
        pipeline.read();
        return this;
    }
    
    public final ChannelPipeline read() {
        // read是出站操作,要从后往前执行
        tail.read();
        return this;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    public ChannelHandlerContext read() {
        // 遍历寻找read方法未被@Skip修饰的处理器
        final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // 执行invokeRead方法
            next.invokeRead();
        } else {
            // 使用这种封装Runnable的方式,避免重复添加任务
            Tasks tasks = next.invokeTasks;
            if (tasks == null) {
                next.invokeTasks = tasks = new Tasks(next);
            }
            // 将next.invokeRead()方法放到Runnable中,添加到对应的eventLoop的等待队列
            executor.execute(tasks.invokeReadTask);
        }
    
        return this;
    }
    
    private void invokeRead() {
        // 是否具备执行handler的条件
        if (invokeHandler()) {
            try {
                // DON'T CHANGE
                // Duplex handlers implements both out/in interfaces causing a scalability issue
                // see https://bugs.openjdk.org/browse/JDK-8180450
                final ChannelHandler handler = handler();
                final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                if (handler == headContext) {
                    // 执行headContext的read方法,这是实际读取事件的实现。
                    headContext.read(this);
                } else if (handler instanceof ChannelDuplexHandler) {
                    // 双工处理器
                    ((ChannelDuplexHandler) handler).read(this);
                } else {
                    // 出站处理器
                    ((ChannelOutboundHandler) handler).read(this);
                }
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            // 未符合要求,寻找下一个handler
            read();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    headContext的read方法实现读取事件的操作

    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }
    
    • 1
    • 2
    • 3

    Unsafe对象对于服务端和客户端是不同的,对象的创建是在newUnsafe方法中,此方法是子类实现。方法的实现方是下面两个类中,分别对应客户端和服务端。为何会这样定义?服务端就是接收客户端连接,将fd 封装成channel对象。客户端就是与服务端进行读写操作,都是字节流操作。Netty认为服务端是channel交互,是个对象,也即message。而客户端是字节流操作,就是byte。
    image.png
    对于服务端来说,newUnsafe的实现在AbstractNioMessageChannel中,下图中unsafe是NioMessageUnsafe对象。
    image.png
    但是方法逻辑还是在AbstractUnsafe中

    public final void beginRead() {
        // 确保当前线程属于当前的事件循环的执行线程
        assertEventLoop();
    
        try {
            // 抽象方法,由子类实现
            doBeginRead();
        } catch (final Exception e) {
            // 出现异常,添加任务:触发异常捕捉操作,执行ChannelHandler的exceptionCaught方法
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireExceptionCaught(e);
                }
            });
            close(voidPromise());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    服务端的unsafe类是AbstractNioMessageChannel类

    protected void doBeginRead() throws Exception {
        if (inputShutdown) {
            return;
        }
    	// 调用父类的doBeginRead方法
        super.doBeginRead();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    注册accept事件,epoll会对server的channel进行监听。若有事件准备完毕,epoll会将已经准备好的事件返回。

     protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
    	// SelectionKey是channel感兴趣的事件集合
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
    
    	// 表示read进行中
        readPending = true;
    
    	// 获取感兴趣的事件集
        final int interestOps = selectionKey.interestOps();
        // 是否已经注册读取事件
        if ((interestOps & readInterestOp) == 0) {
            // 若没有注册,就往事件集注册read事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    服务端没有read操作,那这个readInterestOp是啥?从AbstractNioChannel溯源。此类的构造器中对readInterestOp赋值,由构造器传入。
    image.png
    子类AbstractNioMessageChannel也是传入的参数,继续往子类查找
    image.png
    子类NioServerSocketChannel传入的是ACCEPT事件,事实上服务端的读取事件就是ACCEPT事件。
    image.png

    小结:

    • bind事件是要先将channel注册到selector中,但不必注册感兴趣的事件,因为此时也不清楚要注册哪个事件。
    • 注册channel前后触发pipeline机制
    • 注册channel完成直接执行bind操作,否则放入对应事件执行器的等待队列,等待被调度执行。最后肯定是要执行bind操作的
    • bind操作前后也会触发pipeline机制
    • 当注册完成时,若channel已经设置自动读取标识(默认),channel会注册感兴趣的事件(server => accept事件,client => read事件)。

    Netty (2).png

    1.2.4、事件循环过程

    在bind操作时,一直都是主线程执行代码,直到在AbstractUnsafe.register方法中,执行注册操作时判断是否在事件循环中。inEventLoop是判断eventLoop的工作线程与当前线程是否一致,若一致直接执行register0操作,反之调用eventLoop的execute方法传入Runnable对象。
    image.png
    image.png
    execute方法在SingleThreadEventExecutor实现,因为这个方法是JUC的Executor的接口方法,而只有STEE实现了方法。
    image.png
    wakesUpForTask:STEE中为恒定值true。

    private void execute0(@Schedule Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, wakesUpForTask(task));
    }
    
    • 1
    • 2
    • 3
    • 4

    image.png
    immediate:根据上下文为true

    private void execute(Runnable task, boolean immediate) {
        // 由于当前是主线程,所以inEventLoop为false
        boolean inEventLoop = inEventLoop();
        // 往taskQueue添加任务
        addTask(task);
        // 此时为false
        if (!inEventLoop) {
            // 启动线程,核心方法
            startThread();
            // 当前事件执行器的状态为ST_SHUTDOWN、ST_TERMINATED时,说明执行器已经关闭
            if (isShutdown()) {
                boolean reject = false;
                try {
                    // 从taskQueue中删除任务
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                // 任务删除成功,可以执行拒绝策略
                if (reject) {
                    reject();
                }
            }
        }
    
        // 根据之前的源码得知addTaskWakesUp默认为false,所以if判断为true
        if (!addTaskWakesUp && immediate) {
            // eventLoop是NioEventLoop的对象,wakeup在此类重写。唤醒epoll返回准备好的事件
            wakeup(inEventLoop);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    image.png

    STEE事件执行器使用state表示执行状态,有五种状态:未启动、已启动、关闭中、已关闭、已终止。state变量使用volatile修饰是为了保证变量在读写时都能获取到最新值。从JMM角度来说,volatile写操作使用store load屏障,也即全屏障。CPU先清空invalidate queue,让缓存行对应的变量值失效,然后更新变量值,放入store buffer,立即将buffer数据刷新到内存,同时将最新值发送给其他CPU的invalidate queue中。
    image.png
    image.png
    startThread是核心方法,使用executor启动了事件循环的任务

    private void startThread() {
        // state是否为未启动状态
        if (state == ST_NOT_STARTED) {
            // 通过CAS将旧值ST_NOT_STARTED,改成新值ST_STARTED。若旧值不是未启动状态,更新失败返回false
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    // 启动线程
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        // 启动失败,将状态改成ST_NOT_STARTED
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    executor是juc的Executor对象,让当前的EventExecutor与当前线程映射
    image.png
    image.png

    private void doStartThread() {
        // 确保thread变量为空
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // 将当前执行线程对象赋给thread变量。inEventLoop方法就是判断thread变量与当前执行线程是否相同
                thread = Thread.currentThread();
                // 线程是否被中断
                if (interrupted) {
                    // 若被中断,就在当前执行线程中添加中断标志位。
                    thread.interrupt();
                }
    
                boolean success = false;
                // 更新最近执行时间
                updateLastExecutionTime();
                try {
                    // 若使用this.run(),会调用到当前Runnable对象的run方法,造成无限递归调用,导致栈溢出异常
                    // 使用SingleThreadEventExecutor.this.run()明确调用SingleThreadEventExecutor对象的run方法
                    // run是抽象方法,由子类实现。
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally { // run方法执行结束,当前事件执行器同样结束
                    // for循环确保状态流转成功
                    for (;;) {
                        int oldState = state;
                        // state转成ST_SHUTTING_DOWN
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }
    
                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        if (logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        }
                    }
    
                    try {
                        // Run all remaining tasks and shutdown hooks. At this point the event loop
                        // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                        // graceful shutdown with quietPeriod.
                        for (;;) {
                            // 清空三个任务队列:tailTasks、taskQueue、scheduledTaskQueue
                            if (confirmShutdown()) {
                                break;
                            }
                        }
    
                        // Now we want to make sure no more tasks can be added from this point. This is
                        // achieved by switching the state. Any new tasks beyond this point will be rejected.
                        // state状态流转为ST_SHUTDOWN
                        for (;;) {
                            int oldState = state;
                            if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                                break;
                            }
                        }
    
                        // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                        // No need to loop here, this is the final pass.
                        // 再调用一次,清空队列中剩余任务
                        confirmShutdown();
                    } finally {
                        try {
                            // STEE是空方法,NioEventLoop重写方法,将selector关闭
                            cleanup();
                        } finally {
                            // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                            // the future. The user may block on the future and once it unblocks the JVM may terminate
                            // and start unloading classes.
                            // See https://github.com/netty/netty/issues/6596.
                            // 清空ThreadLocal本地变量,防止内存溢出风险
                            FastThreadLocal.removeAll();
    
                            // state状态流转为ST_TERMINATED。本质上还是putIntVolatile方法原子性修改状态值
                            // 为啥不使用CAS操作?此状态是最后一个状态值,无论旧值是啥,直接设置成ST_TERMINATED即可,所以CAS没有必要
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            // threadLock是CountDownLatch对象,若调用了awaitTermination,当前执行器的线程会被阻塞
                            // 唤醒等待的线程
                            threadLock.countDown();
                            // 丢弃taskQueue中的任务,记录任务数,打印日志
                            int numUserTasks = drainTasks();
                            if (numUserTasks > 0 && logger.isWarnEnabled()) {
                                logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + numUserTasks + ')');
                            }
                            // terminationFuture设置成功,触发监听器
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103

    run方法是子类实现的,那么子类是NioEventLoop

    protected void run() {
        // 记录调用select的次数
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    // selectStrategy默认是DefaultSelectStrategy类,判断taskQueue或tailTasks是否有任务,若有执行selectNowSupplier.get()方法
                    // 此方法返回selectNow(),也即调用selector.selectNow(),epoll_wait不阻塞立即返回。
                    // 若没有任务,就返回SelectStrategy.SELECT
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE: // 跳过
                        continue;
    
                    case SelectStrategy.BUSY_WAIT: // 繁忙等待,无实现
                        // fall-through to SELECT since the busy-wait is not supported with NIO
    
                    case SelectStrategy.SELECT: // 阻塞等待一定时间
                        // 获取scheduledTaskQueue下一个任务,得到其截至时间
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        // 若没有任务,返回-1
                        if (curDeadlineNanos == -1L) {
                            // 任务基本都是事件处理过程中添加的,若没有任务说明没有事件,可以无限期等待
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        // 设置下一次唤醒时间
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            // 确保taskQueue和tailTasks中没有任务,否则不会让select进入阻塞,下个轮次再执行select
                            if (!hasTasks()) {
                                // 若没有任务,调用select进入阻塞。若一直阻塞怎么办?其他线程给当前EventLoop添加任务时会唤醒epoll中断返回
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            // 表明已经唤醒,设置AWAKE。单线程运行,不会有多线程竞争nextWakeupNanos的情况。
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    // select出现异常,获取新的selector
                    rebuildSelector0();
                    // 重置次数清零
                    selectCnt = 0;
                    // Thread.sleep(1000)让线程暂停一段时间,避免for循环连续操作select时异常依然存在,导致CPU资源被过多占用
                    handleLoopException(e);
                    continue;
                }
    
                // select轮次增加
                selectCnt++;
                // 新轮次将cancelledKeys清空
                cancelledKeys = 0;
                needsToSelectAgain = false;
                // 表示 I/O 操作和非 I/O 操作之间比例的参数
                final int ioRatio = this.ioRatio;
                // 是否已经运行任务
                boolean ranTasks;
            	// 若ioRatio为100,表示所有时间片都给I/O操作
                if (ioRatio == 100) {
                    try {
                        // strategy是已经准备好的事件数
                        if (strategy > 0) {
                            // 处理已经准备的事件
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        // 若按照ioRatio的定义,所有时间片都给IO操作,此处不会调用。那么非IO操作一直无法执行。
                        // 但是一直执行task,任务队列始终都有任务,IO操作就无法处理。所以非必要不讲ratio设置为true
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) { // ioRatio非100,且有准备好的事件
                    // 记录IO操作开始时间
                    final long ioStartTime = System.nanoTime();
                    try {
                        // 处理IO操作
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        // 计算IO操作处理时间
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // 通过IO操作时间片和ioRatio计算非IO操作运行执行时间
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    // 无IO操作,只执行最小的任务数
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
    
                if (ranTasks || strategy > 0) {
                    // 若未执行任务,且准备好的事件数为0,且次数已经达到三次,记录日志
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    // 轮次清零
                    selectCnt = 0;
                // 当select过早返回的次数达到一定阈值时,会重建selector对象,清楚无效的key,提高selector执行IO操作的效率
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) { // SelectionKey被取消
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Error e) {
                throw e;
            } catch (Throwable t) {
                handleLoopException(t);
            } finally {
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    // 若处于关闭状态,关闭selector
                    if (isShuttingDown()) {
                        closeAll();
                        // 将任务队列剩下的任务执行完
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Error e) {
                    throw e;
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137

    有个疑问:调用select如果一直阻塞怎么办?其他线程调用EventLoop的execute添加任务时,会顺便调用selector.select唤醒epoll返回。
    Netty (4).png
    run方法代码比较多,但核心代码就那些,更多的是异常处理。

    • 选择合适的策略,执行selector.select()处理IO操作
    • 根据ioRatio分配处理IO操作和非IO操作的时间片

    我们先着重看下IO操作处理过程
    SelectorImpl(Linux系统是EPollSelectorImpl)用HashSet存放epoll返回的事件集。HashSet本质上用数组存储元素,通过hash计算索引位置插入元素(HashSet维护HashMap,HashMap本质上就是数组)。哈希不冲突时添加和删除的时间复杂度都是O(1),但出现哈希冲突时复杂度变差,最坏情况下为O(N)也即需要遍历数组。Netty追求高性能,觉得可以对其优化。

    • 自定义Set类(SelectedSelectionKeySet)继承AbstractSet实现增删改查方法
    • 只使用数组,然后给定较长的长度(1024),按顺序插入元素(SelectionKey),删除不做操作,这样就能避免删除时需要遍历数据的情况
    • 替换EPollSelectorImpl中的selectedKeys和publicSelectedKeys集合类型,默认是HashSet类

    如果SelectionKey被cancel,但Set未删除,在处理key会不会有问题?cancel时会把SelectionKey的valid标识设置为false,处理key时跳过key即可。
    结合优化操作,再看run方法中的代码逻辑有更深一层的理解

    • 为何unexpectedSelectorWakeup中判断selectCnt达到512次时,就重建selector?server端就一个SelectionKey:accept,client端要注册SelectionKey的就两个SelectionKey:read和writ。一个channel中selectedKeys默认为1024长度(可以扩增),最差的情况是512轮次反复注册和取消两个key,Set会被塞满。可以清空Set,但selector也有一些集合,没法给他们清空。所以Netty的操作是重建selector
    private void processSelectedKeys() {
        // SelectedSelectionKeySet对象,优化的Set集合,若此对象不为空,说明可以遍历优化的集合处理
        if (selectedKeys != null) {
            // 遍历优化的Set集合处理IO事件
            processSelectedKeysOptimized();
        } else {
            // 使用正常的方式处理事件,获取selector内置的Set集合(HashSet对象)
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    
    private void processSelectedKeysOptimized() {
        // 遍历自定义的SelectedSelectionKeySet
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            // help GC
            selectedKeys.keys[i] = null;
    
            // SelectionKey允许存放一个Object对象,好处在于不需要使用Map维护SelectionKey与附件的关系
            final Object a = k.attachment();
    
            // channel是Java原生的(SelectableChannel)还是 Netty自定义的(AbstractNioChannel)
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
    
            // 当取消的SelectionKey达到256个时,needsToSelectAgain设为true
            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                // 未遍历的全部清空
                selectedKeys.reset(i + 1);
    
                // 重新调用selector.select()获取事件集
                selectAgain();
                // 从头开始遍历
                i = -1;
            }
        }
    }
    
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        // Set为空时,调用iterator方法会创建Iterator对象,但此对象无用,增加内存和垃圾回收的负担,所以要先判断是否为空
        if (selectedKeys.isEmpty()) {
            return;
        }
    
        // 获取Iterator对象
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            // 删除key是为了避免重复处理。若使用for循环遍历集合,再做删除操作会出现ConcurrentModificationException异常
            i.remove();
    
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
    
            // 没有key跳出循环
            if (!i.hasNext()) {
                break;
            }
    
            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();
    
                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    // 无法从头遍历,需要重新创建Iterator对象
                    i = selectedKeys.iterator();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    NioTask是给开发者扩展的类,这里不阐述此类情况。只详细阐述AbstractNioChannel的事件如何处理?对事件分发

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // 获取channel绑定的Unsafe对象
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                // 获取channel绑定的EventLoop对象
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop == this) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
            }
            // 事件被取消,直接跳过
            return;
        }
    
        try {
            // 获取准备好的事件
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            // 是否有CONNECT事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                // 由于已经完成连接,此事件只会触发一次,又由于select在不阻塞时一直返回null,所以要删除此事件
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
    
                // 完成connect后续操作
                unsafe.finishConnect();
            }
    
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                unsafe.forceFlush();
            }
    
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            // server端是ACCEPT事件,client端是READ事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    上面是IO操作的处理过程,非IO操作的过程如下,我们只看有超时时间的方法

    protected boolean runAllTasks(long timeoutNanos) {
    	// 将schedule队列中已经超时的任务放入到taskQueue队列中
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
    	// 若任务为空,直接执行tailTasks中的任务
        if (task == null) {
            afterRunningAllTasks();
            // 返回false说明没有执行全部的任务队列
            return false;
        }
    
    	// 计算截止时间
        final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            // 执行任务
            safeExecute(task);
    
            runTasks ++;
    
            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            // 每执行64个任务判断是否到了截止时间,若已经超时就结束执行。为啥这样操作?因为频繁调用nanoTime()影响性能
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = getCurrentTimeNanos();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
    
            // 从taskQueue中取出任务
            task = pollTask();
            // 任务为空结束执行
            if (task == null) {
                lastExecutionTime = getCurrentTimeNanos();
                break;
            }
        }
    
    	// 执行tailTasks中的任务
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
    	// 返回true说明执行了全部队列中的任务
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    小结:事件循环就是循环调用select并处理事件集

    • 执行线程给EventLoop添加任务时判断是否在事件循环中。若不在,再判断EventLoop的state是否为未启动。若未启动,执行executor.execute(executor是juc的Executor对象)启动新线程或线程池执行事件循环任务
    • 任务执行时先将当前执行线程与EventLoop绑定,执行NioEventLoop的run方法
    • 选择不同的策略执行selector.select方法
      • taskQueue和tailTasks两个任务队列有任务时,执行selector.selectNow(),不阻塞立即返回
      • 两个队列没有任务时,取一个scheduledTaskQueue任务队列中任务,此任务是延迟执行时间最小的任务,与当前时间计算时间差,用作select的阻塞等待时间
      • 三个队列都没有任务时,select无限期阻塞等待
    • 若有任务进来(execute添加任务)时,会执行selector.wakeup唤醒epoll,select结束阻塞
    • 使用ioRatio(默认50)控制IO操作(processSelectedKeys)和非IO操作(runAllTasks)的时间片。IO操作必须要执行完才行,非IO操作按照给定的时间片执行。
      • 若ioRatio为100时,两个操作无时间限制。若非IO操作任务太多,IO操作就会有延迟
      • 若select返回事件数大于0,且ioRatio不为100时,计算IO操作执行时间(ioTime),再计算非IO操作执行的时间。non-ioTime = ioTime * (100 - ioRatio) / ioRatio。假设ioRatio为30,non-ioTime= ioTime * (7/3)
      • 若select返回事件数为0,且ioRatio不为100时,非IO操作执行的时间为0
    • IO事件处理时,根据是否做了优化来触发不同的流程
      • 若做了优化,使用SelectedSelectionKeySet对象替换EPollSelectorImpl中selectedKeys和publicSelectedKeys集合,自定义的Set使用数组,只有插入操作。虽然SelectionKey被cancel时,selectedKeys不会删除此key,但SelectionKey中的valid字段被设为false。在遍历数组时,判断valid为false,说明此key无效,跳过即可。然后对有效的key进行事件分发操作。
      • 若未做优化,使用selector原生的Set对象(HashSet),遍历事件集,对事件进行分发。
    • 非IO操作处理时,根据给定的时间片执行任务
      • 将scheduledTaskQueue已经到延迟时间的任务全部放入到taskQueue。若taskQueue已经放不下,放回到schedule队列
      • 执行taskQueue中的任务,每执行64个任务计算执行时间,若超过给定的时间片就退出,否则继续执行。
      • 执行tailTasks中的任务

    Netty (5).png

  • 相关阅读:
    Python中ArcPy按照分幅条带与成像日期拼接每个8天间隔内的遥感影像
    zabbix自定义模板,邮件报警,代理服务器,自动发现与自动添加及snmp
    Optional 详解
    源码解读之FutureTask如何实现最大等待时间
    CDR和AI哪个软件更好用?
    【高性能计算】CUDA
    clickhouse学习之路----clickhouse的特点及安装
    Elastic Stack 环境配置与框架简介
    Linux 运维工程师面试真题-3-Linux 磁盘及软件管理操作
    黎曼几何与切空间之间的投影
  • 原文地址:https://blog.csdn.net/HenryHuangJabil/article/details/132764894