• 【Netty源码系列(二)】解读EventLoopGroup


    【Netty源码系列文章中源码出自4.1.84.Final版本】


    本篇文章主要看一下 EventLoopGroup的源码,了解一下它的创建过程。

    	EventLoopGroup bossGroup = new NioEventLoopGroup(2);
    	EventLoopGroup workerGroup = new NioEventLoopGroup();
    
    • 1
    • 2

    1. EventLoopGroup接口类

    我们先进入EventLoopGroup类中,看一下这个类的情况。有关这个类的信息都在源码中注释了

    /**
     * 这是一个接口类,继承自EventExecutorGroup 
     * 它的作用就是允许将 事件循环中每个加工过的Channel对象注册进来
     */
    public interface EventLoopGroup extends EventExecutorGroup {
        /**
         * 枚举返回下一个EventLoop对象
         */
        @Override
        EventLoop next();
    
        /**
         * 使用EventLoop注册Channel,当注册完成时,返回通知对象ChannelFuture 
         */
        ChannelFuture register(Channel channel);
    
        /**
         * 使用EventLoop注册Channel,一旦注册完成,也会返回通知对象ChannelFuture,但是这里传参变为ChannelPromise,
         * 这个对象内部也维护了一个Channel对象
         */
        ChannelFuture register(ChannelPromise promise);
    
        /**
         * 这个方法已经弃用,上面的方法已经包含这个方法功能
         */
        @Deprecated
        ChannelFuture register(Channel channel, ChannelPromise promise);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    ChannelFuture接口类,异步返回Channel的I/O结果。

    2. NioEventLoopGroup创建过程

    new NioEventLoopGroup(2)
    
    • 1

    这里传入线程数量参数,就会根据传入的值进行创建,不传时,默认按系统cpu的核数*2进行创建。

    进入NioEventLoopGroup类中,一直debug,可以看到如下图所示的过程。
    在这里插入图片描述
    此时,我们进入super(...)这个方法中,进入到MultithreadEventLoopGroup类里面。

    	 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    	     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    	 }
    
    • 1
    • 2
    • 3

    这时,我们看到一个三目运算,而DEFAULT_EVENT_LOOP_THREADS的值,在这个类加载时,就已经进行赋值操作,正是获取系统可用cpu核数的2倍,如下。

        private static final int DEFAULT_EVENT_LOOP_THREADS;
    
        static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    我们接着进入super(...)方法中,可以看到下面的this(...)方法,看一下注释,说创建一个新的实例,我们再点击进入到真正的方法中MultithreadEventExecutorGroup(...)

        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            //检查参数nThreads
            checkPositive(nThreads, "nThreads");
    		//创建executor对象
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    		//创建EventExecutor数组
            children = new EventExecutor[nThreads];
    		//枚举将生成的EventExecutor对象放入数组中
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                	//这里是核心,用来生成EventExecutor对象,后面会讲
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    		//枚举设置监听器
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    		//将EventExecutor对象放入一个只读集合中
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    这个方法干了这几件事,见下图。我们先整体有个了解,再对其中几个重要的步骤进行深入。

    在这里插入图片描述

    2.1 Executor实现机制

            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
    • 1
    • 2
    • 3

    我们知道,之前参数executor传入的是null值,在这里会进行创建。

    newDefaultThreadFactory() 创建默认线程工厂,并为线程设置一些属性,我们进入这个方法中。

    	//MultithreadEventExecutorGroup类
    
        protected ThreadFactory newDefaultThreadFactory() {
            return new DefaultThreadFactory(getClass());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    下一步:

    	//DefaultThreadFactory类,实现ThreadFactory接口
    
        public DefaultThreadFactory(Class<?> poolType) {
            this(poolType, false, Thread.NORM_PRIORITY);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这里新增两个参数,
    daemon:守护进程,默认传false
    priority:优先级,默认传的是5,即Thread.NORM_PRIORITY

    下一步:

        public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
            this(toPoolName(poolType), daemon, priority);
        }
    
    • 1
    • 2
    • 3

    toPoolName(poolType)方法,是获取对应的池名称。

    下一步:

        public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
            this(poolName, daemon, priority, null);
        }
    
    • 1
    • 2
    • 3

    新增一个参数,
    ThreadGroup:线程组,默认传null

    下一步:

        public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
        	//检查池名称不为null,否则抛异常
            ObjectUtil.checkNotNull(poolName, "poolName");
    		//检查优先级范围
            if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
                throw new IllegalArgumentException(
                        "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
            }
    		//poolId,是AtomicInteger原子对象,用来拼接线程名称前缀
    		//后面对三个属性进行赋值
            prefix = poolName + '-' + poolId.incrementAndGet() + '-';
            this.daemon = daemon;
            this.priority = priority;
            this.threadGroup = threadGroup;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    到这里,只是对线程工厂的线程进行属性的设置;

    回到开始的地方:

    	executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    
    • 1

    我们进入ThreadPerTaskExecutor这个类中,看一下。

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    这个类里,只是把刚刚设置好的线程工厂对象ThreadFactory赋值给ThreadPerTaskExecutor中的属性,这里并没有去调用execute()方法;

    真正调用执行的是这个方法:newChild(executor, args)

    	//MultithreadEventExecutorGroup类
    
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
    		...
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                	//真正去调用执行的方法
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                }
                ...
         }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    2.2 EventLoop对象创建(newChild()方法)

    我们点击newChild()方法,发现这个一个抽象方法,这里我们看一下它的实现方法,因为创建的是NioEventLoopGroup对象,所以我们选择NioEventLoopGroup这个类的重写方法,点进去看一下:

        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            ...
            return new NioEventLoop(this, executor, selectorProvider,
                    selectStrategyFactory.newSelectStrategy(),
                    rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这里对之前传入的多个参数,根据不同的参数类型,进行强制转换;然后将转换后的参数传递到NioEventLoop类的构造方法中,创建一个NioEventLoop对象。我们进入它的构造方法中,看一下:

        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                     EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
            super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
                    rejectedExecutionHandler);
            this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
            this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
            final SelectorTuple selectorTuple = openSelector();
            this.selector = selectorTuple.selector;
            this.unwrappedSelector = selectorTuple.unwrappedSelector;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    NioEventLoop构造方法中又调用了它的父类方法,校验参数,获取选择器并赋值。我们进入super(...)方法中。

        protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
                                        RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
            tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这里也是干了同样的事情,调用父类方法,校验参数。我们再点击super()方法。

        protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                            boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                            RejectedExecutionHandler rejectedHandler) {
            super(parent);
            this.addTaskWakesUp = addTaskWakesUp;
            this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
            //重点!!!
            this.executor = ThreadExecutorMap.apply(executor, this);
            this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
            this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    到了这一步,我们会发现不一样了,又出现executor这个对象了,而且通过ThreadExecutorMap.apply(executor, this)这个方法又重新赋值给Executor,我们前面说过,之前创建Executor的时候,只是设置了一些属性参数,并没有真正去调用创建线程的方法,现在这里又出现了,会是这里吗?我们点击apply()方法进去看看。

        /**
         * Decorate the given {@link Executor} and ensure {@link #currentExecutor()} will return {@code eventExecutor}
         * when called from within the {@link Runnable} during execution.
         */
        public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
            ObjectUtil.checkNotNull(executor, "executor");
            ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
            return new Executor() {
                @Override
                public void execute(final Runnable command) {
                    executor.execute(apply(command, eventExecutor));
                }
            };
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    我们看到这里调用execute()这个方法,apply(command, eventExecutor)这个方法通过command.run()生成Runnable对象,然后执行器去调用。

    到这里我们大体了解EventLoopGroup的整个创建过程,本文有误的地方,烦请留言指正,谢谢!

  • 相关阅读:
    vue/html input 读取 json数据
    Flutter简易弹窗
    第3章 基础项目的搭建
    Laravel-admin弹出提示层的三种方法
    博客园又崩了,这个锅要不要阿里云背?
    十二、【VUE基础】列表渲染
    获取Linux内核源码
    PDF自动打印
    力扣27-移除元素——简单题
    Eureka的设计理念
  • 原文地址:https://blog.csdn.net/weixin_43407520/article/details/127787088