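Netty boilerplate code:
Create two event loop groups (thread pools), one for ServerSocket events and one for Socket events, and specify that the methods of the custom class ServerHandler run when events occur on the ServerSocket and the Socket: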

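Netty business code:
ServerHandler defines the callback methods. When the server receives data sent by a client, channelRead is called to process it; when a socket/serverSocket is registered with a selector, channelRegistered is called: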

The Netty architecture behind the code above is shown in the following diagram:

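The architecture diagram shows that NioEventLoopGroup and the pipeline are the two most important concepts. The rest of this article follows Netty's workflow to analyze how each of them is implemented.
As shown below, bossGroup is a NioEventLoopGroup that creates 1 NioEventLoop, and workerGroup creates 10 NioEventLoops. Each NioEventLoop contains its own Selector (multiplexer) and its own thread. The bossGroup selector registers the serverSocketChannel, and the workerGroup selectors register socketChannels. Each thread handles the events that occur on the sockets registered with its selector.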
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);
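The constructors run from subclass to superclass in this order: NioEventLoopGroup -> MultithreadEventLoopGroup -> MultithreadEventExecutorGroup.
Note: bossGroup only needs 1 NioEventLoop. The server has a single ServerSocketChannel object, and a channel is registered with exactly one selector, so it cannot be spread across several. Extra NioEventLoops in bossGroup stay idle unless the same bootstrap binds additional ports, because each bind creates its own ServerSocketChannel.
SelectorProvider is the factory class for Selector multiplexers and creates the concrete Selector implementation. When NioEventLoopGroup is initialized, it obtains the SelectorProvider object: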
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}
Unless a provider is configured through a system property or a service file, SelectorProvider.provider() calls the create method of sun.nio.ch.DefaultSelectorProvider (found in rt.jar on JDK 8 and earlier) to create the SelectorProvider implementation:
public abstract class SelectorProvider {
    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
        }
    }
}
The DefaultSelectorProvider in rt.jar is implemented differently in the JDK for each operating system. On macOS, for example, the create method returns a KQueueSelectorProvider object:
public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }
    public static SelectorProvider create() {
        return new KQueueSelectorProvider();
    }
}
On Linux, the create method in rt.jar returns an EPollSelectorProvider object:
public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }
    public static SelectorProvider create() {
        String var0 = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
        if (var0.equals("SunOS")) {
            return createProvider("sun.nio.ch.DevPollSelectorProvider");
        } else {
            return (SelectorProvider)(var0.equals("Linux") ? createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());
        }
    }
}
EPollSelectorProvider creates an EPollSelectorImpl object through its openSelector method:
public class EPollSelectorProvider extends SelectorProviderImpl {
    public EPollSelectorProvider() {
    }
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }
}
At the lowest level, EPollSelectorImpl wraps the Linux system calls epoll_create, epoll_ctl, and epoll_wait to implement I/O multiplexing.
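To check which provider and selector a particular JDK actually picks, the lookup described above can be reproduced with the public NIO API alone. This is a minimal sketch, not Netty code: the class name SelectorProviderDemo and its helper methods are made up for illustration, and the printed class names depend on the operating system and JDK version.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical demo class; only SelectorProvider and Selector are real JDK APIs.
final class SelectorProviderDemo {

    /** Returns the class name of the system-wide default SelectorProvider. */
    static String providerClassName() {
        return SelectorProvider.provider().getClass().getName();
    }

    /** provider() caches its result, so repeated calls return the same object. */
    static boolean providerIsCached() {
        return SelectorProvider.provider() == SelectorProvider.provider();
    }

    /** Opens a Selector through the provider, reports its class name, then closes it. */
    static String selectorClassName() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector()) {
            return selector.getClass().getName();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Output is platform dependent (an epoll-based pair on Linux, kqueue-based on macOS).
        System.out.println("provider: " + providerClassName());
        System.out.println("selector: " + selectorClassName());
    }
}
```

Running it on different machines is a quick way to confirm the per-platform behavior without reading the JDK sources.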
With the SelectorProvider in place, the Executor can be created. DefaultThreadFactory defines how each thread is created, and the Executor asks that factory for a new thread every time it is given a task:
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
        // Create the executor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        // omitted
    }
    // Create the thread factory
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass());
    }
}
DefaultThreadFactory is initialized as follows:
public class DefaultThreadFactory implements ThreadFactory {
    public DefaultThreadFactory(Class<?> poolType) {
        this(poolType, false, Thread.NORM_PRIORITY);
    }
    public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
        this(toPoolName(poolType), daemon, priority);
    }
    public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
        ObjectUtil.checkNotNull(poolName, "poolName");
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(
                    "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
        }
        // Every thread from this factory shares the same name prefix
        prefix = poolName + '-' + poolId.incrementAndGet() + '-';
        this.daemon = daemon;
        this.priority = priority;
        this.threadGroup = threadGroup;
    }
    // newThread creates a thread directly
    public Thread newThread(Runnable r) {
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {
                t.setDaemon(daemon);
            }
            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }
}
The constructor defines the thread name prefix:

Threads created later use that prefix in their names:

When ThreadPerTaskExecutor's execute method is called, it creates a new thread through the thread factory and starts it immediately:
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
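The thread-per-task idea and the shared name prefix can be tried out without Netty. The sketch below uses only java.util.concurrent; NamingThreadFactory and SimpleThreadPerTaskExecutor are simplified stand-ins invented for this example (no daemon flag, priority, thread group, or pool id), not the Netty classes themselves.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class ThreadPerTaskDemo {

    /** Hypothetical stand-in for DefaultThreadFactory: names threads "<poolName>-<n>". */
    static final class NamingThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger nextId = new AtomicInteger();

        NamingThreadFactory(String poolName) {
            this.prefix = poolName + '-';
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + nextId.incrementAndGet());
        }
    }

    /** Same idea as ThreadPerTaskExecutor: each execute() call starts a brand-new thread. */
    static final class SimpleThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;

        SimpleThreadPerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
        }

        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }

    /** Runs one task on the executor and returns the name of the thread that ran it. */
    static String runAndGetThreadName(Executor executor) {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        Executor executor = new SimpleThreadPerTaskExecutor(new NamingThreadFactory("demoLoop"));
        System.out.println(runAndGetThreadName(executor)); // demoLoop-1
        System.out.println(runAndGetThreadName(executor)); // demoLoop-2
    }
}
```

No thread is ever reused here, which is why the executor by itself is not a pool: the pooling behavior comes from each event loop calling execute only once for its own long-running thread.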
With the two key dependencies, SelectorProvider and Executor, created, the NioEventLoops can be constructed:
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
        // Create the executor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        children = new EventExecutor[nThreads];
        // Create the NioEventLoops: 1 for bossGroup, 10 for workerGroup
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // Create one NioEventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // omitted
            }
        }
        chooser = chooserFactory.newChooser(children);
        // omitted
    }
    // The method that creates a NioEventLoop is implemented by NioEventLoopGroup
    protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
}
NioEventLoopGroup implements the newChild method and creates the NioEventLoop object:
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }
}
NioEventLoop creates its selector through the openSelector() method. On Linux this is an EPollSelectorImpl object.
public final class NioEventLoop extends SingleThreadEventLoop {
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            // Create the Selector (an EPollSelectorImpl on Linux)
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        // omitted
        return new SelectorTuple(unwrappedSelector);
    }
}
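To summarize, a NioEventLoopGroup contains multiple NioEventLoops, and each NioEventLoop holds its own Selector and its own thread. The bossGroup selector registers the serverSocketChannel, and the workerGroup selectors register socketChannels. At this point the thread is only represented by the ThreadPerTaskExecutor: calling ThreadPerTaskExecutor#execute creates the thread dedicated to that NioEventLoop.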
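The group structure can be modeled in a few lines of plain Java. The sketch below is only an illustration under simplifying assumptions: MiniLoop and MiniLoopGroup are invented names, the per-loop thread is left out, and the round-robin next() method stands in for the chooser that the constructor creates from the children array.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

final class MiniLoopGroupDemo {

    /** Hypothetical stand-in for NioEventLoop: each loop opens its own Selector. */
    static final class MiniLoop {
        final int index;
        final Selector selector;

        MiniLoop(int index, SelectorProvider provider) {
            this.index = index;
            try {
                this.selector = provider.openSelector();
            } catch (IOException e) {
                throw new UncheckedIOException("failed to open a new selector", e);
            }
        }
    }

    /** Hypothetical stand-in for NioEventLoopGroup: creates all children up front. */
    static final class MiniLoopGroup implements AutoCloseable {
        private final MiniLoop[] children;
        private final AtomicInteger idx = new AtomicInteger();

        MiniLoopGroup(int nThreads) {
            SelectorProvider provider = SelectorProvider.provider();
            children = new MiniLoop[nThreads];
            for (int i = 0; i < nThreads; i++) {
                children[i] = new MiniLoop(i, provider);
            }
        }

        /** Round-robin selection of the next loop. */
        MiniLoop next() {
            return children[Math.floorMod(idx.getAndIncrement(), children.length)];
        }

        /** True when no two loops share a Selector instance. */
        boolean allSelectorsDistinct() {
            for (int i = 0; i < children.length; i++) {
                for (int j = i + 1; j < children.length; j++) {
                    if (children[i].selector == children[j].selector) {
                        return false;
                    }
                }
            }
            return true;
        }

        @Override
        public void close() {
            for (MiniLoop child : children) {
                try {
                    child.selector.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }
    }

    /** Returns the indexes of the loops chosen by consecutive next() calls. */
    static int[] pickSequence(int nThreads, int picks) {
        try (MiniLoopGroup group = new MiniLoopGroup(nThreads)) {
            int[] out = new int[picks];
            for (int i = 0; i < picks; i++) {
                out[i] = group.next().index;
            }
            return out;
        }
    }

    static boolean selectorsAreDistinct(int nThreads) {
        try (MiniLoopGroup group = new MiniLoopGroup(nThreads)) {
            return group.allSelectorsDistinct();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(pickSequence(3, 5))); // [0, 1, 2, 0, 1]
        System.out.println(selectorsAreDistinct(10));            // true
    }
}
```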
ServerBootstrap is the bootstrap class: the NioEventLoopGroups and other parameters are passed to it, and it is responsible for starting the Netty server.
Specify NioServerSocketChannel as Netty's ServerSocketChannel implementation:
serverBootstrap.channel(NioServerSocketChannel.class);
The NioServerSocketChannel constructor creates the ServerSocketChannel object through the SelectorProvider (EPollSelectorProvider on Linux):
public class NioServerSocketChannel extends AbstractNioMessageChannel
        implements io.netty.channel.socket.ServerSocketChannel {
    // On Linux, DEFAULT_SELECTOR_PROVIDER is the EPollSelectorProvider object
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            // openServerSocketChannel() is inherited from SelectorProviderImpl, the parent class of
            // EPollSelectorProvider, and creates the ServerSocketChannel object.
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}
Through the constructor of its ancestor AbstractNioChannel, NioServerSocketChannel puts the ServerSocketChannel into non-blocking mode:
public abstract class AbstractNioChannel extends AbstractChannel {
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
}
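Why the channel has to be non-blocking becomes clear when it is registered with a Selector: the JDK rejects registration of a channel that is still in blocking mode. The following sketch shows both cases with the plain NIO API. It is an illustration rather than Netty's code path (Netty stores readInterestOp in the constructor and registers the channel later); the class and method names are invented for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class NonBlockingRegisterDemo {

    /** Switches the channel to non-blocking mode, then registers it for OP_ACCEPT. */
    static boolean registerNonBlocking() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             ServerSocketChannel ch = provider.openServerSocketChannel()) {
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, SelectionKey.OP_ACCEPT);
            return !ch.isBlocking() && key.interestOps() == SelectionKey.OP_ACCEPT;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** A freshly opened channel is blocking, so registering it as-is must fail. */
    static boolean registerBlockingIsRejected() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             ServerSocketChannel ch = provider.openServerSocketChannel()) {
            try {
                ch.register(selector, SelectionKey.OP_ACCEPT);
                return false;
            } catch (IllegalBlockingModeException expected) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(registerNonBlocking());        // true
        System.out.println(registerBlockingIsRejected()); // true
    }
}
```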
AbstractChannel, another ancestor of NioServerSocketChannel, creates the Unsafe and the Pipeline that belong to the ServerSocketChannel. Both are covered in detail later:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    protected abstract AbstractUnsafe newUnsafe();
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
}
The handler and childHandler calls specify what runs when events occur on the ServerSocketChannel and on the accepted SocketChannels, respectively:
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
        .handler(new ChannelInitializer<ServerSocketChannel>() {
            @Override
            protected void initChannel(ServerSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ServerHandler());
            }
        })
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ServerHandler());
            }
        });
AbstractBootstrap, the parent class of ServerBootstrap, stores the handler for the ServerSocketChannel:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    public B handler(ChannelHandler handler) {
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }
}
ServerBootstrap itself stores the childHandler for the SocketChannels:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }
}
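The handler method returns self() rather than this so that a call chain started on the subclass keeps the subclass type. The sketch below reproduces that self-typed builder idea in isolation. BaseBootstrap and ServerLikeBootstrap are hypothetical names, and strings stand in for the ChannelHandler objects.

```java
import java.util.Objects;

final class SelfTypedBuilderDemo {

    /** Hypothetical base class: B is the concrete subclass type, returned by fluent setters. */
    abstract static class BaseBootstrap<B extends BaseBootstrap<B>> {
        String handler;

        @SuppressWarnings("unchecked")
        B self() {
            return (B) this;
        }

        B handler(String handler) {
            this.handler = Objects.requireNonNull(handler, "handler");
            return self();
        }
    }

    /** Hypothetical subclass that adds its own fluent setter. */
    static final class ServerLikeBootstrap extends BaseBootstrap<ServerLikeBootstrap> {
        String childHandler;

        ServerLikeBootstrap childHandler(String childHandler) {
            this.childHandler = Objects.requireNonNull(childHandler, "childHandler");
            return this;
        }
    }

    static String describe() {
        // handler(...) is declared in the base class but still returns ServerLikeBootstrap,
        // so childHandler(...) can be chained directly after it.
        ServerLikeBootstrap b = new ServerLikeBootstrap().handler("parent").childHandler("child");
        return b.handler + "/" + b.childHandler;
    }

    public static void main(String[] args) {
        System.out.println(describe()); // parent/child
    }
}
```

If handler returned the base type instead, the chained childHandler call would not compile, which is the problem the recursive type parameter solves.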
The Netty server is started through the ServerBootstrap#bind method:
ChannelFuture future = serverBootstrap.bind(8080).sync();
bind calls the doBind method of AbstractBootstrap, the parent class of ServerBootstrap, and AbstractBootstrap#initAndRegister starts creating the ServerSocketChannel:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        // omitted
    }
    // Create the ServerSocketChannel
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            // omitted
        }
        // omitted
    }
}
The channelFactory#newChannel call in AbstractBootstrap#initAndRegister above invokes the NioServerSocketChannel constructor, which creates the ServerSocketChannel and puts it into non-blocking mode.
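The link between channel(NioServerSocketChannel.class) and newChannel is reflection: the bootstrap only keeps the class and instantiates it through its no-argument constructor when a channel is needed. A minimal sketch of that idea follows; ReflectiveFactory and FakeServerChannel are invented for illustration and only mimic the shape of the real factory and channel classes.

```java
import java.lang.reflect.Constructor;

final class ReflectiveFactoryDemo {

    /** Hypothetical factory: remembers a no-arg constructor and invokes it on demand. */
    static final class ReflectiveFactory<T> {
        private final Constructor<? extends T> constructor;

        ReflectiveFactory(Class<? extends T> type) {
            try {
                this.constructor = type.getDeclaredConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(
                        type.getSimpleName() + " does not have a no-arg constructor", e);
            }
        }

        T newInstance() {
            try {
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("unable to create instance", e);
            }
        }
    }

    /** Hypothetical channel: the constructor is where the one-time setup happens. */
    static final class FakeServerChannel {
        final boolean blocking;

        FakeServerChannel() {
            // Stands in for ch.configureBlocking(false) in the real constructor chain.
            this.blocking = false;
        }
    }

    /** Each newInstance() call runs the constructor again and yields a fresh object. */
    static boolean createsFreshNonBlockingChannels() {
        ReflectiveFactory<FakeServerChannel> factory = new ReflectiveFactory<>(FakeServerChannel.class);
        FakeServerChannel first = factory.newInstance();
        FakeServerChannel second = factory.newInstance();
        return first != second && !first.blocking && !second.blocking;
    }

    public static void main(String[] args) {
        System.out.println(createsFreshNonBlockingChannels()); // true
    }
}
```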
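After the NioServerSocketChannel is created, the init method adds the handler defined in the main program to the NioServerSocketChannel's pipeline and schedules a ServerBootstrapAcceptor to be appended after it: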
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    void init(Channel channel) {
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
}
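The pipeline is based on the chain of responsibility design pattern. This pattern builds a chain of handler objects for a request and decouples sending the request from processing it: each Handler on the chain is responsible for processing, and the client only has to submit the request to the chain without caring how it is processed or passed along.
When a client sends a request, the server invokes the inbound handlers one by one; when the server sends the response, it invokes the outbound handlers one by one, as shown below: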

When the ServerSocketChannel is created, the AbstractChannel constructor creates a DefaultChannelPipeline object for it. That pipeline belongs exclusively to the ServerSocketChannel.
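Before reading the real implementation, it may help to see the chain of responsibility idea reduced to a linked list of handler contexts. The sketch below is a simplified model under stated assumptions: it covers inbound events only, and Handler, Context, and Pipeline are names chosen for this example, not Netty's types.

```java
import java.util.ArrayList;
import java.util.List;

final class MiniPipelineDemo {

    /** Hypothetical inbound handler: processes a message and may pass it on via the context. */
    interface Handler {
        void channelRead(Context ctx, Object msg);
    }

    /** One node of the linked list; wraps a handler and knows its neighbors. */
    static final class Context {
        Context prev;
        Context next;
        final String name;
        final Handler handler;

        Context(String name, Handler handler) {
            this.name = name;
            this.handler = handler;
        }

        /** Forwards the message to the next handler in the chain, if any. */
        void fireChannelRead(Object msg) {
            if (next != null) {
                next.handler.channelRead(next, msg);
            }
        }
    }

    /** Linked list with fixed head and tail nodes; user handlers sit in between. */
    static final class Pipeline {
        final Context head = new Context("head", (ctx, msg) -> ctx.fireChannelRead(msg));
        final Context tail = new Context("tail", (ctx, msg) -> { /* end of chain: drop the message */ });

        Pipeline() {
            head.next = tail;
            tail.prev = head;
        }

        /** Wraps the handler in a Context and links it in just before the tail node. */
        Pipeline addLast(String name, Handler handler) {
            Context newCtx = new Context(name, handler);
            Context prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
            return this;
        }

        /** Entry point for an inbound event: start at the head of the chain. */
        void fireChannelRead(Object msg) {
            head.handler.channelRead(head, msg);
        }
    }

    /** Sends one message through three handlers and records the order in which they ran. */
    static List<String> run(String input) {
        List<String> log = new ArrayList<>();
        new Pipeline()
                .addLast("trim", (ctx, msg) -> {
                    log.add("trim");
                    ctx.fireChannelRead(((String) msg).trim());
                })
                .addLast("upper", (ctx, msg) -> {
                    log.add("upper");
                    ctx.fireChannelRead(((String) msg).toUpperCase());
                })
                .addLast("sink", (ctx, msg) -> log.add("sink:" + msg))
                .fireChannelRead(input);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("  hi  ")); // [trim, upper, sink:HI]
    }
}
```

The caller only calls fireChannelRead on the pipeline; which handlers run, and in what order, is decided entirely by how the list was assembled.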
As the code below shows, DefaultChannelPipeline is a linked list with head and tail nodes. Each addLast call wraps the handler in a DefaultChannelHandlerContext and appends it to the end of the list:
public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            newCtx = newContext(group, filterName(name, handler), handler);
            addLast0(newCtx);
            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
callHandlerCallb