• Netty服务器启动源码剖析


    Netty服务器启动源码剖析

    1、Netty服务器启动源码剖析

    1.1、执行new NioEventLoopGroup()时发生了什么

    本次分析创建workerGroup的过程(创建 bossGroup 的过程同理):

    /**
    * EventLoopGroup 是一个线程组,其中的每一个线程都在循环执行着三件事情:
    * select:轮询注册在其中的 Selector 上的 Channel 的 IO 事件
    * processSelectedKeys:在对应的 Channel 上处理 IO 事件
    * runAllTasks:再去以此循环处理任务队列中的其他任务
    */
    EventLoopGroup workGroup = new NioEventLoopGroup();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    1.1.1、NioEventLoopGroup 成分
    • Debug查看 workerGroup 的“成分”,可以看到它包含 8(cpu核数*2) 个 NioEventLoop,每个 NioEventLoop 里面有选择器、任务队列、执行器等等:

      在这里插入图片描述

    • NioEventLoop继承关系图:

      在这里插入图片描述

    1.1.2、追踪 new NioEventLoopGroup()
    • 追踪 new NioEventLoopGroup() 的底层调用:

      • (1)new NioEventLoopGroup()

        在这里插入图片描述

        • 红色框圈住的构造方法的源码为:

              /**
               * Create a new instance.
               *
               * @param nThreads          the number of threads that will be used by this instance.
               * @param executor          the Executor to use, or {@code null} if the default should be used.
               * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
               * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
               */
              protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                      EventExecutorChooserFactory chooserFactory, Object... args) {
                  if (nThreads <= 0) {
                      throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
                  }
          
                  // 这里的 ThreadPerTaskExecutor 实例是下文用于创建 EventExecutor 实例的参数
                  if (executor == null) {
                      executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
                      // ThreadPerTaskExecutor 的源代码如下,它的功能是从线程工厂中获取线程来执行 command
                      
                      //public final class ThreadPerTaskExecutor implements Executor {
                      //    private final ThreadFactory threadFactory;
                      //
                      //    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
                      //        if (threadFactory == null) {
                      //            throw new NullPointerException("threadFactory");
                      //        }
                      //        this.threadFactory = threadFactory;
                      //    }
                      //
                      //    @Override
                      //    public void execute(Runnable command) {
                      //        threadFactory.newThread(command).start();
                      //    }
                      //}
                  }
          
                  // 这里定义了一个容量为 nThreads 的 EventExecutor 的数组
                  children = new EventExecutor[nThreads];
          
                  for (int i = 0; i < nThreads; i ++) {
                      boolean success = false;
                      try {
                          // 往 EventExecutor 数组中添加元素
                          children[i] = newChild(executor, args);
                          success = true;
                      } catch (Exception e) {
                          // TODO: Think about if this is a good exception type
                          throw new IllegalStateException("failed to create a child event loop", e);
                      } finally {
                          if (!success) {
                              // 添加元素失败,则 shutdown 每一个 EventExecutor
                              for (int j = 0; j < i; j ++) {
                                  children[j].shutdownGracefully();
                              }
          
                              for (int j = 0; j < i; j ++) {
                                  EventExecutor e = children[j];
                                  try {
                                      while (!e.isTerminated()) {
                                          e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                      }
                                  } catch (InterruptedException interrupted) {
                                      // Let the caller handle the interruption.
                                      Thread.currentThread().interrupt();
                                      break;
                                  }
                              }
                          }
                      }
                  }
          
                  // chooser 的作用是为了实现 next()方法,即从 group 中挑选一个 NioEventLoop 来处理连接上 IO 事件的方法
                  chooser = chooserFactory.newChooser(children);
          
                  final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                      // EventExecutor 的终止事件回调方法
                      @Override
                      public void operationComplete(Future<Object> future) throws Exception {
                          if (terminatedChildren.incrementAndGet() == children.length) {
                              // 通过本类中定义的 Promise 属性的.setSuccess()方法设置结果,所有的监听者可以拿到该结果
                              terminationFuture.setSuccess(null);
                          }
                      }
                  };
          
                  for (EventExecutor e: children) {
                      // 为每一个 EventExecutor 添加终止事件监听器
                      e.terminationFuture().addListener(terminationListener);
                  }
          
                  Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
                  Collections.addAll(childrenSet, children);
                  readonlyChildren = Collections.unmodifiableSet(childrenSet);
              }
          
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
          • 26
          • 27
          • 28
          • 29
          • 30
          • 31
          • 32
          • 33
          • 34
          • 35
          • 36
          • 37
          • 38
          • 39
          • 40
          • 41
          • 42
          • 43
          • 44
          • 45
          • 46
          • 47
          • 48
          • 49
          • 50
          • 51
          • 52
          • 53
          • 54
          • 55
          • 56
          • 57
          • 58
          • 59
          • 60
          • 61
          • 62
          • 63
          • 64
          • 65
          • 66
          • 67
          • 68
          • 69
          • 70
          • 71
          • 72
          • 73
          • 74
          • 75
          • 76
          • 77
          • 78
          • 79
          • 80
          • 81
          • 82
          • 83
          • 84
          • 85
          • 86
          • 87
          • 88
          • 89
          • 90
          • 91
          • 92
          • 93
          • 94
          • 95
      • (2):newChild(executor, args) 方法,主要关注 NioEventLoop 中的选择器、任务队列、执行器等成分是从哪来的。

        在这里插入图片描述

        这里的 newChild() 方法包含了构建每一个 NioEventLoop 的细节,可以看到,newChild()调用了 NioEventLoop 的构造函数来构建每一个 NioEventLoop 实例。

        • 执行器(executor)

          在这里插入图片描述

          • 调用 NioEventLoop 的构造函数的时候,传入的参数 parent 为上一层调用者,executor 为 ThreadPerTaskExecutor 的实例。上文的代码注释已经讲明了其来源和功能,如下:

            // 这里的 ThreadPerTaskExecutor 实例是下文用于创建 EventExecutor 实例的参数
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
                // ThreadPerTaskExecutor 的源代码如下,它的功能是从线程工厂中获取线程来执行 command
            
                //public final class ThreadPerTaskExecutor implements Executor {
                //    private final ThreadFactory threadFactory;
                //
                //    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
                //        if (threadFactory == null) {
                //            throw new NullPointerException("threadFactory");
                //        }
                //        this.threadFactory = threadFactory;
                //    }
                //
                //    @Override
                //    public void execute(Runnable command) {
                //        threadFactory.newThread(command).start();
                //    }
                //}
            }
            
            • 1
            • 2
            • 3
            • 4
            • 5
            • 6
            • 7
            • 8
            • 9
            • 10
            • 11
            • 12
            • 13
            • 14
            • 15
            • 16
            • 17
            • 18
            • 19
            • 20
            • 21
        • 选择器(selector)

          • NioEventLoop 的构造方法中有一个 openSelector(),它完成了选择器(多路复用器)的初始化。其中 provider(源码追踪图1的step-3)、selectStrategy(源码追踪图1的step-4) 由上一层传入。

            在这里插入图片描述

            在这里插入图片描述

            在这里插入图片描述

          • openSelector() 源码分析

            private SelectorTuple openSelector() {
                final Selector unwrappedSelector;
                try {
                    // 通过往下追踪发现 provider.openSelector()最终调用了 WindowsSelectorImpl 类的构造方法构造出一个 Selector,因此 unwrappedSelector 是 WindowsSelectorImpl 的实例
                    unwrappedSelector = provider.openSelector();
                    //public class WindowsSelectorProvider extends SelectorProviderImpl {
                    //    public WindowsSelectorProvider() {
                    //    }
                    //
                    //    public AbstractSelector openSelector() throws IOException {
                    //        return new WindowsSelectorImpl(this);
                    //    }
                    //}
                } catch (IOException e) {
                    throw new ChannelException("failed to open a new selector", e);
                }
            
                // Netty 对 NIO 的 Selector 的 selectedKeys 进行了优化(默认设置),用户可以通过 io.netty.noKeySetOptimization 开关决定是否启用该优化项。
                // 常量 DISABLE_KEY_SET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
                if (DISABLE_KEY_SET_OPTIMIZATION) {
                    // 若没有开启 selectedKeys 优化,直接返回
                    return new SelectorTuple(unwrappedSelector);
                    //    SelectorTuple(Selector unwrappedSelector) {
                    //        this.unwrappedSelector = unwrappedSelector;
                    //        this.selector = unwrappedSelector;
                    //    }
                }
            
                // 若开启 selectedKeys 优化,需要通过反射的方式从 Selector 实例中获取 selectedKeys 和 publicSelectedKeys,将上述两个成员变量置为可写,然后通过反射的方式使用 Netty 构造的 selectedKeys 包装类selectedKeySet 将原 JDK 的 selectedKeys 替换掉。
                Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                    @Override
                    public Object run() {
                        try {
                            return Class.forName(
                                "sun.nio.ch.SelectorImpl",
                                false,
                                PlatformDependent.getSystemClassLoader());
                        } catch (Throwable cause) {
                            return cause;
                        }
                    }
                });
            
                if (!(maybeSelectorImplClass instanceof Class) ||
                    // ensure the current selector implementation is what we can instrument.
                    !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
                    if (maybeSelectorImplClass instanceof Throwable) {
                        Throwable t = (Throwable) maybeSelectorImplClass;
                        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
                    }
                    return new SelectorTuple(unwrappedSelector);
                }
            
                final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
                final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
            
                Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                    @Override
                    public Object run() {
                        try {
                            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
            
                            if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                                // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                                // This allows us to also do this in Java9+ without any extra flags.
                                long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                                long publicSelectedKeysFieldOffset =
                                    PlatformDependent.objectFieldOffset(publicSelectedKeysField);
            
                                if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                                    PlatformDependent.putObject(
                                        unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                                    PlatformDependent.putObject(
                                        unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                                    return null;
                                }
                                // We could not retrieve the offset, lets try reflection as last-resort.
                            }
            
                            Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                            if (cause != null) {
                                return cause;
                            }
                            cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                            if (cause != null) {
                                return cause;
                            }
            
                            selectedKeysField.set(unwrappedSelector, selectedKeySet);
                            publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                            return null;
                        } catch (NoSuchFieldException e) {
                            return e;
                        } catch (IllegalAccessException e) {
                            return e;
                        }
                    }
                });
            
                if (maybeException instanceof Exception) {
                    selectedKeys = null;
                    Exception e = (Exception) maybeException;
                    logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
                    return new SelectorTuple(unwrappedSelector);
                }
                selectedKeys = selectedKeySet;
                logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
                return new SelectorTuple(unwrappedSelector,
                                         new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
            }
            
            • 1
            • 2
            • 3
            • 4
            • 5
            • 6
            • 7
            • 8
            • 9
            • 10
            • 11
            • 12
            • 13
            • 14
            • 15
            • 16
            • 17
            • 18
            • 19
            • 20
            • 21
            • 22
            • 23
            • 24
            • 25
            • 26
            • 27
            • 28
            • 29
            • 30
            • 31
            • 32
            • 33
            • 34
            • 35
            • 36
            • 37
            • 38
            • 39
            • 40
            • 41
            • 42
            • 43
            • 44
            • 45
            • 46
            • 47
            • 48
            • 49
            • 50
            • 51
            • 52
            • 53
            • 54
            • 55
            • 56
            • 57
            • 58
            • 59
            • 60
            • 61
            • 62
            • 63
            • 64
            • 65
            • 66
            • 67
            • 68
            • 69
            • 70
            • 71
            • 72
            • 73
            • 74
            • 75
            • 76
            • 77
            • 78
            • 79
            • 80
            • 81
            • 82
            • 83
            • 84
            • 85
            • 86
            • 87
            • 88
            • 89
            • 90
            • 91
            • 92
            • 93
            • 94
            • 95
            • 96
            • 97
            • 98
            • 99
            • 100
            • 101
            • 102
            • 103
            • 104
            • 105
            • 106
            • 107
            • 108
            • 109
            • 110
            • 111
        • 任务队列(taskQueue)

          • 在 NioEventLoop 的构造器中,通过 rejectedExecutionHandlerqueueFactory 构造任务队列,newTaskQueue()根据参数 queueFactory 产生 Queue的实例。其中 rejectedExecutionHandler (源码追踪图1的step-5)由上一层传入,queueFactory可以自定义传入,否则为空。

            在这里插入图片描述

            在这里插入图片描述

          • 追踪 super 方法

            在这里插入图片描述

            • 看到 newTaskQueue()根据参数 queueFactory 产生的 Queue实例最终被赋值给了 SingleThreadEventExecutor 的 taskQueue 属性,taskQueue 是 SingleThreadEventExecutor 中的任务队列,而 NioEventLoop 又继承于 SingleThreadEventExecutor,因此 NioEventLoop 也就具有这个任务队列了。
            • 同理,NioEventLoop 中的定时任务队列 scheduledTaskQueue 也是这么得到的:AbstractScheduledEventExecutor 包含 scheduledTaskQueue 属性,NioEventLoop 又继承于 AbstractScheduledEventExecutor,构造 NioEventLoop 的时候初始化这个 scheduledTaskQueue,因此 NioEventLoop 就有了定时任务队列。
    1.1.3、总结
    • (1)NioEventLoopGroup 的无参数构造函数会调用 NioEventLoopGroup 的有参数构造函数,最终把下面的参数传递给父类 MultithreadEventLoopGroup 的有参数构造函数。

      nThreads=cpu核数*2
      executor=null
      chooserFactory=DefaultEventExecutorChooserFactory.INSTANCE
      selectorProvider=SelectorProvider.provider()
      selectStrategyFactory=DefaultSelectStrategyFactory.INSTANCE
      rejectedExecutionHandler=RejectedExecutionHandlers.reject()
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    • (2)父类 MultithreadEventLoopGroup 的有参数构造函数创建一个 NioEventLoop 的容器 children = new EventExecutor[nThreads],并构建出 若干个 NioEventLoop 的实例放入其中。

    • (3)构建每一个 NioEventLoop 调用的是 children[i] = newChild(executor, args)。

    • (4)newChild()方法最终调用了 NioEventLoop 的构造函数,初始化其中的选择器、任务队列、执行器等成分。

    • (5)本节只详述了 NioEventLoop 中选择器、任务队列、执行器三个成分的用途和由来,对于其他成分,可按照本节的代码追踪思路继续探究。

    1.2、引导类ServerBootstrap的创建与配置

    本节一起看下服务端启动类 ServerBootstrap 的创建与配置代码背后的逻辑。

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    // 设置线程组
    serverBootstrap.group(bossGroup, workerGroup)
    // 说明服务器端通道的实现类(便于 Netty 做反射处理)
    .channel(NioServerSocketChannel.class)
    // 临时存放已完成三次握手的请求的队列的最大长度。
    .option(ChannelOption.SO_BACKLOG, 100)
    // 对服务端的 NioServerSocketChannel 添加 Handler
    // LoggingHandler 是 netty 内置的一种 ChannelDuplexHandler,既可以处理出站事件,又可以处理入站事件,即 LoggingHandler 既记录出站日志又记录入站日志。
    .handler(new LoggingHandler(LogLevel.INFO))
    // 对服务端接收到的、与客户端之间建立的 SocketChannel 添加 Handler
    .childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel ch) throws Exception {
          ChannelPipeline p = ch.pipeline();
          if (sslCtx != null) {
              // sslCtx.newHandler(ch.alloc())对传输的内容做安全加密处理
              p.addLast(sslCtx.newHandler(ch.alloc()));
          }
          // 如果需要的话,可以用 LoggingHandler 记录与客户端之间的通信日志
          // p.addLast(new LoggingHandler(LogLevel.INFO));
    
          // 业务 serverHandler
          p.addLast(serverHandler);
      }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    ServerBootstrap 提供了一系列的链式配置方法,具体而言就是 ServerBootstrap 对象的一个配置方法(比如.group())处理完配置参数之后,会将当前 ServerBootstrap 对象返回,这样就能紧随其后继续调用该对象的其他配置方法(比如.channel())。这是面向对象语言中常见的一种编程模式。

    1.2.1、ServerBootstrap 的成分

    在这里插入图片描述

    • 此时 ServerBootstrap 刚刚被创建,且未进行设置。它包含一个 ServerBootstrapConfig 对象,而这个对象又引用了 ServerBootstrap 对象,因此两个是互相引用、互相包含的关系。此外还包含了 group、handler、childGroup、childHandler 等成分,目前这些成分都为 null,后面进行的各种设置就是为这些成分赋值。

    • ServerBootstrap 和 Bootstrap 一样,都继承于抽象类 AbstractBootstrap。因此两者具备很多相同的属性和 API,例如 group、channelFactory、localAddress、options、attrs、handler、channel()、channelFactory()、register()、bind()等等。

      在这里插入图片描述

    1.2.2、ServerBootstrap 的配置
    • .group(bossGroup, workerGroup):作用是把 bossGroup 和 workerGroup 两个参数赋值给 ServerBootstrap 的成员变量 group(从父类 AbstractBootstrap 继承而来)和 childGroup。

      在这里插入图片描述

    • .channel(NioServerSocketChannel.class):作用是通过反射机制给当前 ServerBootstrap 中的 channelFactory 属性(从父类 AbstractBootstrap 继承而来)赋值。

      在这里插入图片描述

      • 服务端的 NioServerSocketChannel 实例就是通过这个 channelFactory 创建的,不过现在还没有开始创建,要等到后面调用.bind()的时候才会创建。
    • .option(ChannelOption.XXX, YYY):作用是将可选项放入一个 options 集合中(给 NioServerSocketChannel 使用)。

      在这里插入图片描述

    • .childOption(ChannelOption.XXX, YYY):作用是将可选项放入一个 childOptions 集合中(给 NioSocketChannel 使用)。

      在这里插入图片描述

    • .handler(ChannelHandler handler):作用是将某个 Handler 赋值给 ServerBootstrap 实例的 handler 属性(从父类 AbstractBootstrap 继承而来)。

      在这里插入图片描述

      • 这个 handler 最终在.bind() 的时候,在 ServerBootstrap.init() 方法中被放入 NioServerSocketChannel 实例的 pipeline 中。
    • .childHandler(ChannelHandler handler):作用是为接收客户端连接请求产生的 NioSocketChannel 实例的 pipeline 添加 Handler。

      在这里插入图片描述

      • ChannelInitializer 本质是 ChannelHandler,通过重写 initChannel(SocketChannel ch) 方法,为接收客户端连接请求产生的 NioSocketChannel 实例的 pipeline 添加 Handler。

        在这里插入图片描述

         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 if (sslCtx != null) {
                     // sslCtx.newHandler(ch.alloc())对传输的内容做安全加密处理
                     p.addLast(sslCtx.newHandler(ch.alloc()));
                 }
                 // 如果需要的话,可以用 LoggingHandler 记录与客户端之间的通信日志
                 // p.addLast(new LoggingHandler(LogLevel.INFO));
        
                 // 业务 serverHandler
                 p.addLast(serverHandler);
             }
         })
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • p.addLast(serverHandler):调用了 Pipeline 的 addLast 方法向 Pipeline 中的双向链表添加 ChannelHandlerContext 元素:

          在这里插入图片描述

    1.2.3、总结
    • (1).group(bossGroup, workerGroup)把 bossGroup 和 workerGroup 两个参数赋值给 ServerBootstrap 的成员变量 group(从父类 AbstractBootstrap 继承而来)和 childGroup。
    • (2).channel(NioServerSocketChannel.class)通过反射机制给当前 ServerBootstrap 中的 channelFactory 属性(从父类 AbstractBootstrap 继承而来)赋值。在调用.bind()的时候 channelFactory 会创建 NioServerSocketChannel 的实例。
    • (3).option(ChannelOption.XXX, YYY) 将可选项放入一个 options 集合中(给 NioServerSocketChannel 使用)。
    • (4).childOption(ChannelOption.XXX, YYY) 将可选项放入一个 childOptions 集合中(给 NioSocketChannel 使用)。
    • (5).handler(ChannelHandler handler) 将某个 Handler 赋值给 ServerBootstrap 实例的 handler 属性(从父类 AbstractBootstrap 继承而来)。这个 handler 最终在.bind() 的时候,在 ServerBootstrap.init() 方法中被放入 NioServerSocketChannel 实例的 pipeline 中。
    • (6).childHandler(ChannelHandler handler) 参数通常使用ChannelInitializer,其本质是 ChannelHandler,通过重写 initChannel(SocketChannel ch) 方法,为接收客户端连接请求产生的 NioSocketChannel 实例的 pipeline 添加 Handler。

    1.3、执行ServerBootstrap.bind(PORT)时发生了什么

    本节介绍的 bind(PORT) ,实质是调用 AbstractBootstrap 的 doBind(final SocketAddress localAddress) 方法。

    在这里插入图片描述

    1.3.1、doBind 源码
    • private ChannelFuture doBind(final SocketAddress localAddress) 源码:

      PS: 主要关注 initAndRegister() 和 doBind0(regFuture, channel, localAddress, promise)。

      private ChannelFuture doBind(final SocketAddress localAddress) {
          final ChannelFuture regFuture = initAndRegister();// (1) 初始化 NioServerSocketChannel 的实例,并且将其注册到 bossGroup 中的 EvenLoop 中的 Selector 中
          final Channel channel = regFuture.channel();
          if (regFuture.cause() != null) {
              return regFuture;
          }
      
          if (regFuture.isDone()) {
              // 若异步过程 initAndRegister()已经执行完毕,则进入该分支
              // At this point we know that the registration was complete and successful.
              ChannelPromise promise = channel.newPromise();
              doBind0(regFuture, channel, localAddress, promise);//(2) 调用底层 JDK 接口完成端口绑定和监听
              return promise;
          } else {
              // 若异步过程 initAndRegister()还未执行完毕,则进入该分支
              // Registration future is almost always fulfilled already, but just in case it's not.
              final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
              // 监听 regFuture 的完成事件,完成之后再调用 doBind0(regFuture, channel, localAddress, promise);
              regFuture.addListener(new ChannelFutureListener() {
                  @Override
                  public void operationComplete(ChannelFuture future) throws Exception {
                      Throwable cause = future.cause();
                      if (cause != null) {
                          // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                          // IllegalStateException once we try to access the EventLoop of the Channel.
                          promise.setFailure(cause);
                      } else {
                          // Registration was successful, so set the correct executor to use.
                          // See https://github.com/netty/netty/issues/2586
                          promise.registered();
      
                          doBind0(regFuture, channel, localAddress, promise);
                      }
                  }
              });
              return promise;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • (1)initAndRegister() 源码:

        PS: 主要关注 newChannel() 、init(channel)、register(channel) 方法。

        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();//(1-1) 创建 NioServerSocketChannel 实例
                init(channel);//(1-2) 对该 NioServerSocketChannel 进行初始化
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
            }
        
            ChannelFuture regFuture = config().group().register(channel);//(1-3) 最终把 NioServerSocketChannel 实例注册到 bossGroup 中 EventLoop 中的 Selector 上。
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
        
            // If we are here and the promise is not failed, it's one of the following cases:
            // 1) If we attempted registration from the event loop, the registration has been completed at this point.
            //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
            // 2) If we attempted registration from the other thread, the registration request has been successfully
            //    added to the event loop's task queue for later execution.
            //    i.e. It's safe to attempt bind() or connect() now:
            //         because bind() or connect() will be executed *after* the scheduled registration task is executed
            //         because register(), bind(), and connect() are all bound to the same thread.
        
            return regFuture;
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • (1-1) newChannel() 源码追踪:

          在这里插入图片描述

          • 前面介绍 ServerBootstrap 的配置.channel(NioServerSocketChannel.class) 已经说明了 channelFactory 的作用。
          • newChannel() 实质是 ReflectiveChannelFactory 通过反射创建 NioServerSocketChannel 实例。
          • 在 NioServerSocketChannel 的空构造方法往下追踪源码,会发现传递了 SelectionKey.OP_ACCEPT 参数,并且赋予给 readInterestOp 属性,作用是标识该 Channel 感兴趣的事件。
          • 继续追踪下去会在 AbstractChannel(Channel parent) 中的 newChannelPipeline() -> DefaultChannelPipeline(Channel channel) 创建了 ChannelPipeline。里面的 head、tail 实质是 ChannelHandlerContext 类型的双向链表。
        • (1-2)init(channel) 源码:

          PS: init(channel) 方法在 AbstractBootstrap 中是抽象方法,在 ServerBootstrap 中进行了实现。注意这里添加了 ServerBootstrapAcceptor ,而且这是一个 ChannelInboundHandler。

          在这里插入图片描述

          /**
          * ServerBootstrap.init()方法,它在 channel = channelFactory.newChannel() 之后被执行,用于初始化这个 channel
          */
          @Override
          void init(Channel channel) throws Exception {
              final Map<ChannelOption<?>, Object> options = options0();
              synchronized (options) {
                  setChannelOptions(channel, options, logger);
                  // 通过.option()设置的 TCP 参数就在这里应用
                  //static void setChannelOptions(
                  //        Channel channel, Map, Object> options, InternalLogger logger) {
                  //    for (Map.Entry, Object> e: options.entrySet()) {
                  //        setChannelOption(channel, e.getKey(), e.getValue(), logger);
                  //    }
                  //}
              }
          
              final Map<AttributeKey<?>, Object> attrs = attrs0();
              synchronized (attrs) {
                  // 通过.attr()设置的附加属性就在这里应用
                  for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                      @SuppressWarnings("unchecked")
                      AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                      channel.attr(key).set(e.getValue());
                  }
              }
          
              ChannelPipeline p = channel.pipeline();
          
              final EventLoopGroup currentChildGroup = childGroup;
              final ChannelHandler currentChildHandler = childHandler;
              final Entry<ChannelOption<?>, Object>[] currentChildOptions;
              final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
              synchronized (childOptions) {
                  currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
              }
              synchronized (childAttrs) {
                  currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
              }
          
              p.addLast(new ChannelInitializer<Channel>() {
                  @Override
                  public void initChannel(final Channel ch) throws Exception {
                      // 获取 NioServerSocketChannel 实例的 pipeline
                      final ChannelPipeline pipeline = ch.pipeline();
                      // 这里的 config.handler()就是前面通过 .handler(ChannelHandler handler) 设置的 handler
                      ChannelHandler handler = config.handler();
                      if (handler != null) {
                          // 将这个 handler 添加到 NioServerSocketChannel 实例的 pipeline 中
                          pipeline.addLast(handler);
                      }
          
                      // 异步执行向 pipeline 添加 ServerBootstrapAcceptor 的步骤
                      ch.eventLoop().execute(new Runnable() {
                          @Override
                          public void run() {
                              // ServerBootstrapAcceptor 是一个 ChannelInboundHandler
                              pipeline.addLast(new ServerBootstrapAcceptor(
                                  ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                          }
                      });
                  }
              });
          }
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
          • 26
          • 27
          • 28
          • 29
          • 30
          • 31
          • 32
          • 33
          • 34
          • 35
          • 36
          • 37
          • 38
          • 39
          • 40
          • 41
          • 42
          • 43
          • 44
          • 45
          • 46
          • 47
          • 48
          • 49
          • 50
          • 51
          • 52
          • 53
          • 54
          • 55
          • 56
          • 57
          • 58
          • 59
          • 60
          • 61
          • 62
          • 63
          • 64
        • (1-3)register(channel) 源码追踪:

          在这里插入图片描述

          • 在上图的最后一个红色框圈住的代码处,NioServerSocketChannel 的实例被注册到 bossGroup 中 EventLoop 中的 Selector 上(ops: 0 在这里猜测是ready的意思,因为后面会在 AbstractNioChannel.doBeginRead() 方法中真正设置key感兴趣的ops)

            在这里插入图片描述

            • doBeginRead() 方法会在channel首次注册激活或者每次readComplete之后发生(如果开启了isAutoRead,默认是开启的)。需要注意的是,即使读事件发生的时候,readyOps是0,同样可以进行read。

              在这里插入图片描述

      • (2)doBind0(regFuture, channel, localAddress, promise) 源码追踪:

        在这里插入图片描述

        • 在 NioServerSocketChannel 中的 javaChannel().bind(localAddress, config.getBacklog()) 调用底层 JDK 接口完成端口绑定和监听。
        • 追踪下去会发现最终调用了一个 Native 方法把.bind(PORT)最终托管给了 JVM,然后 JVM 进行系统调用。
    1.3.2、run 死循环
    • 在调用register0、doBind0等方法的时候,会委托给 EventLoop 去执行,如果是当前 EventLoop,直接执行 register0 方法,否则会交给 EventLoop.execute(Runnable task)(一般情况下都是会这样异步执行)。

      // SingleThreadEventExecutor.execute(Runnable task):
      @Override
      public void execute(Runnable task) {
          if (task == null) {
              throw new NullPointerException("task");
          }
      
          boolean inEventLoop = inEventLoop();
          // 添加任务到队列
          addTask(task);
          if (!inEventLoop) {
              // 如果当前线程不属于该 EventLoop,EventLoop 需要启动新线程。最终会执行 SingleThreadEventExecutor.this.run() 方法,进入到了 NioEventLoop 中 run 方法的死循环里。
              startThread();
              if (isShutdown()) {
                  boolean reject = false;
                  try {
                      if (removeTask(task)) {
                          reject = true;
                      }
                  } catch (UnsupportedOperationException e) {
                      // The task queue does not support removal so the best thing we can do is to just move on and
                      // hope we will be able to pick-up the task before its completely terminated.
                      // In worst case we will log on termination.
                  }
                  if (reject) {
                      reject();
                  }
              }
          }
      
          if (!addTaskWakesUp && wakesUpForTask(task)) {
              wakeup(inEventLoop);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
    • 异步添加任务到队列之后,会在 run 循环里面,通过 runAllTasks() 方法执行队列里面的任务。即register0、doBind0等方法会在这时候被处理。

    • NioEventLoop 中的死循环,不断执行以下三个过程:

      • select:轮训注册在其中的 Selector 上的 Channel 的 IO 事件。
      • processSelectedKeys:在对应的 Channel 上处理 IO 事件。
      • runAllTasks:再去以此循环处理任务队列中的其他任务。

      在这里插入图片描述

      // NioEventLoop.run():
      @Override
      protected void run() {
          for (;;) {
              try {
                  try {
                      switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                          case SelectStrategy.CONTINUE:
                              continue;
      
                          case SelectStrategy.BUSY_WAIT:
                              // fall-through to SELECT since the busy-wait is not supported with NIO
      
                          case SelectStrategy.SELECT:
                              select(wakenUp.getAndSet(false));
      
                              // 'wakenUp.compareAndSet(false, true)' is always evaluated
                              // before calling 'selector.wakeup()' to reduce the wake-up
                              // overhead. (Selector.wakeup() is an expensive operation.)
                              //
                              // However, there is a race condition in this approach.
                              // The race condition is triggered when 'wakenUp' is set to
                              // true too early.
                              //
                              // 'wakenUp' is set to true too early if:
                              // 1) Selector is waken up between 'wakenUp.set(false)' and
                              //    'selector.select(...)'. (BAD)
                              // 2) Selector is waken up between 'selector.select(...)' and
                              //    'if (wakenUp.get()) { ... }'. (OK)
                              //
                              // In the first case, 'wakenUp' is set to true and the
                              // following 'selector.select(...)' will wake up immediately.
                              // Until 'wakenUp' is set to false again in the next round,
                              // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                              // any attempt to wake up the Selector will fail, too, causing
                              // the following 'selector.select(...)' call to block
                              // unnecessarily.
                              //
                              // To fix this problem, we wake up the selector again if wakenUp
                              // is true immediately after selector.select(...).
                              // It is inefficient in that it wakes up the selector for both
                              // the first case (BAD - wake-up required) and the second case
                              // (OK - no wake-up required).
      
                              if (wakenUp.get()) {
                                  selector.wakeup();
                              }
                              // fall through
                          default:
                      }
                  } catch (IOException e) {
                      // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                      // the selector and retry. https://github.com/netty/netty/issues/8566
                      rebuildSelector0();
                      handleLoopException(e);
                      continue;
                  }
      
                  cancelledKeys = 0;
                  needsToSelectAgain = false;
                  final int ioRatio = this.ioRatio;
                  if (ioRatio == 100) {
                      try {
                          processSelectedKeys();
                      } finally {
                          // Ensure we always run tasks.
                          runAllTasks();
                      }
                  } else {
                      final long ioStartTime = System.nanoTime();
                      try {
                          processSelectedKeys();
                      } finally {
                          // Ensure we always run tasks.
                          final long ioTime = System.nanoTime() - ioStartTime;
                          runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                      }
                  }
              } catch (Throwable t) {
                  handleLoopException(t);
              }
              // Always handle shutdown even if the loop processing threw an exception.
              try {
                  if (isShuttingDown()) {
                      closeAll();
                      if (confirmShutdown()) {
                          return;
                      }
                  }
              } catch (Throwable t) {
                  handleLoopException(t);
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
    1.3.3、总结
    • (1)首先调用 AbstractBootstrap 中的 initAndRegister() 方法完成 NioServerSocketChannel 实例的初始化和注册。
    • (2)然后调用 NioServerSocketChannel 实例的 doBind() 方法,最终调用 sun.nio.ch.Net 中的 bind()和 listen() 完成端口绑定和客户端连接监听。
    • (3)在真正 register0(注册)和 bind0 (绑定)之前,会委托当前 eventLoop 的 executor 去执行,实质上是在死循环run方法中通过 runAllTasks() 方法执行 eventLoop 的队列里面的任务。
  • 相关阅读:
    linux系统中常见注册函数的使用方法
    从零打造“乞丐版” React(一)——从命令式编程到声明式编程
    从 jsonpath 和 xpath 到 SPL
    CCF开源发展委员会正式成立,探索开源发展新途径
    基于Android+vue的大学生综合信息处理软件APP设计
    安装K8S
    OceanBase 数据文件缩容实践
    flutter开发实战-防抖Debounce与节流Throttler实现
    云计算时代前端如何保证开源代码的安全性
    常用的CSS
  • 原文地址:https://blog.csdn.net/m0_37385780/article/details/126151152