• Overall flow of Netty server startup: an analysis based on the 4.1.96.Final source


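    Netty uses the main/sub (master/worker) multi-threaded Reactor model described in "Scalable IO in Java", except that in Netty the subReactor is a group of Reactors rather than a single one.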

    I. Starting from the FileServer example

    public final class FileServer {

        static final boolean SSL = System.getProperty("ssl") != null;
        // Use the same default port with the telnet example so that we can use the telnet client example to access it.
        static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8992" : "8023"));

        public static void main(String[] args) throws Exception {
            // Configure SSL.
            final SslContext sslCtx = ServerUtil.buildSslContext();

            // Configure the server: the main and sub Reactor thread groups.
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 // Channel type used by the main Reactor
                 .channel(NioServerSocketChannel.class)
                 // Options of the main Reactor's channel, i.e. options of the underlying JDK NIO socket
                 .option(ChannelOption.SO_BACKLOG, 100)
                 // Handler placed in the pipeline of the main Reactor's channel
                 .handler(new LoggingHandler(LogLevel.INFO))
                 // Handler for each accepted SocketChannel
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc()));
                         }
                         p.addLast(
                                 new StringEncoder(CharsetUtil.UTF_8),
                                 new LineBasedFrameDecoder(8192),
                                 new StringDecoder(CharsetUtil.UTF_8),
                                 new ChunkedWriteHandler(),
                                 new FileServerHandler());
                     }
                 });

                // Start the server.
                ChannelFuture f = b.bind(PORT).sync();

                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down all event loops to terminate all threads.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

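    1.1 Netty's main/sub Reactor model

    As a first overview: the MainReactor in bossGroup manages a channel of type NioServerSocketChannel, which listens on the port and accepts client connections. For each client it creates and initializes a NioSocketChannel, then picks a SubReactor from workerGroup by round-robin and binds the client's NioSocketChannel to it. One SubReactor thread handles the I/O events of many NioSocketChannels.

    1.2 NioServerSocketChannel

    It wraps the JDK's native ServerSocketChannel as a field.

    1.2.1 channel(NioServerSocketChannel.class)

    Calling channel(...) does not create a channel yet. It stores a ReflectiveChannelFactory in the bootstrap's channelFactory field, as shown below: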

    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));

    // ReflectiveChannelFactory combines generics, reflection and the factory pattern
    // to create different channel types flexibly.
    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

        private final Constructor<? extends T> constructor;

        public ReflectiveChannelFactory(Class<? extends T> clazz) {
            ObjectUtil.checkNotNull(clazz, "clazz");
            try {
                this.constructor = clazz.getConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                        " does not have a public non-arg constructor", e);
            }
        }

        @Override
        public T newChannel() {
            try {
                return constructor.newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
            }
        }

        @Override
        public String toString() {
            return StringUtil.simpleClassName(ReflectiveChannelFactory.class) +
                    '(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)";
        }
    }
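
    The reflective-factory idea can be tried out without Netty on the classpath. The following is a minimal JDK-only sketch, not Netty's class: ReflectiveFactoryDemo and ReflectiveFactory are hypothetical names, and StringBuilder merely stands in for a channel class with a public no-arg constructor.

```java
import java.lang.reflect.Constructor;

// Hypothetical, JDK-only stand-in for the reflective factory idea; not Netty's class.
final class ReflectiveFactoryDemo {

    /** Creates instances of T through the public no-arg constructor of the given class. */
    static final class ReflectiveFactory<T> {
        private final Constructor<? extends T> constructor;

        ReflectiveFactory(Class<? extends T> clazz) {
            try {
                // Resolve the constructor once, at configuration time.
                this.constructor = clazz.getConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(
                        clazz.getSimpleName() + " does not have a public no-arg constructor", e);
            }
        }

        T newInstance() {
            try {
                // Every call produces a fresh object, as a channel factory would.
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create " + constructor.getDeclaringClass(), e);
            }
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<CharSequence> factory = new ReflectiveFactory<>(StringBuilder.class);
        CharSequence first = factory.newInstance();
        CharSequence second = factory.newInstance();
        System.out.println(first.getClass().getSimpleName());
        System.out.println(first != second);
    }
}
```

    Resolving the constructor once in the factory's own constructor means a class without a suitable constructor is rejected at configuration time rather than when the first instance is requested.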

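    1.3 The role of ChannelInitializer

    ChannelHandlers are added to a pipeline in two ways: 1. Explicitly: the user adds a handler from the main thread through ServerBootstrap#handler. 2. When several ChannelHandlers are needed, they can be added to the pipeline through a ChannelInitializer.

    1.3.1 Why childHandler(new ChannelInitializer<SocketChannel>() {...}) is used:

    A NioSocketChannel is created by the server's NioServerSocketChannel after the server accepts a connection. While ServerBootstrap is being configured, however, the server has not started and no client has connected, so no client NioSocketChannel exists yet and there is no pipeline to add ChannelHandlers to. In addition, the pipeline of a client NioSocketChannel may hold any number of ChannelHandlers, and Netty cannot know in advance how many the user needs. Netty therefore provides the ChannelInitializer#initChannel callback, which lets the user define how ChannelHandlers are added.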
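
    The deferred-initialization idea can be sketched with plain JDK types. This is only an illustration under simplifying assumptions: MiniBootstrap, childInitializer and accept are hypothetical names, and a List of handler names stands in for a real pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature of "configure now, initialize per connection later"; not Netty's API.
final class DeferredInitDemo {

    static final class MiniBootstrap {
        // Stored at configuration time, when no connection (and so no pipeline) exists yet.
        private Consumer<List<String>> childInitializer = pipeline -> { };

        MiniBootstrap childInitializer(Consumer<List<String>> initializer) {
            this.childInitializer = initializer;
            return this;
        }

        /** Called once per accepted connection: creates a fresh pipeline and lets the callback fill it. */
        List<String> accept() {
            List<String> pipeline = new ArrayList<>();
            childInitializer.accept(pipeline);
            return pipeline;
        }
    }

    public static void main(String[] args) {
        MiniBootstrap bootstrap = new MiniBootstrap().childInitializer(p -> {
            p.add("decoder");
            p.add("fileServerHandler");
        });
        // Two simulated connections get two independent, identically initialized pipelines.
        System.out.println(bootstrap.accept());
        System.out.println(bootstrap.accept());
    }
}
```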

    II. The full server startup process

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

    public ChannelFuture bind(SocketAddress localAddress) {
        // Check that Netty's core components have all been configured
        validate();
        // Start the server: bind the port address and begin accepting client connections
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }

    // Abridged: both elided branches are shown in full in section 2.2.4.
    private ChannelFuture doBind(final SocketAddress localAddress) {
        // Asynchronously create, initialize and register the ServerSocketChannel with the Main Reactor
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // ... the ServerSocketChannel is registered with the Main Reactor: start binding the port ...
        } else {
            // Registration has not completed yet: add an operationComplete callback to regFuture,
            // which is invoked once registration finishes.
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // ... the ServerSocketChannel is registered with the Main Reactor: start binding the port ...
                }
            });
            return promise;
        }
    }

    2.1 Initializing and registering the channel

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // io.netty.channel.ReflectiveChannelFactory.newChannel
            channel = channelFactory.newChannel();
            // Initialize the channel
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

    2.1.1 channelFactory.newChannel();

    As section 1.2.1 shows, this simply calls return constructor.newInstance(); in other words it instantiates NioServerSocketChannel.

    public class NioServerSocketChannel extends AbstractNioMessageChannel
            implements io.netty.channel.socket.ServerSocketChannel {

        // SelectorProvider (used to create Selectors and selectable channels)
        private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

        /**
         * Create a new instance
         */
        public NioServerSocketChannel() {
            this(DEFAULT_SELECTOR_PROVIDER);
        }

        /**
         * Create a new instance using the given {@link SelectorProvider}.
         */
        public NioServerSocketChannel(SelectorProvider provider) {
            this(provider, null);
        }

        /**
         * Create a new instance using the given {@link SelectorProvider} and protocol family (supported only since JDK 15).
         */
        public NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
            this(newChannel(provider, family));
        }

        /**
         * Create a new instance using the given {@link ServerSocketChannel}.
         */
        public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }
    }
    2.1.1.1 SelectorProvider: the service-provider class for selectors and selectable channels

    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
        }
    }

    // A provider can be instantiated from the class named by a system property.
    private static boolean loadProviderFromProperty() {
        String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
        if (cn == null)
            return false;
        try {
            Class<?> c = Class.forName(cn, true,
                                       ClassLoader.getSystemClassLoader());
            provider = (SelectorProvider)c.newInstance();
            return true;
        } catch (ClassNotFoundException x) {
            throw new ServiceConfigurationError(null, x);
        } catch (IllegalAccessException x) {
            throw new ServiceConfigurationError(null, x);
        } catch (InstantiationException x) {
            throw new ServiceConfigurationError(null, x);
        } catch (SecurityException x) {
            throw new ServiceConfigurationError(null, x);
        }
    }

    // A provider can also be instantiated through SPI: a file named java.nio.channels.spi.SelectorProvider
    // under META-INF/services/ lists implementation classes by fully qualified name, and the first
    // SelectorProvider that loads successfully is used.
    private static boolean loadProviderAsService() {
        ServiceLoader<SelectorProvider> sl =
            ServiceLoader.load(SelectorProvider.class,
                               ClassLoader.getSystemClassLoader());
        Iterator<SelectorProvider> i = sl.iterator();
        for (;;) {
            try {
                if (!i.hasNext())
                    return false;
                provider = i.next();
                return true;
            } catch (ServiceConfigurationError sce) {
                if (sce.getCause() instanceof SecurityException) {
                    // Ignore the security exception, try the next provider
                    continue;
                }
                throw sce;
            }
        }
    }

    // This analysis runs on Windows, so the default provider is WindowsSelectorProvider.
    public class DefaultSelectorProvider {
        private DefaultSelectorProvider() {
        }

        public static SelectorProvider create() {
            return new WindowsSelectorProvider();
        }
    }

    Registering a channel with a selector in NIO

    2.1.1.2 newChannel(provider, family)

    private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
        try {
            // When family is null, SelectorProviderUtil.newChannel returns null
            ServerSocketChannel channel =
                    SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
            // Create the underlying JDK ServerSocketChannel
            return channel == null ? provider.openServerSocketChannel() : channel;
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }
    }

    Because family is null during this initialization, the call that actually runs is the JDK's provider.openServerSocketChannel().

    2.1.1.3 Constructing NioServerSocketChannel

    // The interest op recorded for this channel is SelectionKey.OP_ACCEPT
    super(null, channel, SelectionKey.OP_ACCEPT);
    // Create the channel's configuration class, NioServerSocketChannelConfig. It encapsulates the low-level
    // configuration of the channel and the JDK ServerSocket, and it creates the buffer allocator
    // (AdaptiveRecvByteBufAllocator) that NioServerSocketChannel uses when receiving data.
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            // Put the channel into non-blocking mode.
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();                    // globally unique id
        unsafe = newUnsafe();            // unsafe performs the low-level reads and writes
        pipeline = newChannelPipeline(); // pipeline arranges the business handlers
    }

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise = new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

    At this point the channel's pipeline contains only two nodes, head and tail.
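
    The head/tail structure is a doubly linked list with two sentinel nodes. The sketch below is a simplified, JDK-only miniature under that assumption: MiniPipeline and Node are hypothetical names, and nodes carry only a name instead of a handler and context.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of a handler chain with head/tail sentinels; not Netty's DefaultChannelPipeline.
final class MiniPipelineDemo {

    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    static final class MiniPipeline {
        final Node head = new Node("head");
        final Node tail = new Node("tail");

        MiniPipeline() {
            // A new pipeline links the two sentinels directly to each other.
            head.next = tail;
            tail.prev = head;
        }

        /** Inserts a node just before the tail sentinel. */
        MiniPipeline addLast(String name) {
            Node node = new Node(name);
            Node last = tail.prev;
            node.prev = last;
            node.next = tail;
            last.next = node;
            tail.prev = node;
            return this;
        }

        /** Lists node names from head to tail. */
        List<String> names() {
            List<String> out = new ArrayList<>();
            for (Node n = head; n != null; n = n.next) {
                out.add(n.name);
            }
            return out;
        }
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline();
        System.out.println(p.names());
        p.addLast("logging").addLast("acceptor");
        System.out.println(p.names());
    }
}
```

    Because the sentinels are always present, addLast never has to special-case an empty list.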

    2.1.2 init(channel): initialization

    @Override
    void init(Channel channel) {
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, newAttributesArray());

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

        // The initChannel() method of this ChannelInitializer adds the handlers of the ServerSocketChannel
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                // Add the handler given to handler(new LoggingHandler(LogLevel.INFO)) to the pipeline
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                // Netty then adds one more handler, ServerBootstrapAcceptor, through an asynchronous task.
                // It is the connection acceptor: it receives new connections and hands them to an EventLoop.
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

    • ServerBootstrapAcceptor: this is the acceptor of the MainReactor and is itself a ChannelHandler. Once a client connection has been established, it initializes the client NioSocketChannel, picks a SubReactor from the sub Reactor thread group, and registers the client NioSocketChannel with that SubReactor's selector.

    • The pipeline of the NioServerSocketChannel is initialized after the NioServerSocketChannel has been registered with the Main Reactor and before the port address is bound. To keep pipeline initialization thread-safe, Netty always performs it on the Reactor thread.

    • Why is adding ServerBootstrapAcceptor wrapped in an asynchronous task? This article's example uses handler(new LoggingHandler(LogLevel.INFO)), but the handler could just as well be a new ChannelInitializer() {...}. In that case, when the Main Reactor later runs the register0 task and reaches invokeHandlerAddedIfNeeded, that ChannelInitializer's initChannel method is called and its handlers are appended at the end of the pipeline. If the acceptor were not added through an asynchronous task, it could end up as an intermediate handler in the pipeline. Netty therefore wraps the addition in an asynchronous task so that ServerBootstrapAcceptor is the last handler. When a new connection arrives, pipeline.fireChannelRead() propagates the event from the head node onward until it reaches ServerBootstrapAcceptor.

    2.2 Registering the channel with the MainReactor

    2.2.1 Selecting a MainReactor by round-robin

    ChannelFuture regFuture = config().group().register(channel);

    ServerBootstrap obtains the main Reactor thread group (a NioEventLoopGroup) and registers the NioServerSocketChannel with it.

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

    // Choose the selection strategy
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        // Is the number of executors a power of two?
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    // Select a Reactor by round-robin (GenericEventExecutorChooser)
    @Override
    public EventExecutor next() {
        return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    For a positive number, the sign-magnitude, ones' complement and two's complement representations are identical.
    For a negative number, the two's complement is the ones' complement plus 1, and the ones' complement is the sign-magnitude form with every bit except the sign bit inverted. As a result, val & -val keeps only the lowest set bit of val, which equals val exactly when val has a single set bit.
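
    The bit trick and the two index computations can be checked in isolation. This is a JDK-only sketch, not Netty's chooser classes: ChooserDemo, maskIndex and moduloIndex are hypothetical names, and the mask form is what a power-of-two chooser can use in place of the modulo.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper mirroring the chooser arithmetic; not Netty's DefaultEventExecutorChooserFactory.
final class ChooserDemo {

    /** 8 = 00001000 and -8 = ...11111000, so 8 & -8 == 8; 6 = 0110 and -6 = ...1010, so 6 & -6 == 2. */
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    /** Round-robin index using a bit mask; only valid when length is a power of two. */
    static int maskIndex(long counter, int length) {
        return (int) (counter & (length - 1));
    }

    /** Round-robin index using modulo; works for any positive length. */
    static int moduloIndex(long counter, int length) {
        return (int) Math.abs(counter % length);
    }

    public static void main(String[] args) {
        AtomicLong idx = new AtomicLong();
        // With 4 executors the selected index cycles 0, 1, 2, 3, 0, 1.
        for (int i = 0; i < 6; i++) {
            System.out.println(maskIndex(idx.getAndIncrement(), 4));
        }
    }
}
```

    For a power-of-two length both computations select the same index for non-negative counters; the mask simply avoids a division.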

    2.2.2 register

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(eventLoop, "eventLoop");
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }

        // Record on the channel which Reactor it is bound to
        AbstractChannel.this.eventLoop = eventLoop;

        /*
         * Channel registration must be performed by the Reactor thread.
         *
         * 1: If the current thread is the Reactor thread, call register0 directly.
         * 2: If the current thread is an external thread, wrap register0 in an asynchronous task
         *    that the Reactor thread will execute.
         */
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else { // called from an external thread
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

    Here the current thread is not the Reactor thread but the user program's startup thread (the main thread), so registration is submitted as an asynchronous task, and submitting it also starts the Reactor thread.

    // The Reactor thread is started when the first asynchronous task is submitted to the Reactor.
    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
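
    The "start the thread on the first submitted task" pattern can be reproduced with JDK classes alone. The sketch below is a simplification, not Netty's SingleThreadEventExecutor: LazyLoop, threadsStarted and startedAfter are hypothetical names, and shutdown handling is left out entirely.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-threaded loop whose worker starts lazily; not Netty's implementation.
final class LazyLoopDemo {

    static final class LazyLoop {
        private static final int NOT_STARTED = 0;
        private static final int STARTED = 1;

        private final AtomicInteger state = new AtomicInteger(NOT_STARTED);
        private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
        final AtomicInteger threadsStarted = new AtomicInteger();

        void execute(Runnable task) {
            tasks.add(task);
            // Only the caller that wins the CAS starts the worker thread.
            if (state.compareAndSet(NOT_STARTED, STARTED)) {
                threadsStarted.incrementAndGet();
                Thread worker = new Thread(this::runLoop, "lazy-loop");
                worker.setDaemon(true); // do not keep the JVM alive in this demo
                worker.start();
            }
        }

        private void runLoop() {
            try {
                for (;;) {
                    tasks.take().run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Submits n tasks, waits for them to run, and returns how many worker threads were started. */
    static int startedAfter(int n) {
        LazyLoop loop = new LazyLoop();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            loop.execute(done::countDown);
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return loop.threadsStarted.get();
    }

    public static void main(String[] args) {
        System.out.println(startedAfter(3));
    }
}
```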

    2.2.3 register0(): the asynchronous task executed by the MainReactor

    // At first the Reactor's task queue holds a single task, register0. Once the Reactor thread has started,
    // it takes tasks from the queue and runs them.
    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            // Call the underlying JDK register() to perform the registration
            doRegister();
            neverRegistered = false;
            registered = true;

            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            // Invoke handlerAdded on the ChannelInitializer added to the pipeline; the pipeline is initialized here.
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            // fireChannelRegistered() triggers the channelRegistered event, which travels along the pipeline
            // from the head node to the tail node.
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            // A server-side ServerSocketChannel becomes active only after the port address is bound successfully.
            // The bind operation is still waiting in the Reactor's task queue as an asynchronous task,
            // so isActive() returns false here.
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    2.2.3.1 doRegister

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

    // JDK: java.nio.channels.spi.AbstractSelectableChannel#register
    public final SelectionKey register(Selector sel, int ops,
                                       Object att)
        throws ClosedChannelException
    {
        synchronized (regLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if ((ops & ~validOps()) != 0)
                throw new IllegalArgumentException();
            if (blocking)
                throw new IllegalBlockingModeException();
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.interestOps(ops);
                k.attach(att);
            }
            if (k == null) {
                // New registration
                synchronized (keyLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
            }
            return k;
        }
    }

    javaChannel().register() calls into the JDK to register the channel with the Selector. The third argument is Netty's own Channel object, which register() stores as the attachment of the resulting SelectionKey. Whenever the Selector reports a ready key during the event loop, Netty can therefore recover its own Channel object from that key's attachment. Note that the interest set passed here is 0, so no event is being listened for yet.
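
    The attachment mechanism is plain JDK NIO and can be tried on its own. The following is a minimal sketch under that assumption: WrapperChannel is a hypothetical stand-in for a framework-level channel object, and switching on OP_ACCEPT afterwards is shown only to illustrate that the interest set can be changed after registration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// JDK-only illustration of registering with interest set 0 plus an attachment.
final class AttachmentDemo {

    /** Hypothetical wrapper that plays the role of the framework-level channel object. */
    static final class WrapperChannel {
        final ServerSocketChannel jdkChannel;

        WrapperChannel(ServerSocketChannel jdkChannel) {
            this.jdkChannel = jdkChannel;
        }
    }

    /** Returns true if the wrapper is recoverable from the key and the interest set behaves as expected. */
    static boolean registerAndRecover() {
        try (Selector selector = Selector.open();
             ServerSocketChannel jdk = ServerSocketChannel.open()) {
            jdk.configureBlocking(false); // register() rejects channels in blocking mode
            WrapperChannel wrapper = new WrapperChannel(jdk);

            // Interest set 0: the channel is known to the selector but listens for nothing yet.
            SelectionKey key = jdk.register(selector, 0, wrapper);
            boolean recovered = key.attachment() == wrapper && key.interestOps() == 0;

            // Interest in OP_ACCEPT can be switched on later, once the channel should accept connections.
            key.interestOps(SelectionKey.OP_ACCEPT);
            return recovered && key.interestOps() == SelectionKey.OP_ACCEPT;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(registerAndRecover());
    }
}
```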

    2.2.3.2 handlerAdded

    The ChannelPipeline is initialized once the Channel has been registered with its Reactor: the handlerAdded callback uses the ChannelInitializer to perform the initialization.

    io.netty.channel.ChannelInitializer#handlerAdded

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // Only after the channel has been registered
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.
            if (initChannel(ctx)) {

                // We are done with init the Channel, removing the initializer now.
                removeState(ctx);
            }
        }
    }

    // A ChannelInitializer instance is shared by all Channels and is used to initialize their ChannelPipelines.
    // The Set records pipelines that are already initialized, so the same ChannelPipeline is not initialized twice.
    private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
            new ConcurrentHashMap<ChannelHandlerContext, Boolean>());

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    // Initialization is finished: remove this initializer from the pipeline
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

    // Implemented by the anonymous class, which supplies the actual initialization logic
    protected abstract void initChannel(C ch) throws Exception;

    private void removeState(final ChannelHandlerContext ctx) {
        // Remove the ChannelInitializer's context from the initMap re-entrance guard
        if (ctx.isRemoved()) {
            initMap.remove(ctx);
        } else {
            ctx.executor().execute(new Runnable() {
                @Override
                public void run() {
                    initMap.remove(ctx);
                }
            });
        }
    }

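    After this completes, the pipeline is: head → LoggingHandler → tail (the ChannelInitializer has removed itself).

    Once the whole register0() flow has finished, the EventLoop thread runs the task submitted earlier and adds ServerBootstrapAcceptor to the pipeline:

    ch.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            pipeline.addLast(new ServerBootstrapAcceptor(
                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });

    In other words, the pipeline is always initialized on the EventLoop thread, which keeps the initialization thread-safe.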

    2.2.4 The Reactor thread signals successful registration: safeSetSuccess(promise);

    Once the Reactor thread marks the registration as successful, the listener added by the startup thread is notified and the port binding proceeds.

    // If registration has already completed, bind the port right away
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
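
    The "bind now if registration is done, otherwise bind from a completion callback" shape can be sketched with the JDK's CompletableFuture. This is an analogy rather than Netty's ChannelFuture API: bindWhenRegistered and the returned labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// JDK-only analogy of the doBind branching; CompletableFuture stands in for the registration future.
final class BindAfterRegisterDemo {

    /** Runs bindAction right away if registration is done, otherwise when it completes successfully. */
    static String bindWhenRegistered(CompletableFuture<Void> registration, Runnable bindAction) {
        if (registration.isCompletedExceptionally()) {
            return "failed"; // a failed registration never reaches the bind step
        }
        if (registration.isDone()) {
            bindAction.run();
            return "immediate";
        }
        // thenRun only fires on successful completion, mirroring the success branch of the listener.
        registration.thenRun(bindAction);
        return "deferred";
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        CompletableFuture<Void> pending = new CompletableFuture<>();
        System.out.println(bindWhenRegistered(pending, () -> log.add("bind")));
        System.out.println(log); // still empty: registration has not completed
        pending.complete(null);
        System.out.println(log); // the callback has now run
    }
}
```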

    2.3 Binding the port

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

    The bind is submitted to the Reactor as an asynchronous task, because the binding logic must run after the registration logic has finished, including the ServerBootstrapAcceptor task submitted earlier.
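
    This ordering follows from the FIFO task queue of a single-threaded loop. The sketch below imitates it with a JDK single-thread executor: the task names are labels chosen for illustration, and the real tasks of course do far more than append a string.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// JDK-only imitation of the startup task ordering on a single-threaded event loop.
final class TaskOrderDemo {

    /** Runs the three startup steps on one thread and returns the order in which they executed. */
    static List<String> runStartupTasks() {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch bound = new CountDownLatch(1);
        try {
            loop.execute(() -> {
                // While registration runs, the acceptor task is queued behind the current task...
                loop.execute(() -> order.add("addAcceptor"));
                order.add("register0");
                // ...and the bind task is queued only after registration has been marked successful.
                loop.execute(() -> {
                    order.add("bind");
                    bound.countDown();
                });
            });
            if (!bound.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("startup tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runStartupTasks());
    }
}
```

    Since a single thread drains the queue in submission order, the bind step cannot overtake the acceptor step.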

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

    @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(localAddress, "localAddress");
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null, false);
        }
        return promise;
    }

    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                // DON'T CHANGE
                // Duplex handlers implements both out/in interfaces causing a scalability issue
                // see https://bugs.openjdk.org/browse/JDK-8180450
                final ChannelHandler handler = handler();
                final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                if (handler == headContext) {
                    headContext.bind(this, localAddress, promise);
                } else if (handler instanceof ChannelDuplexHandler) {
                    ((ChannelDuplexHandler) handler).bind(this, localAddress, promise);
                } else {
                    ((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
                }
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);
        }
    }

    Calling pipeline.bind(localAddress, promise) propagates the bind event through the pipeline and invokes the bind method of each outbound ChannelHandler along the way.

    Event propagation in a pipeline is directional:

    • inbound events start at HeadContext and travel forward, node by node, until TailContext

    • outbound events travel in the opposite direction, starting at TailContext and moving backward until HeadContext

    private AbstractChannelHandlerContext findContextOutbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.prev;
            // This skips ServerBootstrapAcceptor, which is declared as
            // private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
        return ctx;
    }

    private static boolean skipContext(
            AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
        // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
        return (ctx.executionMask & (onlyMask | mask)) == 0 ||
                // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
                // everything to preserve ordering.
                //
                // See https://github.com/netty/netty/issues/10067
                (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
    }

    The code above shows that Netty defines bind as an outbound event, so it propagates backward through the pipeline: it starts at TailContext and travels toward HeadContext.

    The core bind logic is therefore implemented in HeadContext.
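
    The backward walk can be pictured with a stripped-down model. This is a sketch under simplifying assumptions, not Netty's context classes: Ctx, link and bindPath are hypothetical names, and a single boolean replaces the execution mask and executor checks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of outbound propagation: walk prev pointers and skip inbound-only contexts.
final class OutboundWalkDemo {

    static final class Ctx {
        final String name;
        final boolean handlesOutbound;
        Ctx prev;

        Ctx(String name, boolean handlesOutbound) {
            this.name = name;
            this.handlesOutbound = handlesOutbound;
        }
    }

    /** Links the contexts in pipeline order and returns the last one (the tail). */
    static Ctx link(Ctx... contexts) {
        for (int i = 1; i < contexts.length; i++) {
            contexts[i].prev = contexts[i - 1];
        }
        return contexts[contexts.length - 1];
    }

    /** Collects, in visiting order, the contexts an outbound event reaches when started at the given context. */
    static List<String> bindPath(Ctx start) {
        List<String> visited = new ArrayList<>();
        for (Ctx c = start.prev; c != null; c = c.prev) {
            if (c.handlesOutbound) {
                visited.add(c.name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        Ctx tail = link(
                new Ctx("head", true),      // the head handles outbound operations such as bind
                new Ctx("logging", true),   // a duplex handler sees both directions
                new Ctx("acceptor", false), // inbound-only, so bind skips it
                new Ctx("tail", false));
        System.out.println(bindPath(tail));
    }
}
```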

    headContext.bind(this, localAddress, promise);

    Underneath, this comes down to:

    // DefaultChannelPipeline.HeadContext
    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
        unsafe.bind(localAddress, promise);
    }

    // AbstractChannel.AbstractUnsafe
    @Override
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        assertEventLoop();
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        // See: https://github.com/netty/netty/issues/576
        if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
            localAddress instanceof InetSocketAddress &&
            !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
            !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
            // Warn a user about the fact that a non-root user can't receive a
            // broadcast packet on *nix if the socket is bound on non-wildcard address.
            logger.warn(
                    "A non-root user can't receive a broadcast packet if the socket " +
                    "is not bound to a wildcard address; binding to a non-wildcard " +
                    "address (" + localAddress + ") anyway as requested.");
        }
        boolean wasActive = isActive();
        try {
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelActive();
                }
            });
        }
        safeSetSuccess(promise);
    }

    // NioServerSocketChannel
    @SuppressJava6Requirement(reason = "Usage guarded by java version check")
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

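    Netty calls a different JDK bind() method depending on the JDK version. I am using JDK 8, so the bind() method of the JDK-native Channel is called. Once doBind() returns, the server's JDK-native Channel has actually been bound to the port.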
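    The two JDK calls that doBind() chooses between can be tried directly, without Netty. The sketch below is an illustration only: JdkBindSketch and bindAndGetPort are made-up names, and it assumes the loopback address with port 0 (let the OS pick a free port) so it can run anywhere.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class JdkBindSketch {
    /** Binds through the channel (Java 7+) or through the legacy socket view, and returns the bound port. */
    public static int bindAndGetPort(boolean useChannelBind, int backlog) {
        // Port 0 asks the OS for any free port; loopback keeps the demo local.
        InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            if (useChannelBind) {
                channel.bind(address, backlog);          // the branch taken on Java 7 and later
            } else {
                channel.socket().bind(address, backlog); // the older ServerSocket-based branch
            }
            return channel.socket().getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("channel bind -> port " + bindAndGetPort(true, 100));
        System.out.println("socket bind  -> port " + bindAndGetPort(false, 100));
    }
}
```

    Both branches end up with a bound server socket; the backlog argument plays the role of the SO_BACKLOG option set on the ServerBootstrap.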

    2.3.1 Checking whether the Channel is active

    // NioServerSocketChannel
    @Override
    public boolean isActive() {
        // As java.nio.ServerSocketChannel.isBound() will continue to return true even after the channel was closed
        // we will also need to check if it is open.
        return isOpen() && javaChannel().socket().isBound();
    }
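    The same check can be applied to a plain JDK channel to see how the active flag changes over its lifetime. This is a minimal sketch with hypothetical names (ActiveStateSketch, lifecycle), again assuming a loopback address and an OS-assigned port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.Arrays;

public class ActiveStateSketch {
    /** Same condition as isActive() above, applied to a plain JDK channel. */
    static boolean isActive(ServerSocketChannel channel) {
        return channel.isOpen() && channel.socket().isBound();
    }

    /** Returns the active flag before bind, after bind, and after close. */
    public static boolean[] lifecycle() {
        try {
            ServerSocketChannel channel = ServerSocketChannel.open();
            boolean beforeBind = isActive(channel);
            channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            boolean afterBind = isActive(channel);
            channel.close();
            boolean afterClose = isActive(channel); // isOpen() is false now, so the result is false
            return new boolean[] {beforeBind, afterBind, afterClose};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(lifecycle())); // prints [false, true, false]
    }
}
```

    The false-to-true transition across the bind call is what the `!wasActive && isActive()` condition in bind() detects before it schedules fireChannelActive().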

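    2.3.2 The channelActive event

    Once the port is bound, the Channel is in the active state, and pipeline.fireChannelActive() is called to fire the channelActive event.

    channelActive is defined as an inbound event in Netty, so it propagates forward through the pipeline, from HeadContext all the way to TailContext.

    The channelActive callback is where the Channel tells the Selector which IO event to listen for: the OP_ACCEPT event.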

    // DefaultChannelPipeline
    @Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }

    // AbstractChannelHandlerContext
    private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                // DON'T CHANGE
                // Duplex handlers implements both out/in interfaces causing a scalability issue
                // see https://bugs.openjdk.org/browse/JDK-8180450
                final ChannelHandler handler = handler();
                final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                if (handler == headContext) {
                    headContext.channelActive(this);
                } else if (handler instanceof ChannelDuplexHandler) {
                    ((ChannelDuplexHandler) handler).channelActive(this);
                } else {
                    ((ChannelInboundHandler) handler).channelActive(this);
                }
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelActive();
        }
    }

    // DefaultChannelPipeline.HeadContext
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Keep propagating the channelActive event forward through the pipeline
        ctx.fireChannelActive();
        // If autoRead is enabled, trigger propagation of the read event automatically;
        // the read callback is where OP_ACCEPT gets registered
        readIfIsAutoRead();
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    // AbstractChannel
    @Override
    public Channel read() {
        pipeline.read();
        return this;
    }

    // DefaultChannelPipeline
    @Override
    public final ChannelPipeline read() {
        tail.read();
        return this;
    }

    // DefaultChannelPipeline.HeadContext
    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

    // AbstractChannel.AbstractUnsafe
    @Override
    public final void beginRead() {
        assertEventLoop();
        try {
            doBeginRead();
        } catch (final Exception e) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireExceptionCaught(e);
                }
            });
            close(voidPromise());
        }
    }

    // AbstractNioChannel
    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
        readPending = true;
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

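    After the channelActive event has been propagated, readIfIsAutoRead() is called to trigger a read on the Channel, which eventually reaches doBeginRead() in AbstractNioChannel. The readInterestOp field there is the SelectionKey.OP_ACCEPT value passed in when the Channel was initialized earlier, so OP_ACCEPT is added to the Channel's interest set.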
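    The interest-set update in doBeginRead() can be reproduced with the JDK alone. The sketch below is not Netty code: BeginReadSketch and its beginRead helper are hypothetical, and it assumes the channel was first registered with an empty interest set (0), so that OP_ACCEPT only appears once beginRead runs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Arrays;

public class BeginReadSketch {
    /** Mirrors the bit test in doBeginRead(): add readInterestOp only if it is not set yet. */
    static void beginRead(SelectionKey key, int readInterestOp) {
        if (!key.isValid()) {
            return;
        }
        int interestOps = key.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            key.interestOps(interestOps | readInterestOp);
        }
    }

    /** Returns interestOps right after registration and after two beginRead calls. */
    public static int[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(false);                 // required before register()
            SelectionKey key = channel.register(selector, 0); // registered, but interested in nothing yet
            int afterRegister = key.interestOps();
            beginRead(key, SelectionKey.OP_ACCEPT);
            beginRead(key, SelectionKey.OP_ACCEPT);           // second call changes nothing
            return new int[] {afterRegister, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [0, 16]; 16 is SelectionKey.OP_ACCEPT
    }
}
```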

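    2.4 Server startup summary

    • Create the server Channel: this essentially creates the underlying JDK-native Channel and initializes several important fields, including id, unsafe, and pipeline.
    • Initialize the server Channel: set the Socket options and user-defined attributes, and add two special handlers, ChannelInitializer and ServerBootstrapAcceptor.
    • Register the server Channel: call into the JDK to register the Channel with the Selector.
    • Bind the port: call into the JDK to bind the port, fire the channelActive event, and add OP_ACCEPT to the Channel's interest set.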
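    For comparison, the four steps can be replayed with raw JDK NIO. This is a rough sketch, not what Netty executes internally: StartupFlowSketch and startAndAcceptOnce are hypothetical names, SO_REUSEADDR stands in for "some socket option", and a loopback client is connected only to show that the Selector reports OP_ACCEPT once the steps are done.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class StartupFlowSketch {
    /** Runs create, initialize, register, bind, then accepts one loopback connection. */
    public static boolean startAndAcceptOnce() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {      // 1. create the channel
            server.configureBlocking(false);                                 // 2. initialize it
            server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            SelectionKey key = server.register(selector, 0);                 // 3. register, no interest yet
            server.bind(
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 100); // 4. bind with a backlog
            key.interestOps(SelectionKey.OP_ACCEPT);                         //    then start listening for accepts

            // Connect a client so the selector has something to report.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(2000);
                if (!key.isAcceptable()) {
                    return false;
                }
                try (SocketChannel accepted = server.accept()) {
                    return accepted != null;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted a connection: " + startAndAcceptOnce());
    }
}
```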
  • Original article: https://blog.csdn.net/qq_38340127/article/details/132549711