Netty通用代码:
创建两个线程池,分别用于处理ServerSocket事件和Socket事件;并指定ServerSocket和Socket发生事件时执行自定义类ServerHandler中的方法:
Netty业务代码:
ServerHander定义了方法,当服务端接受到了客户端发送的数据时,调用channelRead方法处理数据;当socket/serverSocket注册到selector中时,调用channelRegistered:
上述代码中,netty架构图如下所示:
从Netty架构图中可以看到NioEventLoopGroup和pipeline是最重要的概念,后面将会从Netty工作流程详细分析这两个概念的实现思想。
如下,bossGroup对应NioEventLoopGroup创建1个NioEventLoop,workerGroup创建10个NioEventLoop。每个NioEventLoop内部包含一个新的多路复用器Selector和线程,bossGroup的selector用于注册serverSocketChannel,workerGroup的selector用于注册socketChannel。在线程中则是处理selector注册的socket上发生的事件。
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);
NioEventLoopGroup从子类到父类的初始化顺序为:NioEventLoopGroup -> MultithreadEventLoopGroup -> MultithreadEventExecutorGroup。
注意:bossGroup只需要指定创建1个NioEventLoop,因为服务端只有一个ServerSocketChannel对象,根本没办法注册到多个selector中。没有任何资料能够实际展示bossGroup中的多个NioEventLoop。
SelectorProvider是Selector多路复用的工厂类,用于创建Selector的实现类。NioEventLoopGroup初始化时,创建了SelectorProvider对象:
public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider()); }
SelectorProvider类通过rt.jar包中的sun.nio.ch.DefaultSelectorProvider类调用create方法,创建SelectorProvider实现:
public abstract class SelectorProvider { public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } } }
不同操作系统的jdk包中rt.jar包中DefaultSelectorProvider实现不同,例如mac os的create方法返回KQueueSelectorProvider对象:
public class DefaultSelectorProvider { private DefaultSelectorProvider() { } public static SelectorProvider create() { return new KQueueSelectorProvider(); } }
linux操作系统rt.jar包的create方法返回EPollSelectorProvider对象:
public class DefaultSelectorProvider { private DefaultSelectorProvider() { } public static SelectorProvider create() { String var0 = (String)AccessController.doPrivileged(new GetPropertyAction("os.name")); if (var0.equals("SunOS")) { return createProvider("sun.nio.ch.DevPollSelectorProvider"); } else { return (SelectorProvider)(var0.equals("Linux") ? createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider()); } } }
EPollSelectorProvider可以通过openSelector方法创建EPollSelectorImpl对象:
public class EPollSelectorProvider extends SelectorProviderImpl { public EPollSelectorProvider() { } public AbstractSelector openSelector() throws IOException { return new EPollSelectorImpl(this); } }
EPollSelectorImpl最底层封装了socket系统调用epoll_create、epoll_ctl,完成多路复用功能。
有了SelectorProvider,就可以创建线程执行器Executor了。线程池中每一个线程的创建动作由DefaultThreadFactory定义。Executor直接从线程池中使用一个线程:
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //创建线程执行器, if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //省略 } //创建线程池 protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(getClass()); } }
线程池的初始化操作如下:
public class DefaultThreadFactory implements ThreadFactory { public DefaultThreadFactory(Class> poolType) { this(poolType, false, Thread.NORM_PRIORITY); } public DefaultThreadFactory(Class> poolType, boolean daemon, int priority) { this(toPoolName(poolType), daemon, priority); } public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) { ObjectUtil.checkNotNull(poolName, "poolName"); if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { throw new IllegalArgumentException( "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)"); } //使用统一的前缀作为线程名 prefix = poolName + '-' + poolId.incrementAndGet() + '-'; this.daemon = daemon; this.priority = priority; this.threadGroup = threadGroup; } //可以调用newThread直接创建一个线程 public Thread newThread(Runnable r) { Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon() != daemon) { t.setDaemon(daemon); } if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { // Doesn't matter even if failed to set. } return t; } }
定义了线程名前缀:
后续创建线程时,使用线程名做前缀:
ThreadPerTaskExecutor调用execute时,直接从线程池中创建一个新线程:
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory"); } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }
通过创建SelectorProvider和Executor两个重要依赖后,就可以构造NioEventLoop了:
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //创建线程池 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; //创建NioEventLoop,bossGroup指定1个NioEventLoop,workerGroup指定10个NioEventLoop for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //创建NioEventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { //省略 } } chooser = chooserFactory.newChooser(children); //省略 } //创建NioEventLoop的方法由NioEventLoopGroup类实现 protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception; }
NioEventLoopGroup实现了newChild方法,创建NioEventLoop对象:
public class NioEventLoopGroup extends MultithreadEventLoopGroup { protected EventLoop newChild(Executor executor, Object... args) throws Exception { EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); } }
NioEventLoop中,通过openSelector()方法创建selector,也就是EPollSelectorImpl对象。
public final class NioEventLoop extends SingleThreadEventLoop { NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); final SelectorTuple selectorTuple = openSelector(); this.selector = selectorTuple.selector; this.unwrappedSelector = selectorTuple.unwrappedSelector; } private SelectorTuple openSelector() { final Selector unwrappedSelector; try { //创建EPollSelectorImpl对象 unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } //省略 return new SelectorTuple(unwrappedSelector); } }
NioEventLoopGroup包含多个NioEventLoop。每个NioEventLoop内部包含一个新的多路复用器Selector和线程,bossGroup的selector用于注册serverSocketChannel,workerGroup的selector用于注册socketChannel。每个NioEventLoop中,都包含一个Selector以及一个线程,线程暂时用ThreadPerTaskExecutor表示,执行ThreadPerTaskExecutor#executor就会创建NioEventLoop专属的线程。
ServerBootstrap是启动类,将NioEventLoopGroup等参数传递到ServerBootstrap中,ServerBootstrap负责启动netty服务端。
指定NioServerSocketChannel作为netty的SeverSocketChannel实现类:
serverBootstrap.channel(NioServerSocketChannel.class);
NioServerSocketChannel的构造函数通过EPollSelectorProvider创建ServerSocketChannel对象
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { //DEFAULT_SELECTOR_PROVIDER就是EPollSelectorProvider对象 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } private static ServerSocketChannel newSocket(SelectorProvider provider) { try { //通过EPollSelectorProvider的父类SelectorProviderImpl的openServerSocketChannel()方法创建ServerSocketChannel对象。 return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } }
NioServerSocketChannel通过父类的AbstractNioChannel构造方法设置ServerSocketChannel为非阻塞:
public abstract class AbstractNioChannel extends AbstractChannel { protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { logger.warn( "Failed to close a partially initialized socket.", e2); } throw new ChannelException("Failed to enter non-blocking mode.", e); } } }
NioServerSocketChannel的父类AbstractChannel会为ServerSocketChannel创建对应的Unsafe和Pipeline,这个后面再展开:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } protected abstract AbstractUnsafe newUnsafe(); protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); } }
handler表示socket发生事件时,应该执行的操作。
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128) .handler(new ChannelInitializer() { @Override protected void initChannel(ServerSocketChannel ch) throws Exception { ch.pipeline().addLast(new ServerHandler()); } }) .childHandler(new ChannelInitializer () { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ServerHandler()); } });
ServerBootstrap的父类AbstractBootstrap保存ServerSocketChannel对应的handler:
public abstract class AbstractBootstrap, C extends Channel> implements Cloneable { public B handler(ChannelHandler handler) { this.handler = ObjectUtil.checkNotNull(handler, "handler"); return self(); } }
ServerBootstrap保存SocketChannel对应的childHander:
public class ServerBootstrap extends AbstractBootstrap{ public ServerBootstrap childHandler(ChannelHandler childHandler) { this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler"); return this; } }
通过ServerBootstrap#bind方法启动netty服务端:
ChannelFuture future = serverBootstrap.bind(8080).sync();
调用ServerBootstrap的父类AbstractBootstrap的doBind方法,通过AbstractBootstrap#initAndRegister开始创建ServerSocketChannel:
public abstract class AbstractBootstrap, C extends Channel> implements Cloneable { private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } //省略 } //创建ServerSocketChannel final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { //省略 } //省略 }
从上面的AbstractBootstrap#initAndRegister可以看到channelFactory#newChannel方法,它就调用了NioServerSocketChannel的构造函数,而NioServerSocketChannel构造函数里面就创建了ServerSocketChannel,并设置了非阻塞。
在创建完NioServerSocketChannel后,通过init方法,将主程序中定义的的Handler放到NioServerSocketChannel的pipeline中:
public class ServerBootstrap extends AbstractBootstrap{ void init(Channel channel) { setChannelOptions(channel, newOptionsArray(), logger); setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY)); ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry , Object>[] currentChildOptions; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY); } final Entry , Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY); p.addLast(new ChannelInitializer () { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } }
pipeline基于设计模式中的责任链模式。责任链模式为请求创建了一个处理对象的链。发起请求和具体处理请求的过程进行解耦:职责链上的处理者(Handler)负责处理请求,客户只需要将请求发送到职责链上即可,无须关心请求的处理细节和请求的传递。
当用户发起请求时,服务端逐步调用Inbound Handler,响应用户请求时,服务端逐步调用Outbound Handler。如下所示:
在创建ServerSocketChannel时,创建了NioEventLoop对应的DefaultChannelPipeline对象,该pipeline专属于ServerSocketChannel。
如下可以看到,DefaultChannelPipeline就是一个链表结构,每次addLast方法插入一个handler,就将handler封装成DefaultChannelHandlerContext,加入到链表结尾:
public class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallb