目录
找到[netty-example]模块的ceho包,查看简单的使用案例。

- public final class EchoClient {
-
- static final String HOST = System.getProperty("host", "127.0.0.1");
- static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
- static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
-
- public static void main(String[] args) throws Exception {
-
- // Configure the client.
- EventLoopGroup group = new NioEventLoopGroup();
- try {
- Bootstrap b = new Bootstrap();
- b.group(group)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .handler(new ChannelInitializer
() { - @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline p = ch.pipeline();
- //p.addLast(new LoggingHandler(LogLevel.INFO));
- p.addLast(new EchoClientHandler());
- }
- });
-
- // Start the client.
- ChannelFuture f = b.connect(HOST, PORT).sync();
-
- // Wait until the connection is closed.
- f.channel().closeFuture().sync();
- } finally {
- // Shut down the event loop to terminate all threads.
- group.shutdownGracefully();
- }
- }
- }
上边代码很简单,无非就是新建了个bootstrap,对其进行配置,然后启动客户端连接服务端,最后等待连接关闭。不得不说,netty代码封装的很好,使我们初学者可以简单快速的上手,但是代码封装的越好。意味着内部代码越复杂。
下边,我们来更详细的看下配置bootstrap的流程
在 Netty 中, Channel 是一个 Socket 的抽象, 它为用户提供了关于 Socket 状态(是否是连接还是断开) 以及对 Socket 的读写等操作. 每当 Netty 建立了一个连接后, 都会有一个对应的 Channel 实例.
这里,我们需要先知道NioSocketChannel的作用:异步的客户端 TCP Socket 连接。除此之外,还有以下类型:
从上边代码中我们看到了是channel方法中配置的:
…… .channel(NioSocketChannel.class)
我们点进去查看
- public B channel(Class extends C> channelClass) {
- return channelFactory(new ReflectiveChannelFactory
( - ObjectUtil.checkNotNull(channelClass, "channelClass")
- ));
- }
发现就是简单的指定了AbstractBootstrap抽象类中的channelFactory属性&&指定了ReflectiveChannelFactory中的constructor属性。同时我们发现实例化ReflectiveChannelFactory中的constructor,也就是实例化public io.netty.channel.socket.nio.NioSocketChannel()是在ReflectiveChannelFactory类中重写的newChannel方法中。那么具体是在哪里调用的呢?这里先挖个坑Ⅰ。
进入NioSocketChannel类,找到无参构造器,调用有参构造器
- public NioSocketChannel(SelectorProvider provider) {
- this(newSocket(provider));
- }
注意这里的newSocket,是用来打开一个新的 Java NIO SocketChannel。
构造方法继续往下看点点点……来到AbstractChannel
- protected AbstractChannel(Channel parent) {
- this.parent = parent;
- id = newId();
- unsafe = newUnsafe();
- pipeline = newChannelPipeline();
- }
- protected AbstractNioUnsafe newUnsafe() {
- return new NioSocketChannelUnsafe();
- }
我们直接看NioSocketChannelUnsafe的继承和实现,最后可以看出来实现了Unsafe接口,我们来看下这个接口

一看便知,这些操作都是和 Java 底层的 Socket 相关的操作。
首先我们要知道
Each channel has its own pipeline and it is created automatically when a new channel is created在实例化一个 Channel 时, 必然伴随着实例化一个 ChannelPipeline.
接下来我们来看一下初始化pipeline时都做了那些事情吧,对着newChannelPipeline()往下一直点点到DefaultChannelPipeline的有参构造。
- protected DefaultChannelPipeline(Channel channel) {
- this.channel = ObjectUtil.checkNotNull(channel, "channel");
- succeededFuture = new SucceededChannelFuture(channel, null);
- voidPromise = new VoidChannelPromise(channel, true);
-
- tail = new TailContext(this);
- head = new HeadContext(this);
-
- head.next = tail;
- tail.prev = head;
- }
HeadContext的继承结构图,不难发现有ChannelOutboundHandler和ChannelInboundHandler

TailContext的继承结构图,只有ChannelInboundHandler

