大家第一眼看到这幅流程图,是不是脑瓜子嗡嗡的呢?
大家先不要惊慌,问题不大,本文笔者的目的就是要让大家清晰的理解这幅流程图,从而深刻的理解Netty Reactor的启动全流程,包括其中涉及到的各种代码设计实现细节。
在上篇文章 《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》 中我们详细介绍了Netty服务端核心引擎组件 主从Reactor组模型 NioEventLoopGroup
以及 Reactor模型 NioEventLoop
的创建过程。最终我们得到了netty Reactor模型的运行骨架如下:
现在Netty服务端程序的骨架是搭建好了,本文我们就基于这个骨架来深入剖析下Netty服务端的启动过程。
我们继续回到上篇文章提到的Netty服务端代码模板中,在创建完主从Reactor线程组: bossGroup
, workerGroup
后,接下来就开始配置Netty服务端的启动辅助类 ServerBootstrap
了。
public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure the server. //创建主从Reactor线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup)//配置主从Reactor .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型 .option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项 .handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler .childHandler(new ChannelInitializer() {//设置从Reactor中注册channel的pipeline @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // Start the server. 绑定端口启动服务,开始监听accept事件 ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
在上篇文章中我们对代码模板中涉及到 ServerBootstrap
的一些配置方法做了简单的介绍,大家如果忘记的话,可以在返回去回顾一下。
ServerBootstrap类
其实没有什么特别的逻辑,主要是对Netty启动过程中需要用到的一些核心信息进行配置管理,比如:
Netty的核心引擎组件 主从Reactor线程组: bossGroup,workerGroup
。通过 ServerBootstrap#group方法
配置。
Netty服务端使用到的Channel类型: NioServerSocketChannel
,通过 ServerBootstrap#channel方法
配置。
以及配置 NioServerSocketChannel
时用到的 SocketOption
。 SocketOption
用于设置底层JDK NIO Socket的一些选项。通过 ServerBootstrap#option方法
进行配置。
主ReactorGroup中的MainReactor管理的Channel类型为 NioServerSocketChannel
,如图所示主要用来监听端口,接收客户端连接,为客户端创建初始化 NioSocketChannel
,然后采用 round-robin
轮询的方式从图中从ReactorGroup中选择一个SubReactor与该客户端 NioSocketChannel
进行绑定。
从ReactorGroup中的SubReactor管理的Channel类型为 NioSocketChannel
,它是netty中定义客户端连接的一个模型,每个连接对应一个。如图所示SubReactor负责监听处理绑定在其上的所有 NioSocketChannel
上的IO事件。
NioServerSocketChannel
和客户端 NioSocketChannel
对应 pipeline
中指定的 ChannelHandler
。用于后续Channel向Reactor注册成功之后,初始化Channel里的pipeline。不管是服务端用到的 NioServerSocketChannel
还是客户端用到的 NioSocketChannel
,每个 Channel实例
都会有一个 Pipeline
, Pipeline
中有多个 ChannelHandler
用于编排处理对应 Channel
上感兴趣的 IO事件
。
ServerBootstrap
结构中包含了netty服务端程序启动的所有配置信息,在我们介绍启动流程之前,先来看下 ServerBootstrap
的源码结构:
ServerBootstrap
的继承结构比较简单,继承层次的职责分工也比较明确。
ServerBootstrap
主要负责对 主从Reactor线程组
相关的配置进行管理,其中带 child前缀的配置方法
是对 从Reactor线程组
的相关配置管理。 从Reactor线程组
中的 Sub Reactor
负责管理的客户端 NioSocketChannel
相关配置存储在 ServerBootstrap
结构中。
父类 AbstractBootstrap
则是主要负责对 主Reactor线程组
相关的配置进行管理,以及 主Reactor线程组
中的 Main Reactor
负责处理的服务端 ServerSocketChannel
相关的配置管理。
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup)//配置主从Reactor
public class ServerBootstrap extends AbstractBootstrap{ //Main Reactor线程组 volatile EventLoopGroup group; //Sub Reactor线程组 private volatile EventLoopGroup childGroup; public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { //父类管理主Reactor线程组 super.group(parentGroup); if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup"); return this; } }
ServerBootstrap b = new ServerBootstrap(); b.channel(NioServerSocketChannel.class);
public class ServerBootstrap extends AbstractBootstrap{ //用于创建ServerSocketChannel ReflectiveChannelFactory private volatile ChannelFactory extends C> channelFactory; public B channel(Class extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory ( ObjectUtil.checkNotNull(channelClass, "channelClass") )); } @Deprecated public B channelFactory(ChannelFactory extends C> channelFactory) { ObjectUtil.checkNotNull(channelFactory, "channelFactory"); if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); } }
在向 ServerBootstrap
配置服务端 ServerSocketChannel
的 channel
方法中,其实是创建了一个 ChannelFactory
工厂实例 ReflectiveChannelFactory
,在Netty服务端启动的过程中,会通过这个 ChannelFactory
去创建相应的 Channel
实例。
我们可以通过这个方法来配置netty的IO模型,下面为 ServerSocketChannel
在不同IO模型下的实现:
BIO | NIO | AIO |
---|---|---|
OioServerSocketChannel | NioServerSocketChannel | AioServerSocketChannel |
EventLoopGroup
Reactor线程组在不同IO模型下的实现:
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoopGroup | NioEventLoopGroup | AioEventLoopGroup |
我们只需要将 IO模型
的这些核心接口对应的实现类 前缀
改为对应 IO模型
的前缀,就可以轻松在Netty中完成对 IO模型
的切换。
public class ReflectiveChannelFactoryimplements ChannelFactory { //NioServerSocketChannelde 构造器 private final Constructor extends T> constructor; public ReflectiveChannelFactory(Class extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz"); try { //反射获取NioServerSocketChannel的构造器 this.constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", e); } } @Override public T newChannel() { try { //创建NioServerSocketChannel实例 return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } } }
从类的签名我们可以看出,这个工厂类是通过 泛型
加 反射
的方式来创建对应的 Channel
实例。
T extends Channel
表示的是要通过工厂类创建的 Channel类型
,这里我们初始化的是 NioServerSocketChannel
。ReflectiveChannelFactory
的构造器中通过 反射
的方式获取 NioServerSocketChannel
的构造器。newChannel
方法中通过构造器反射创建 NioServerSocketChannel
实例。注意这时只是配置阶段, NioServerSocketChannel
此时并未被创建。它是在启动的时候才会被创建出来。
ServerBootstrap b = new ServerBootstrap(); //设置被MainReactor管理的NioServerSocketChannel的Socket选项 b.option(ChannelOption.SO_BACKLOG, 100)
public abstract class AbstractBootstrap, C extends Channel> implements Cloneable { //serverSocketChannel中的ChannelOption配置 private final Map, Object> options = new LinkedHashMap , Object>(); public B option(ChannelOption option, T value) { ObjectUtil.checkNotNull(option, "option"); synchronized (options) { if (value == null) { options.remove(option); } else { options.put(option, value); } } return self(); } }
无论是服务端的 NioServerSocketChannel
还是客户端的 NioSocketChannel
它们的相关底层Socket选项 ChannelOption
配置全部存放于一个 Map
类型的数据结构中。
由于客户端 NioSocketChannel
是由 从Reactor线程组
中的 Sub Reactor
来负责处理,所以涉及到客户端 NioSocketChannel
所有的方法和配置全部是以 child
前缀开头。
ServerBootstrap b = new ServerBootstrap(); .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
public class ServerBootstrap extends AbstractBootstrap{ //客户端SocketChannel对应的ChannelOption配置 private final Map , Object> childOptions = new LinkedHashMap , Object>(); public ServerBootstrap childOption(ChannelOption childOption, T value) { ObjectUtil.checkNotNull(childOption, "childOption"); synchronized (childOptions) { if (value == null) { childOptions.remove(childOption); } else { childOptions.put(childOption, value); } } return this; } }
相关的底层Socket选项,netty全部枚举在ChannelOption类中,笔者这里就不一一列举了,在本系列后续相关的文章中,笔者还会为大家详细的介绍这些参数的作用。
public class ChannelOptionextends AbstractConstant > { ..................省略.............. public static final ChannelOption SO_BROADCAST = valueOf("SO_BROADCAST"); public static final ChannelOption SO_KEEPALIVE = valueOf("SO_KEEPALIVE"); public static final ChannelOption SO_SNDBUF = valueOf("SO_SNDBUF"); public static final ChannelOption SO_RCVBUF = valueOf("SO_RCVBUF"); public static final ChannelOption SO_REUSEADDR = valueOf("SO_REUSEADDR"); public static final ChannelOption SO_LINGER = valueOf("SO_LINGER"); public static final ChannelOption SO_BACKLOG = valueOf("SO_BACKLOG"); public static final ChannelOption SO_TIMEOUT = valueOf("SO_TIMEOUT"); ..................省略.............. }
//serverSocketChannel中pipeline里的handler(主要是acceptor) private volatile ChannelHandler handler; public B handler(ChannelHandler handler) { this.handler = ObjectUtil.checkNotNull(handler, "handler"); return self(); }
向 NioServerSocketChannel
中的 Pipeline
添加 ChannelHandler
分为两种方式:
显式添加:
显式添加的方式是由用户在main线程中通过 ServerBootstrap#handler
的方式添加。如果需要添加多个 ChannelHandler
,则可以通过 ChannelInitializer
向 pipeline
中进行添加。关于 ChannelInitializer
后面笔者会有详细介绍,这里大家只需要知道 ChannelInitializer
是一种特殊的 ChannelHandler
,用于初始化 pipeline
。适用于向pipeline中添加多个ChannelHandler的场景。
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup)//配置主从Reactor .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型 .handler(new ChannelInitializer() { @Override protected void initChannel(NioServerSocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(channelhandler1) .addLast(channelHandler2) ...... .addLast(channelHandler3); } })
隐式添加:
隐式添加主要添加的就是 主ReactorGroup
的核心组件也就是下图中的 acceptor
,Netty中的实现为 ServerBootstrapAcceptor
,本质上也是一种 ChannelHandler
,主要负责在客户端连接建立好后,初始化客户端 NioSocketChannel
,在 从Reactor线程组中
选取一个 Sub Reactor
,将客户端 NioSocketChannel
注册到 Sub Reactor
中的 selector
上。隐式添加 ServerBootstrapAcceptor
是由Netty框架在启动的时候负责添加,用户无需关心。
在本例中, NioServerSocketChannel
的 PipeLine
中只有两个 ChannelHandler
,一个由用户在外部显式添加的 LoggingHandler
,另一个是由Netty框架隐式添加的 ServerBootstrapAcceptor
。
其实我们在实际项目使用的过程中,不会向netty服务端 NioServerSocketChannel
添加额外的ChannelHandler, NioServerSocketChannel
只需要专心做好自己最重要的本职工作接收客户端连接就好了。这里额外添加一个 LoggingHandler
只是为了向大家展示 ServerBootstrap
的配置方法。
final EchoServerHandler serverHandler = new EchoServerHandler(); serverBootstrap.childHandler(new ChannelInitializer() {//设置从Reactor中注册channel的pipeline