本系列Netty源码解析文章基于 4.1.56.Final版本
在上篇文章深入讲解Netty那些事儿之从内核角度看IO模型中我们花了大量的篇幅来从内核角度详细讲述了五种IO模型
的演进过程以及ReactorIO线程模型
的底层基石IO多路复用技术在内核中的实现原理。
最后我们引出了netty中使用的主从Reactor IO线程模型。
netty中的reactor
通过上篇文章的介绍,我们已经清楚了在IO调用的过程中内核帮我们搞了哪些事情,那么俗话说的好内核领进门,修行在netty
,netty在用户空间又帮我们搞了哪些事情?
那么从本文开始,笔者将从源码角度来带大家看下上图中的Reactor IO线程模型
在Netty中是如何实现的。
本文作为Reactor在Netty中实现系列文章中的开篇文章,笔者先来为大家介绍Reactor的骨架是如何创建出来的。
在上篇文章中我们提到Netty采用的是主从Reactor多线程
的模型,但是它在实现上又与Doug Lea在Scalable IO in Java论文中提到的经典主从Reactor多线程模型
有所差异。
经典主从Reactor多线程模型
Netty中的Reactor
是以Group
的形式出现的,主从Reactor
在Netty中就是主从Reactor组
,每个Reactor Group
中会有多个Reactor
用来执行具体的IO任务
。当然在netty中Reactor
不只用来执行IO任务
,这个我们后面再说。
Main Reactor Group
中的Reactor
数量取决于服务端要监听的端口个数,通常我们的服务端程序只会监听一个端口,所以Main Reactor Group
只会有一个Main Reactor
线程来处理最重要的事情:绑定端口地址
,接收客户端连接
,为客户端创建对应的SocketChannel
,将客户端SocketChannel分配给一个固定的Sub Reactor
。也就是上篇文章笔者为大家举的例子,饭店最重要的工作就是先把客人迎接进来。“我家大门常打开,开放怀抱等你,拥抱过就有了默契你会爱上这里......”
Sub Reactor Group
里有多个Reactor
线程,Reactor
线程的个数可以通过系统参数-D io.netty.eventLoopThreads
指定。默认的Reactor
的个数为CPU核数 * 2
。Sub Reactor
线程主要用来轮询客户端SocketChannel上的IO就绪事件
,处理IO就绪事件
,执行异步任务
。Sub Reactor Group
做的事情就是上篇饭店例子中服务员的工作,客人进来了要为客人分配座位,端茶送水,做菜上菜。“不管远近都是客人,请不用客气,相约好了在一起,我们欢迎您......”
一个
客户端SocketChannel
只能分配给一个固定的Sub Reactor
。一个Sub Reactor
负责处理多个客户端SocketChannel
,这样可以将服务端承载的全量客户端连接
分摊到多个Sub Reactor
中处理,同时也能保证客户端SocketChannel上的IO处理的线程安全性
。
由于文章篇幅的关系,作为Reactor在netty中实现的第一篇我们主要来介绍主从Reactor Group
的创建流程,骨架脉络先搭好。
下面我们来看一段Netty服务端代码的编写模板,从代码模板的流程中我们来解析下主从Reactor的创建流程以及在这个过程中所涉及到的Netty核心类。
资料直通车:Linux内核源码技术学习路线+视频教程内核源码
- /**
- * Echoes back any received data from a client.
- */
- public final class EchoServer {
- static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
-
- public static void main(String[] args) throws Exception {
- // Configure the server.
- //创建主从Reactor线程组
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- final EchoServerHandler serverHandler = new EchoServerHandler();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)//配置主从Reactor
- .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
- .option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项
- .handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler
- .childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline p = ch.pipeline();
- //p.addLast(new LoggingHandler(LogLevel.INFO));
- p.addLast(serverHandler);
- }
- });
-
- // Start the server. 绑定端口启动服务,开始监听accept事件
- ChannelFuture f = b.bind(PORT).sync();
- // Wait until the server socket is closed.
- f.channel().closeFuture().sync();
- } finally {
- // Shut down all event loops to terminate all threads.
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
- }
-
-
首先我们要创建Netty最核心的部分 -> 创建主从Reactor Group
,在Netty中EventLoopGroup
就是Reactor Group
的实现类。对应的EventLoop
就是Reactor
的实现类。
- //创建主从Reactor线程组
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
创建用于IO处理
的ChannelHandler
,实现相应IO事件
的回调函数,编写对应的IO处理
逻辑。注意这里只是简单示例哈,详细的IO事件处理,笔者会单独开一篇文章专门讲述。
- final EchoServerHandler serverHandler = new EchoServerHandler();
-
- /**
- * Handler implementation for the echo server.
- */
- @Sharable
- public class EchoServerHandler extends ChannelInboundHandlerAdapter {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- ................省略IO处理逻辑................
- ctx.write(msg);
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) {
-
- ctx.flush();
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- // Close the connection when an exception is raised.
- cause.printStackTrace();
- ctx.close();
- }
- }
创建ServerBootstrap
Netty服务端启动类,并在启动类中配置启动Netty服务端所需要的一些必备信息。
在上篇文章介绍
Socket内核结构
小节中我们提到,在编写服务端网络程序时,我们首先要创建一个Socket
用于listen和bind
端口地址,我们把这个叫做监听Socket
,这里对应的就是NioServerSocketChannel.class
。当客户端连接完成三次握手,系统调用accept
函数会基于监听Socket
创建出来一个新的Socket
专门用于与客户端之间的网络通信我们称为客户端连接Socket
,这里对应的就是NioSocketChannel.class
netty有两种
Channel类型
:一种是服务端用于监听绑定端口地址的NioServerSocketChannel
,一种是用于客户端通信的NioSocketChannel
。每种Channel类型实例
都会对应一个PipeLine
用于编排对应channel实例
上的IO事件处理逻辑。PipeLine
中组织的就是ChannelHandler
用于编写特定的IO处理逻辑。
注意
serverBootstrap.handler
设置的是服务端NioServerSocketChannel PipeLine
中的ChannelHandler
。
ServerBootstrap
启动类方法带有child
前缀的均是设置客户端NioSocketChannel
属性的。
ChannelInitializer
是用于当SocketChannel
成功注册到绑定的Reactor
上后,用于初始化该SocketChannel
的Pipeline
。它的initChannel
方法会在注册成功后执行。这里只是捎带提一下,让大家有个初步印象,后面我会专门介绍。
serverBootstrap.childHandler(ChannelHandler childHandler)
用于设置客户端NioSocketChannel
中对应Pipieline
中的ChannelHandler
。我们通常配置的编码解码器就是在这里。
serverBootstrap.option(ChannelOption.SO_BACKLOG, 100)
设置服务端ServerSocketChannel
中的SocketOption
。关于SocketOption
的选项我们后边的文章再聊,本文主要聚焦在Netty Main Reactor Group
的创建及工作流程。
serverBootstrap.handler(....)
设置服务端NioServerSocketChannel
中对应Pipieline
中的ChannelHandler
。
通过serverBootstrap.group(bossGroup, workerGroup)
为Netty服务端配置主从Reactor Group
实例。
通过serverBootstrap.channel(NioServerSocketChannel.class)
配置Netty服务端的ServerSocketChannel
用于绑定端口地址
以及创建客户端SocketChannel
。Netty中的NioServerSocketChannel.class
就是对JDK NIO中ServerSocketChannel
的封装。而用于表示客户端连接
的NioSocketChannel
是对JDK NIO SocketChannel
封装。
ChannelFuture f = serverBootstrap.bind(PORT).sync()
这一步会是下篇文章要重点分析的主题Main Reactor Group
的启动,绑定端口地址,开始监听客户端连接事件(OP_ACCEPT
)。本文我们只关注创建流程。
f.channel().closeFuture().sync()
等待服务端NioServerSocketChannel
关闭。Netty服务端到这里正式启动,并准备好接受客户端连接的准备。
shutdownGracefully
优雅关闭主从Reactor线程组
里的所有Reactor线程
。
在上篇文章中我们介绍了五种IO模型
,Netty中支持BIO
,NIO
,AIO
以及多种操作系统下的IO多路复用技术
实现。
在Netty中切换这几种IO模型
也是非常的方便,下面我们来看下Netty如何对这几种IO模型进行支持的。
首先我们介绍下几个与IO模型
相关的重要接口:
EventLoop
就是Netty中的Reactor
,可以说它就是Netty的引擎,负责Channel上IO就绪事件的监听
,IO就绪事件的处理
,异步任务的执行
驱动着整个Netty的运转。
不同IO模型
下,EventLoop
有着不同的实现,我们只需要切换不同的实现类就可以完成对NettyIO模型
的切换。
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoop | NioEventLoop | AioEventLoop |
在NIO模型
下Netty会自动
根据操作系统以及版本的不同选择对应的IO多路复用技术实现
。比如Linux 2.6版本以上用的是Epoll
,2.6版本以下用的是Poll
,Mac下采用的是Kqueue
。
其中Linux kernel 在5.1版本引入的异步IO库io_uring正在netty中孵化。
Netty中的Reactor
是以Group
的形式出现的,EventLoopGroup
正是Reactor组
的接口定义,负责管理Reactor
,Netty中的Channel
就是通过EventLoopGroup
注册到具体的Reactor
上的。
Netty的IO线程模型是主从Reactor多线程模型
,主从Reactor线程组
在Netty源码中对应的其实就是两个EventLoopGroup
实例。
不同的IO模型
也有对应的实现:
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoopGroup | NioEventLoopGroup | AioEventLoopGroup |
用于Netty服务端使用的ServerSocketChannel
,对应于上篇文章提到的监听Socket
,负责绑定监听端口地址,接收客户端连接并创建用于与客户端通信的SocketChannel
。
不同的IO模型
下的实现:
BIO | NIO | AIO |
---|---|---|
OioServerSocketChannel | NioServerSocketChannel | AioServerSocketChannel |
用于与客户端通信的SocketChannel
,对应于上篇文章提到的客户端连接Socket
,当客户端完成三次握手后,由系统调用accept
函数根据监听Socket
创建。
不同的IO模型
下的实现:
BIO | NIO | AIO |
---|---|---|
OioSocketChannel | NioSocketChannel | AioSocketChannel |
我们看到在不同IO模型
的实现中,Netty这些围绕IO模型
的核心类只是前缀的不同:
BIO对应的前缀为Oio
表示old io
,现在已经废弃不推荐使用。
NIO对应的前缀为Nio
,正是Netty推荐也是我们常用的非阻塞IO模型
。
AIO对应的前缀为Aio
,由于Linux下的异步IO
机制实现的并不成熟,性能提升表现上也不明显,现已被删除。
我们只需要将IO模型
的这些核心接口对应的实现类前缀
改为对应IO模型
的前缀,就可以轻松在Netty中完成对IO模型
的切换。
image.png
Common | Linux | Mac |
---|---|---|
NioEventLoopGroup | EpollEventLoopGroup | KQueueEventLoopGroup |
NioEventLoop | EpollEventLoop | KQueueEventLoop |
NioServerSocketChannel | EpollServerSocketChannel | KQueueServerSocketChannel |
NioSocketChannel | EpollSocketChannel | KQueueSocketChannel |
我们通常在使用NIO模型
的时候会使用Common列
下的这些IO模型
核心类,Common类
也会根据操作系统的不同自动选择JDK
在对应平台下的IO多路复用技术
的实现。
而Netty自身也根据操作系统的不同提供了自己对IO多路复用技术
的实现,比JDK
的实现性能更优。比如:
JDK
的 NIO 默认
实现是水平触发
,Netty 是边缘触发(默认)
和水平触发可切换。。
Netty 实现的垃圾回收更少、性能更好。
我们编写Netty服务端程序的时候也可以根据操作系统的不同,采用Netty自身的实现来进一步优化程序。做法也很简单,直接将上图中红框里的实现类替换成Netty的自身实现类即可完成切换。
经过以上对Netty服务端代码编写模板以及IO模型
相关核心类的简单介绍,我们对Netty的创建流程有了一个简单粗略的总体认识,下面我们来深入剖析下创建流程过程中的每一个步骤以及这个过程中涉及到的核心类实现。
以下源码解析部分我们均采用Common列
下NIO
相关的实现进行解析。
在Netty服务端程序编写模板的开始,我们首先会创建两个Reactor线程组:
netty中的reactor.png
一个是主Reactor线程组bossGroup
用于监听客户端连接,创建客户端连接NioSocketChannel
,并将创建好的客户端连接NioSocketChannel
注册到从Reactor线程组中一个固定的Reactor
上。
一个是从Reactor线程组workerGroup
,workerGroup
中的Reactor
负责监听绑定在其上的客户端连接NioSocketChannel
上的IO就绪事件
,并处理IO就绪事件
,执行异步任务
。
- //创建主从Reactor线程组
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
Netty中Reactor线程组的实现类为NioEventLoopGroup
,在创建bossGroup
和workerGroup
的时候用到了NioEventLoopGroup
的两个构造函数:
带nThreads
参数的构造函数public NioEventLoopGroup(int nThreads)
。
不带nThreads
参数的默认
构造函数public NioEventLoopGroup()
- public class NioEventLoopGroup extends MultithreadEventLoopGroup {
-
- /**
- * Create a new instance using the default number of threads, the default {@link ThreadFactory} and
- * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
- */
- public NioEventLoopGroup() {
- this(0);
- }
-
- /**
- * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
- * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
- */
- public NioEventLoopGroup(int nThreads) {
- this(nThreads, (Executor) null);
- }
-
- ......................省略...........................
- }
nThreads
参数表示当前要创建的Reactor线程组
内包含多少个Reactor线程
。不指定nThreads
参数的话采用默认的Reactor线程
个数,用0
表示。
最终会调用到构造函数
- public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
- final SelectStrategyFactory selectStrategyFactory) {
- super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
- }
下面简单介绍下构造函数中这几个参数的作用,后面我们在讲解本文主线的过程中还会提及这几个参数,到时在详细介绍,这里只是让大家有个初步印象,不必做过多的纠缠。
Executor executor:
负责启动Reactor线程
进而Reactor才可以开始工作。
Reactor线程组
NioEventLoopGroup
负责创建Reactor线程
,在创建的时候会将executor
传入。
RejectedExecutionHandler:
当向Reactor
添加异步任务添加失败时,采用的拒绝策略。Reactor的任务不只是监听IO活跃事件和IO任务的处理,还包括对异步任务的处理。这里大家只需有个这样的概念,后面笔者会专门详细介绍。
SelectorProvider selectorProvider:
Reactor中的IO模型为IO多路复用模型
,对应于JDK NIO中的实现为java.nio.channels.Selector
(就是我们上篇文章中提到的select,poll,epoll
),每个Reator中都包含一个Selector
,用于轮询
注册在该Reactor上的所有Channel
上的IO事件
。SelectorProvider
就是用来创建Selector
的。
SelectStrategyFactory selectStrategyFactory:
Reactor最重要的事情就是轮询
注册其上的Channel
上的IO就绪事件
,这里的SelectStrategyFactory
用于指定轮询策略
,默认为DefaultSelectStrategyFactory.INSTANCE
。
最终会将这些参数交给NioEventLoopGroup
的父类构造器,下面我们来看下NioEventLoopGroup类
的继承结构:
image.png
NioEventLoopGroup类
的继承结构乍一看比较复杂,大家不要慌,笔者会随着主线的深入慢慢地介绍这些父类接口,我们现在重点关注Mutithread
前缀的类。
我们知道NioEventLoopGroup
是Netty中的Reactor线程组
的实现,既然是线程组那么肯定是负责管理和创建多个Reactor线程的
,所以Mutithread
前缀的类定义的行为自然是对Reactor线程组
内多个Reactor线程
的创建和管理工作。
- public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
-
- private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
- //默认Reactor个数
- private static final int DEFAULT_EVENT_LOOP_THREADS;
-
- static {
- DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
- "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
-
- if (logger.isDebugEnabled()) {
- logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
- }
- }
-
- /**
- * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
- */
- protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
- super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
- }
-
- ...................省略.....................
- }
MultithreadEventLoopGroup类
主要的功能就是用来确定Reactor线程组
内Reactor
的个数。
默认的Reactor
的个数存放于字段DEFAULT_EVENT_LOOP_THREADS
中。
从static {}
静态代码块中我们可以看出默认Reactor
的个数的获取逻辑:
可以通过系统变量 -D io.netty.eventLoopThreads"
指定。
如果不指定,那么默认的就是NettyRuntime.availableProcessors() * 2
当nThread
参数设置为0
采用默认设置时,Reactor线程组
内的Reactor
个数则设置为DEFAULT_EVENT_LOOP_THREADS
。
MultithreadEventExecutorGroup
这里就是本小节的核心,主要用来定义创建和管理Reactor
的行为。
- public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
-
- //Reactor线程组中的Reactor集合
- private final EventExecutor[] children;
- private final Set<EventExecutor> readonlyChildren;
- //从Reactor group中选择一个特定的Reactor的选择策略 用于channel注册绑定到一个固定的Reactor上
- private final EventExecutorChooserFactory.EventExecutorChooser chooser;
-
- /**
- * Create a new instance.
- *
- * @param nThreads the number of threads that will be used by this instance.
- * @param executor the Executor to use, or {@code null} if the default should be used.
- * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
- */
- protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
- this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
- }
-
- ............................省略................................
- }
首先介绍一个新的构造器参数EventExecutorChooserFactory chooserFactory
。当客户端连接完成三次握手后,Main Reactor
会创建客户端连接NioSocketChannel
,并将其绑定到Sub Reactor Group
中的一个固定Reactor
,那么具体要绑定到哪个具体的Sub Reactor
上呢?这个绑定策略就是由chooserFactory
来创建的。默认为DefaultEventExecutorChooserFactory
。
下面就是本小节的主题Reactor线程组
的创建过程:
- protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
- EventExecutorChooserFactory chooserFactory, Object... args) {
- if (nThreads <= 0) {
- throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
- }
-
- if (executor == null) {
- //用于创建Reactor线程
- executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
- }
-
- children = new EventExecutor[nThreads];
- //循环创建reaactor group中的Reactor
- for (int i = 0; i < nThreads; i ++) {
- boolean success = false;
- try {
- //创建reactor
- children[i] = newChild(executor, args);
- success = true;
- } catch (Exception e) {
- throw new IllegalStateException("failed to create a child event loop", e);
- } finally {
- ................省略................
- }
- }
- }
- //创建channel到Reactor的绑定策略
- chooser = chooserFactory.newChooser(children);
-
- ................省略................
-
- Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
- Collections.addAll(childrenSet, children);
- readonlyChildren = Collections.unmodifiableSet(childrenSet);
- }
在Netty Reactor Group中的单个Reactor
的IO线程模型
为上篇文章提到的单Reactor单线程模型
,一个Reactor线程
负责轮询
注册其上的所有Channel
中的IO就绪事件
,处理IO事件,执行Netty中的异步任务等工作。正是这个Reactor线程
驱动着整个Netty的运转,可谓是Netty的核心引擎。
单Reactor单线程模型.png
而这里的executor
就是负责启动Reactor线程
的,从创建源码中我们可以看到executor
的类型为ThreadPerTaskExecutor
。
- public final class ThreadPerTaskExecutor implements Executor {
- private final ThreadFactory threadFactory;
-
- public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
- this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
- }
-
- @Override
- public void execute(Runnable command) {
- threadFactory.newThread(command).start();
- }
- }
我们看到ThreadPerTaskExecutor
做的事情很简单,从它的命名前缀ThreadPerTask
我们就可以猜出它的工作方式,就是来一个任务就创建一个线程执行。而创建的这个线程正是netty的核心引擎Reactor线程。
在Reactor线程
启动的时候,Netty会将Reactor线程
要做的事情封装成Runnable
,丢给exexutor
启动。
而Reactor线程
的核心就是一个死循环
不停的轮询
IO就绪事件,处理IO事件,执行异步任务。一刻也不停歇,堪称996典范
。
这里向大家先卖个关子,"Reactor线程是何时启动的呢??"
Reactor线程组NioEventLoopGroup
包含多个Reactor
,存放于private final EventExecutor[] children
数组中。
所以下面的事情就是创建nThread
个Reactor
,并存放于EventExecutor[] children
字段中,
我们来看下用于创建Reactor
的newChild(executor, args)
方法:
newChild
方法是MultithreadEventExecutorGroup
中的一个抽象方法,提供给具体子类实现。
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
这里我们解析的是NioEventLoopGroup
,我们来看下newChild
在该类中的实现:
- public class NioEventLoopGroup extends MultithreadEventLoopGroup {
- @Override
- protected EventLoop newChild(Executor executor, Object... args) throws Exception {
- EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
- return new NioEventLoop(this, executor, (SelectorProvider) args[0],
- ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
- }
- }
前边提到的众多构造器参数,这里会通过可变参数Object... args
传入到Reactor类NioEventLoop
的构造器中。
这里介绍下新的参数EventLoopTaskQueueFactory queueFactory
,前边提到Netty中的Reactor
主要工作是轮询
注册其上的所有Channel
上的IO就绪事件
,处理IO就绪事件
。除了这些主要的工作外,Netty为了极致的压榨Reactor
的性能,还会让它做一些异步任务的执行工作。既然要执行异步任务,那么Reactor
中就需要一个队列
来保存任务。
这里的EventLoopTaskQueueFactory
就是用来创建这样的一个队列来保存Reactor
中待执行的异步任务。
可以把Reactor
理解成为一个单线程的线程池
,类似
于JDK
中的SingleThreadExecutor
,仅用一个线程来执行轮询IO就绪事件
,处理IO就绪事件
,执行异步任务
。同时待执行的异步任务保存在Reactor
里的taskQueue
中。
- public final class NioEventLoop extends SingleThreadEventLoop {
- //用于创建JDK NIO Selector,ServerSocketChannel
- private final SelectorProvider provider;
- //Selector轮询策略 决定什么时候轮询,什么时候处理IO事件,什么时候执行异步任务
- private final SelectStrategy selectStrategy;
- /**
- * The NIO {@link Selector}.
- */
- private Selector selector;
- private Selector unwrappedSelector;
-
- NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
- SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
- EventLoopTaskQueueFactory queueFactory) {
- super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
- rejectedExecutionHandler);
- this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
- this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
- final SelectorTuple selectorTuple = openSelector();
- this.selector = selectorTuple.selector;
- this.unwrappedSelector = selectorTuple.unwrappedSelector;
- }
- }
这里就正式开始了Reactor
的创建过程,我们知道Reactor
的核心是采用的IO多路复用模型
来对客户端连接上的IO事件
进行监听
,所以最重要的事情是创建Selector
(JDK NIO 中IO多路复用技术的实现
)。
可以把
Selector
理解为我们上篇文章介绍的Select,poll,epoll
,它是JDK NIO
对操作系统内核提供的这些IO多路复用技术
的封装。
openSelector
是NioEventLoop类
中用于创建IO多路复用
的Selector
,并对创建出来的JDK NIO
原生的Selector
进行性能优化。
首先会通过SelectorProvider#openSelector
创建JDK NIO原生的Selector
。
- private SelectorTuple openSelector() {
- final Selector unwrappedSelector;
- try {
- //通过JDK NIO SelectorProvider创建Selector
- unwrappedSelector = provider.openSelector();
- } catch (IOException e) {
- throw new ChannelException("failed to open a new selector", e);
- }
-
- ..................省略.............
- }
SelectorProvider
会根据操作系统的不同选择JDK在不同操作系统版本下的对应Selector
的实现。Linux下会选择Epoll
,Mac下会选择Kqueue
。
下面我们就来看下SelectorProvider
是如何做到自动适配不同操作系统下IO多路复用
实现的
- public NioEventLoopGroup(ThreadFactory threadFactory) {
- this(0, threadFactory, SelectorProvider.provider());
- }
SelectorProvider
是在前面介绍的NioEventLoopGroup类
构造函数中通过调用SelectorProvider.provider()
被加载,并通过NioEventLoopGroup#newChild
方法中的可变长参数Object... args
传递到NioEventLoop
中的private final SelectorProvider provider
字段中。
SelectorProvider的加载过程:
- public abstract class SelectorProvider {
-
- public static SelectorProvider provider() {
- synchronized (lock) {
- if (provider != null)
- return provider;
- return AccessController.doPrivileged(
- new PrivilegedAction
() { - public SelectorProvider run() {
- if (loadProviderFromProperty())
- return provider;
- if (loadProviderAsService())
- return provider;
- provider = sun.nio.ch.DefaultSelectorProvider.create();
- return provider;
- }
- });
- }
- }
- }
从SelectorProvider
加载源码中我们可以看出,SelectorProvider
的加载方式有三种,优先级如下:
通过系统变量-D java.nio.channels.spi.SelectorProvider
指定SelectorProvider
的自定义实现类全限定名
。通过应用程序类加载器(Application Classloader)
加载。
- private static boolean loadProviderFromProperty() {
- String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
- if (cn == null)
- return false;
- try {
- Class<?> c = Class.forName(cn, true,
- ClassLoader.getSystemClassLoader());
- provider = (SelectorProvider)c.newInstance();
- return true;
- }
- .................省略.............
- }
通过SPI
方式加载。在工程目录META-INF/services
下定义名为java.nio.channels.spi.SelectorProvider
的SPI文件
,文件中第一个定义的SelectorProvider
实现类全限定名就会被加载。
- private static boolean loadProviderAsService() {
-
- ServiceLoader<SelectorProvider> sl =
- ServiceLoader.load(SelectorProvider.class,
- ClassLoader.getSystemClassLoader());
- Iterator<SelectorProvider> i = sl.iterator();
- for (;;) {
- try {
- if (!i.hasNext())
- return false;
- provider = i.next();
- return true;
- } catch (ServiceConfigurationError sce) {
- if (sce.getCause() instanceof SecurityException) {
- // Ignore the security exception, try the next provider
- continue;
- }
- throw sce;
- }
- }
- }
如果以上两种方式均未被定义,那么就采用SelectorProvider
系统默认实现sun.nio.ch.DefaultSelectorProvider
。笔者当前使用的操作系统是MacOS
,从源码中我们可以看到自动适配了KQueue
实现。
- public class DefaultSelectorProvider {
- private DefaultSelectorProvider() {
- }
-
- public static SelectorProvider create() {
- return new KQueueSelectorProvider();
- }
- }
不同操作系统中JDK对于
DefaultSelectorProvider
会有所不同,Linux内核版本2.6以上对应的Epoll
,Linux内核版本2.6以下对应的Poll
,MacOS对应的是KQueue
。
下面我们接着回到io.netty.channel.nio.NioEventLoop#openSelector
的主线上来。
文章篇幅有限 下文讲解