他们又都继承了AbstractChannelHandlerContext,这就是DefaultChannelPipeline中维护的双向链表,tail为尾部,head为头部。这个链表是 Netty 实现 Pipeline 机制的关键
NioSocketChannel初始化时
readInterestOp变为SelectionKey.OP_READ
点入new NioEventLoopGroup()默认this(0)参数为0。继续走到MultithreadEventLoopGroup的构造方法
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
可以看出来,如果new NioEventLoopGroup(?)不填参数,默认就是以下规则。
- DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
- "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
NettyRuntime.availableProcessors() * 2 = 处理器核心数 * 2
最后走到MultithreadEventExecutorGroup的MultithreadEventExecutorGroup方法。
- protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
- EventExecutorChooserFactory chooserFactory, Object... args) {
- checkPositive(nThreads, "nThreads");
- //如果没有自定义执行器(该执行器最终被赋值给EventExecutor的成员变量),则使用ThreadPerTaskExecutor
- if (executor == null) {
- executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
- }
- //实例化children
- children = new EventExecutor[nThreads];
- //for循环将实例化children中的每一个元素
- for (int i = 0; i < nThreads; i ++) {
- boolean success = false;
- try {
- //通过子类中的newChild()来实现
- children[i] = newChild(executor, args);
- success = true;
- } catch (Exception e) {
- // TODO: Think about if this is a good exception type
- throw new IllegalStateException("failed to create a child event loop", e);
- } finally {
- if (!success) {
- for (int j = 0; j < i; j ++) {
- children[j].shutdownGracefully();
- }
-
- for (int j = 0; j < i; j ++) {
- EventExecutor e = children[j];
- try {
- while (!e.isTerminated()) {
- e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
- }
- } catch (InterruptedException interrupted) {
- // Let the caller handle the interruption.
- Thread.currentThread().interrupt();
- break;
- }
- }
- }
- }
- }
- //实例化事件轮询器,即上述的默认的执行器选择工厂DefaultEventExecutorChooserFactory.INSTANCE
- chooser = chooserFactory.newChooser(children);
- //定义异步事件通知,该通知将被添加到事件执行器EventExecutor上,
- //其逻辑也是简单的当children的最后一个元素被成功初始化后设置当前Group的实例化结果
- final FutureListener
- @Override
- public void operationComplete(Future
future) throws Exception { - if (terminatedChildren.incrementAndGet() == children.length) {
- terminationFuture.setSuccess(null);
- }
- }
- };
- //将上述通知添加到children中的每一个元素上
- for (EventExecutor e: children) {
- e.terminationFuture().addListener(terminationListener);
- }
- //构建一个不可更改的readonlyChildren用于遍历。
- Set
childrenSet = new LinkedHashSet(children.length); - Collections.addAll(childrenSet, children);
- readonlyChildren = Collections.unmodifiableSet(childrenSet);
- }
MultithreadEventExecutorGroup 内部维护了一个 EventExecutor 数组, Netty 的 EventLoopGroup 的实现机制其实就建立在 MultithreadEventExecutorGroup 之上. 每当 Netty 需要一个 EventLoop 时, 会调用 next() 方法获取一个可用的 EventLoop.
上边讲过了初始化的过程,如果你认真看了就知道上边留了个坑Ⅰ。channel在Bootstrap.connect -> Bootstrap.doConnect -> AbstractBootstrap.initAndRegister这里边调用channelFactory.newChannel()完成初始化。
initAndRegister代码简化后
- final ChannelFuture initAndRegister() {
- // 去掉非关键代码
- final Channel channel = channelFactory().newChannel();
- init(channel);
- ChannelFuture regFuture = config().group().register(channel);
- return regFuture;
- }
从代码可以看出来初始化后使用register对channel进行了注册,以下是注册主流程
总的来说, Channel 注册过程所做的工作就是将 Channel 与对应的 EventLoop 关联, 因此这也体现了, 在 Netty 中, 每个 Channel 都会关联一个特定的 EventLoop, 并且这个 Channel 中的所有 IO 操作都是在这个 EventLoop 中执行的; 当关联好 Channel 和 EventLoop 后, 会继续调用底层的 Java NIO SocketChannel 的 register 方法, 将底层的 Java NIO SocketChannel 注册到指定的 selector 中. 通过这两步, 就完成了 Netty Channel 的注册过程。
Netty 的一个强大和灵活之处就是基于 Pipeline 的自定义 handler 机制。
- ...
- .handler(new ChannelInitializer
() { - @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline p = ch.pipeline();
- if (sslCtx != null) {
- p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
- }
- //p.addLast(new LoggingHandler(LogLevel.INFO));
- p.addLast(new EchoClientHandler());
- }
- });
handler方法主要是指定了一个handler属性,所以不再细究。我们主要看handler方法的入参ChannelInitializer类。
ChannelInitializer是抽象类,并且有个抽象方法initChannel,也就是我们需要实现的方法。那么initChannel在哪里调用呢?答案是initChannel(ChannelHandlerContext ctx)中。调用链
Bootstrap.connect->AbstractBootstrap.initAndRegister->AbstractChannel.register0 ->DefaultChannelPipeline.invokeHandlerAddedIfNeeded ->DefaultChannelPipeline.callHandlerAddedForAllHandlers ->DefaultChannelPipeline.callHandlerAdded0->ChannelInitializer.handlerAdded ->ChannelInitializer.initChannel(ChannelHandlerContext ctx)
- // 简化
- try {
- initChannel((C) ctx.channel());
- }finally {
- ChannelPipeline pipeline = ctx.pipeline();
- if (pipeline.context(this) != null) {
- pipeline.remove(this);
- }
- }
起始调用链肯定是从
ChannelFuture f = b.connect(HOST, PORT).sync();
然后点点点到Bootstrap的doConnect方法
- private static void doConnect(
- final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
-
- // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
- // the pipeline in its channelRegistered() implementation.
- final Channel channel = connectPromise.channel();
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- if (localAddress == null) {
- channel.connect(remoteAddress, connectPromise);
- } else {
- channel.connect(remoteAddress, localAddress, connectPromise);
- }
- connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- }
- });
- }
我们指定的channel是NioSocketChannel(没有实现connect方法),所以调用AbstractChannel
的connect方法。
- public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
- return pipeline.connect(remoteAddress, promise);
- }
pipeline是DefaultChannelPipeline,在上边pipeline初始化中讲过。点进去
- public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
- return tail.connect(remoteAddress, promise);
- }
tail是什么,tail是一个(请转至上边查看继承图)。然后走到了AbstractChannelHandlerContext的connect方法
- public ChannelFuture connect(
- final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
- // 精简后
- final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
- EventExecutor executor = next.executor();
- next.invokeConnect(remoteAddress, localAddress, promise);
- return promise;
- }
它首先拿到了一个next,next是什么。 是从 DefaultChannelPipeline 内的双向链表的 tail 开始, 不断根据mask向前寻找第一个是 outbound 的 AbstractChannelHandlerContext。更直观一点

紧接着调用next.invokeConnect但是HeadContext中没实现invokeConnect,所以仍然调用AbstractChannelHandlerContext.invokeConnect方法最后调用HeadContext的connect方法
- public void connect(
- ChannelHandlerContext ctx,
- SocketAddress remoteAddress, SocketAddress localAddress,
- ChannelPromise promise) {
- unsafe.connect(remoteAddress, localAddress, promise);
- }
unsafe我们已经很熟悉了吧,在HeadContext构造方法中初始化了unsafe,不懂向上看
- HeadContext(DefaultChannelPipeline pipeline) {
- super(pipeline, null, HEAD_NAME, HeadContext.class);
- unsafe = pipeline.channel().unsafe();
- setAddComplete();
- }
然后就来到了NioSocketChannel的doConnect方法
- protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
- // 代码简化
- boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
-
- }
进入SocketUtils.connect后就看到了如何连接的。

如果耐心看下来会有必然会有收获。如果哪里不正确,请大佬们指正
参考自:yongshun/learn_netty_source_code: Netty 源码分析教程 (github.com)
但是版本是4.0.33.Final