• Netty源码解析一——线程池模型之线程池NioEventLoopGroup


    本文基础是需要有Netty的使用经验,如果没有编码经验,可以参考官网给的例子:https://netty.io/wiki/user-guide-for-4.x.html。另外本文也是针对的是Netty 4.1.x版本的。

    Reactor模式

    本文主要介绍Netty线程模型及其实现,介绍Netty线程模型前,首先会介绍下经典的Reactor线程模型,目前大多数网络框架都是基于Reactor模式进行设计和开发,Reactor模式基于事件驱动,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers。这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的Request Handler,非常适合处理海量的I/O事件。下面简单介绍下Reactor模式及其线程模型。

    单线程模型

    如图所示,由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞。通常Reactor线程中聚合了多路复用器负责监听网络事件,当有新连接到来时,触发连接事件,Disdatcher负责使用Acceptor接受客户端连接,建立通信链路。当I/O事件就绪后,Disdatcher负责将事件分发到对应的event handler上负责处理。

    该模型的缺点很明显,不适用于高负载、高并发的应用场景;由于只有一个Reactor线程,一旦故障,整个系统通信模块将不可用。

    多线程模型

    该模型的特点:

    • 专门由一个Reactor线程-Acceptor线程用于监听服务端,接收客户端连接请求;
    • 网络I/O操作读、写等由Reactor线程池负责处理;
    • 一个Reactor线程可同时处理多条链路,但一条链路只能对应一个Reactor线程,这样可避免并发操作问题。

    绝大多数场景下,Reactor多线程模型都可以满足性能需求,但是,在极个别特殊场景中,一个Reactor线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。因此,诞生了第三种线程模型。

    主从多线程模型

    该模型的特点:

    • 服务端使用一个独立的主Reactor线程池来处理客户端连接,当服务端收到连接请求时,从主线程池中随机选择一个Reactor线程作为Acceptor线程处理连接;
    • 链路建立成功后,将新创建的SocketChannel注册到sub reactor线程池的某个Reactor线程上,由它处理后续的I/O操作。

    Netty线程模型

    Netty同时支持Reactor单线程模型 、Reactor多线程模型和Reactor主从多线程模型,用户可根据启动参数配置在这三种模型之间切换。Netty线程模型原理图如下:

    服务端启动时,通常会创建两个NioEventLoopGroup实例,对应了两个独立的Reactor线程池。常见服务端启动代码实现如下:

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                   ......

    实际上比较重要的创建线程池创建代码:

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    服务端启动时创建了两个NioEventLoopGroup,他们实际上时两个独立的Reactor线程池,一个负责接收客户端的TCP连接,另一个用于处理I/O操作,或执行系统Task、定时任务Task等,接下来做一下源码分析。

     NioEventLoopGroup

    首先看下NioEventLoopGroup的继承关系:

    可以看出最终还是调用了java的线程池创建方式,接下来看一下它的构造方法。

    查看代码
    public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    
     
        public NioEventLoopGroup() {
            this(0);
        }
    
        public NioEventLoopGroup(int nThreads) {
            this(nThreads, (Executor) null);
        }
    
        public NioEventLoopGroup(ThreadFactory threadFactory) {
            this(0, threadFactory, SelectorProvider.provider());
        }
    
      
        public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
            this(nThreads, threadFactory, SelectorProvider.provider());
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor) {
            this(nThreads, executor, SelectorProvider.provider());
        }
    
        public NioEventLoopGroup(
                int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
            this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
        }
    
        public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
            final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        }
    
        public NioEventLoopGroup(
                int nThreads, Executor executor, final SelectorProvider selectorProvider) {
            this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                    RejectedExecutionHandlers.reject());
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory,
                                 final RejectedExecutionHandler rejectedExecutionHandler) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory,
                                 final RejectedExecutionHandler rejectedExecutionHandler,
                                 final EventLoopTaskQueueFactory taskQueueFactory) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                    rejectedExecutionHandler, taskQueueFactory);
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 SelectorProvider selectorProvider,
                                 SelectStrategyFactory selectStrategyFactory,
                                 RejectedExecutionHandler rejectedExecutionHandler,
                                 EventLoopTaskQueueFactory taskQueueFactory,
                                 EventLoopTaskQueueFactory tailTaskQueueFactory) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                    rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
        }
    
    }
    

    可以看到最终是调用了父类MultithreadEventExecutorGroup的构造方法,继续跟踪:

    查看代码
    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
            this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
        }
    
     protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
            this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
        }
    
     
      protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            checkPositive(nThreads, "nThreads");
    
            if (executor == null) {
                //创建线程执行器以及线程工厂
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            //根据线程数构建EventExecutor数组
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    //初始化数组中的线程,由NioEventLoopGroup创建NioEventLoop实例
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    //当初始化失败时,需要优雅关闭,清理资源
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                //当线程未终止时,等待终止
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            //根据线程数创建选择器,选择器主要适用于next()方法
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            //为每个EventLoop线程增加线程终止监听器
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            //创建执行器数组只读副本,便于在迭代查询时使用
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }

    从代码看出,使用EventLoopGroup workerGroup = new NioEventLoopGroup()来创建线程池,如果不指定线程个数,那么默认用0,在默认为0的情况下系统会使用默认的线程个数来创建线程池,如果制定了n>0个线程个数的话,就创建有限个数线程的线程池。那么默认创建的线程个数规则是啥呢?可以详见如下代码:

     static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
            }
        }

    可以看出是在CPU核心数*2与“io.netty.eventLoopThreads”这个配置参数取或并且与1比较大小得到的结果,就说如果“io.netty.eventLoopThreads”这系统参数配置了就用系统参数与1比较的最大值返回,如果没有配置使用cpu核心数*2与1比较的最大值返回。

    关于NioEventLoopGroup我们从代码跟踪中做如下的总结,该类主要是完成三件事:

    1. 创建一定数量的NioEventLoop线程组并初始化。
    2. 创建线程选择器chooser,当获取线程时,通过选择器来获取。
    3. 创建线程工厂并构建线程执行器。

    线程组的生产分两步:第一步,创建一定数量的EventExecutor数组;第二步,通过调用子类的newChild()方法完成这些EventExecutor数组的初始化。为了提高可扩展性,Netty的线程组除了NioEventLoopGroup,还有Netty通过JNI方式提供的一套由epoll模型实现的EpollEventLoop Group线程组,以及其他I/O多路复用模型线程组,因此newChild()方法由具体的线程组子类来实现。

    children[i] = newChild(executor, args);
    查看代码
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            SelectorProvider selectorProvider = (SelectorProvider) args[0];
            SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
            RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
            EventLoopTaskQueueFactory taskQueueFactory = null;
            EventLoopTaskQueueFactory tailTaskQueueFactory = null;
    
            int argsLength = args.length;
            if (argsLength > 3) {
                taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
            }
            if (argsLength > 4) {
                tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
            }
            return new NioEventLoop(this, executor, selectorProvider,
                    selectStrategyFactory.newSelectStrategy(),
                    rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
        }

    在newChild()方法中,NioEventLoop的初始化参数有6个:第1个参数为NioEventLoopGroup线程组本身;第2个参数为线程执行器,用于启动线程,在SingleThreadEventExecutor的doStartThread()方法中被调用;第3个参数为NIO的Selector选择器的提供者;第4个参数主要在NioEventLoop的run()方法中用于控制选择循环;第5个参数为非I/O任务提交被拒绝时的处理Handler;第6个参数为队列工厂,在NioEventLoop中,队列读是单线程操作,而队列写则可能是多线程操作,使用支持多生产者、单消费者的队列比较合适,默认为MpscChunkedArrayQueue队列。

    NioEventLoopGroup通过next()方法获取NioEventLoop线程,最终会调用其父类MultithreadEventExecutorGroup的next()方法,委托父类的选择器EventExecutorChooser。具体使用哪种选择器对象取决于MultithreadEventExecutorGroup的构造方法中使用的策略模式。

    根据线程条数是否为2的幂次来选择策略,若是,则选择器为PowerOfTwoEventExecutorChooser,其选择策略使用与运算计算下一个选择的线程组的下标index;若不是,则选择器为GenericEventExecutorChooser,其选择策略为使用求余的方法计算下一个线程在线程组中的下标index。其中,PowerOfTwoEventExecutorChooser选择器的与运算性能会更好。

    根据线程条数是否为2的幂次来选择策略,若是,则选择器为PowerOfTwoEventExecutorChooser,其选择策略使用与运算计算下一个选择的线程组的下标index,此计算方法在第7章中也有相似的应用;若不是,则选择器为GenericEventExecutorChooser,其选择策略为使用求余的方法计算下一个线程在线程组中的下标index。其中,PowerOfTwoEventExecutorChooser选择器的与运算性能会更好。

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    由于Netty的NioEventLoop线程被包装成了FastThreadLocalThread线程,同时,NioEventLoop线程的状态由它自身管理,因此每个NioEventLoop线程都需要有一个线程执行器,并且在线程执行前需要通过线程工厂io.netty.util.concurrent.DefaultThreadFactory将其包装成FastThreadLocalThread线程。线程执行器ThreadPerTaskExecutor与DefaultThreadFactory的newThread()方法的代码解读如下:

    查看代码
    
    public void execute(Runnable command) {
            //调用线程工厂类的newThread包装线程,并且启动,等待线程调度。
            threadFactory.newThread(command).start();
        }
    
    
    public Thread newThread(Runnable r) {
            //包装FastThreadLocalThread线程,线程的前缀名字为NioEventLoopGroup-
            //服务启动后可以通过Arthas工具查看
            Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
            try {
                if (t.isDaemon() != daemon) {
                    t.setDaemon(daemon);
                }
    
                if (t.getPriority() != priority) {
                    t.setPriority(priority);
                }
            } catch (Exception ignored) {
                // Doesn't matter even if failed to set.
            }
            return t;
        }
    
        //包装为FastThreadLocalThread线程
        protected Thread newThread(Runnable r, String name) {
            return new FastThreadLocalThread(threadGroup, r, name);
        }
    

     

  • 相关阅读:
    web前端设计与开发期末作品 用DIV CSS技术设计的网上书城网页与实现制作(大一Web课程设计)
    Mybatis使用注解实现复杂动态SQL
    PHP如何对二维数组(多维数组)进排序
    基于Qt mqtt库的客户端实现
    千里马常有而伯乐不常有啊
    Anchor-free
    期末前端web大作业——我的家乡陕西介绍网页制作源码HTML+CSS+JavaScript
    FPGA ——Verilog语法示例
    【用unity实现100个游戏之13】复刻类泰瑞利亚生存建造游戏——包括建造系统和库存系统
    SpringMVC框架
  • 原文地址:https://www.cnblogs.com/zengdan-develpoer/p/15918986.html