• Netty源码阅读(1)之——客户端源码梗概


    目录

    准备

    开始

    NioSocketChannel 的初始化过程

    指定

    初始化

    关于unsafe属性:

    关于pipeline的初始化

    小结

    EventLoopGroup初始化

    小结

    channel的注册过程

    handler的注册过程

     客户端连接

     总结


    准备

    • 源码阅读基于4.1.84.Final版本。
    • 从github下载netty项目,并且使用[netty-example]模块
    • 你需要先对netty有个大概的了解,比如知道它的模型

    开始

    找到[netty-example]模块的ceho包,查看简单的使用案例。

    1. public final class EchoClient {
    2. static final String HOST = System.getProperty("host", "127.0.0.1");
    3. static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    4. static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
    5. public static void main(String[] args) throws Exception {
    6. // Configure the client.
    7. EventLoopGroup group = new NioEventLoopGroup();
    8. try {
    9. Bootstrap b = new Bootstrap();
    10. b.group(group)
    11. .channel(NioSocketChannel.class)
    12. .option(ChannelOption.TCP_NODELAY, true)
    13. .handler(new ChannelInitializer() {
    14. @Override
    15. public void initChannel(SocketChannel ch) throws Exception {
    16. ChannelPipeline p = ch.pipeline();
    17. //p.addLast(new LoggingHandler(LogLevel.INFO));
    18. p.addLast(new EchoClientHandler());
    19. }
    20. });
    21. // Start the client.
    22. ChannelFuture f = b.connect(HOST, PORT).sync();
    23. // Wait until the connection is closed.
    24. f.channel().closeFuture().sync();
    25. } finally {
    26. // Shut down the event loop to terminate all threads.
    27. group.shutdownGracefully();
    28. }
    29. }
    30. }
    • group方法:处理要创建的所有事件
    • channel方法:创建channel实例,因为是TCP客户端, 因此使用了 NioSocketChannel
    • option方法:配置channel实例
    • handler方法:设置数据处理器

    上边代码很简单,无非就是新建了个bootstrap,对其进行配置,然后启动客户端连接服务端,最后等待连接关闭。不得不说,netty代码封装的很好,使我们初学者可以简单快速的上手,但是代码封装的越好。意味着内部代码越复杂。

    下边,我们来更详细的看下配置bootstrap的流程

    NioSocketChannel 的初始化过程

    在 Netty 中, Channel 是一个 Socket 的抽象, 它为用户提供了关于 Socket 状态(是否是连接还是断开) 以及对 Socket 的读写等操作. 每当 Netty 建立了一个连接后, 都会有一个对应的 Channel 实例. 

    这里,我们需要先知道NioSocketChannel的作用:异步的客户端 TCP Socket 连接。除此之外,还有以下类型:

    • NioSocketChannel, 代表异步的客户端 TCP Socket 连接.
    • NioServerSocketChannel, 异步的服务器端 TCP Socket 连接.
    • NioDatagramChannel, 异步的 UDP 连接
    • NioSctpChannel, 异步的客户端 Sctp 连接.
    • NioSctpServerChannel, 异步的 Sctp 服务器端连接.
    • OioSocketChannel, 同步的客户端 TCP Socket 连接.
    • OioServerSocketChannel, 同步的服务器端 TCP Socket 连接.
    • OioDatagramChannel, 同步的 UDP 连接
    • OioSctpChannel, 同步的 Sctp 服务器端连接.
    • OioSctpServerChannel, 同步的客户端 TCP Socket 连接.

    指定

    从上边代码中我们看到了是channel方法中配置的:

    …… 
    .channel(NioSocketChannel.class)

     我们点进去查看

    1. public B channel(Class channelClass) {
    2. return channelFactory(new ReflectiveChannelFactory(
    3. ObjectUtil.checkNotNull(channelClass, "channelClass")
    4. ));
    5. }

    发现就是简单的指定了AbstractBootstrap抽象类中的channelFactory属性&&指定了ReflectiveChannelFactory中的constructor属性。同时我们发现实例化ReflectiveChannelFactory中的constructor,也就是实例化public io.netty.channel.socket.nio.NioSocketChannel()是在ReflectiveChannelFactory类中重写的newChannel方法中。那么具体是在哪里调用的呢?这里先挖个坑Ⅰ

    初始化

    进入NioSocketChannel类,找到无参构造器,调用有参构造器

    1. public NioSocketChannel(SelectorProvider provider) {
    2. this(newSocket(provider));
    3. }

     注意这里的newSocket,是用来打开一个新的 Java NIO SocketChannel。

    构造方法继续往下看点点点……来到AbstractChannel

    1. protected AbstractChannel(Channel parent) {
    2. this.parent = parent;
    3. id = newId();
    4. unsafe = newUnsafe();
    5. pipeline = newChannelPipeline();
    6. }

    关于unsafe属性:

    1. protected AbstractNioUnsafe newUnsafe() {
    2. return new NioSocketChannelUnsafe();
    3. }

     我们直接看NioSocketChannelUnsafe的继承和实现,最后可以看出来实现了Unsafe接口,我们来看下这个接口

     一看便知,这些操作都是和 Java 底层的 Socket 相关的操作。

    关于pipeline的初始化

    首先我们要知道

     Each channel has its own pipeline and it is created automatically when a new channel is created

    在实例化一个 Channel 时, 必然伴随着实例化一个 ChannelPipeline.

    接下来我们来看一下初始化pipeline时都做了那些事情吧,对着newChannelPipeline()往下一直点点到DefaultChannelPipeline的有参构造。

    1. protected DefaultChannelPipeline(Channel channel) {
    2. this.channel = ObjectUtil.checkNotNull(channel, "channel");
    3. succeededFuture = new SucceededChannelFuture(channel, null);
    4. voidPromise = new VoidChannelPromise(channel, true);
    5. tail = new TailContext(this);
    6. head = new HeadContext(this);
    7. head.next = tail;
    8. tail.prev = head;
    9. }

    HeadContext的继承结构图,不难发现有ChannelOutboundHandler和ChannelInboundHandler

     TailContext的继承结构图,只有ChannelInboundHandler

     他们又都继承了AbstractChannelHandlerContext,这就是DefaultChannelPipeline中维护的双向链表,tail为尾部,head为头部。这个链表是 Netty 实现 Pipeline 机制的关键

    小结

     NioSocketChannel初始化时

    • 打开一个新的 Java NIO SocketChannel
    • unsafe 通过newUnsafe() 实例化一个 unsafe 对象, 它的类型是 AbstractNioByteChannel.NioByteUnsafe 内部类
    • 创建pipeline实例
    • readInterestOp变为SelectionKey.OP_READ

    • SelectableChannel ch 被配置为非阻塞的 ch.configureBlocking(false)
    • SocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket())

    EventLoopGroup初始化

    点入new NioEventLoopGroup()默认this(0)参数为0。继续走到MultithreadEventLoopGroup的构造方法

    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);

    可以看出来,如果new NioEventLoopGroup(?)不填参数,默认就是以下规则。

    1. DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
    2. "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    NettyRuntime.availableProcessors() * 2 = 处理器核心数 * 2

    最后走到MultithreadEventExecutorGroup的MultithreadEventExecutorGroup方法。

    小结

    1. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
    2. EventExecutorChooserFactory chooserFactory, Object... args) {
    3. checkPositive(nThreads, "nThreads");
    4. //如果没有自定义执行器(该执行器最终被赋值给EventExecutor的成员变量),则使用ThreadPerTaskExecutor
    5. if (executor == null) {
    6. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    7. }
    8. //实例化children
    9. children = new EventExecutor[nThreads];
    10. //for循环将实例化children中的每一个元素
    11. for (int i = 0; i < nThreads; i ++) {
    12. boolean success = false;
    13. try {
    14. //通过子类中的newChild()来实现
    15. children[i] = newChild(executor, args);
    16. success = true;
    17. } catch (Exception e) {
    18. // TODO: Think about if this is a good exception type
    19. throw new IllegalStateException("failed to create a child event loop", e);
    20. } finally {
    21. if (!success) {
    22. for (int j = 0; j < i; j ++) {
    23. children[j].shutdownGracefully();
    24. }
    25. for (int j = 0; j < i; j ++) {
    26. EventExecutor e = children[j];
    27. try {
    28. while (!e.isTerminated()) {
    29. e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
    30. }
    31. } catch (InterruptedException interrupted) {
    32. // Let the caller handle the interruption.
    33. Thread.currentThread().interrupt();
    34. break;
    35. }
    36. }
    37. }
    38. }
    39. }
    40. //实例化事件轮询器,即上述的默认的执行器选择工厂DefaultEventExecutorChooserFactory.INSTANCE
    41. chooser = chooserFactory.newChooser(children);
    42. //定义异步事件通知,该通知将被添加到事件执行器EventExecutor上,
    43. //其逻辑也是简单的当children的最后一个元素被成功初始化后设置当前Group的实例化结果
    44. final FutureListener terminationListener = new FutureListener() {
    45. @Override
    46. public void operationComplete(Future future) throws Exception {
    47. if (terminatedChildren.incrementAndGet() == children.length) {
    48. terminationFuture.setSuccess(null);
    49. }
    50. }
    51. };
    52. //将上述通知添加到children中的每一个元素上
    53. for (EventExecutor e: children) {
    54. e.terminationFuture().addListener(terminationListener);
    55. }
    56. //构建一个不可更改的readonlyChildren用于遍历。
    57. Set childrenSet = new LinkedHashSet(children.length);
    58. Collections.addAll(childrenSet, children);
    59. readonlyChildren = Collections.unmodifiableSet(childrenSet);
    60. }
    61. MultithreadEventExecutorGroup 内部维护了一个 EventExecutor 数组, Netty 的 EventLoopGroup 的实现机制其实就建立在 MultithreadEventExecutorGroup 之上. 每当 Netty 需要一个 EventLoop 时, 会调用 next() 方法获取一个可用的 EventLoop.

      channel的注册过程

      上边讲过了初始化的过程,如果你认真看了就知道上边留了个坑Ⅰ。channel在Bootstrap.connect -> Bootstrap.doConnect -> AbstractBootstrap.initAndRegister这里边调用channelFactory.newChannel()完成初始化。

      initAndRegister代码简化后

      1. final ChannelFuture initAndRegister() {
      2. // 去掉非关键代码
      3. final Channel channel = channelFactory().newChannel();
      4. init(channel);
      5. ChannelFuture regFuture = config().group().register(channel);
      6. return regFuture;
      7. }

      从代码可以看出来初始化后使用register对channel进行了注册,以下是注册主流程

      • next().register(channel),next返回的是一个EventLoop。👇
      • register(new DefaultChannelPromise(channel, this));把channel封装为DefaultChannelPromise以指定excutor和channel。👇
      • promise.channel().unsafe().register(this, promise);调用unsafe的register,还记得unsafe是什么吗?不记得往上边找。👇
      • AbstractChannel.register0(promise);👇
      • doRegister();👇
      • selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);javaChannel() 这个方法返回的是一个 Java NIO SocketChannel, 这里我们将这个 SocketChannel 注册到与 eventLoop 关联的 selector 上了.

      总的来说, Channel 注册过程所做的工作就是将 Channel 与对应的 EventLoop 关联, 因此这也体现了, 在 Netty 中, 每个 Channel 都会关联一个特定的 EventLoop, 并且这个 Channel 中的所有 IO 操作都是在这个 EventLoop 中执行的; 当关联好 Channel 和 EventLoop 后, 会继续调用底层的 Java NIO SocketChannel 的 register 方法, 将底层的 Java NIO SocketChannel 注册到指定的 selector 中. 通过这两步, 就完成了 Netty Channel 的注册过程。

      handler的注册过程

      Netty 的一个强大和灵活之处就是基于 Pipeline 的自定义 handler 机制。

      1. ...
      2. .handler(new ChannelInitializer() {
      3. @Override
      4. public void initChannel(SocketChannel ch) throws Exception {
      5. ChannelPipeline p = ch.pipeline();
      6. if (sslCtx != null) {
      7. p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
      8. }
      9. //p.addLast(new LoggingHandler(LogLevel.INFO));
      10. p.addLast(new EchoClientHandler());
      11. }
      12. });

       handler方法主要是指定了一个handler属性,所以不再细究。我们主要看handler方法的入参ChannelInitializer类。

      ChannelInitializer是抽象类,并且有个抽象方法initChannel,也就是我们需要实现的方法。那么initChannel在哪里调用呢?答案是initChannel(ChannelHandlerContext ctx)中。调用链

      Bootstrap.connect->AbstractBootstrap.initAndRegister->AbstractChannel.register0                       ->DefaultChannelPipeline.invokeHandlerAddedIfNeeded                                                                ->DefaultChannelPipeline.callHandlerAddedForAllHandlers                                                            ->DefaultChannelPipeline.callHandlerAdded0->ChannelInitializer.handlerAdded                            ->ChannelInitializer.initChannel(ChannelHandlerContext ctx)

      1. // 简化
      2. try {
      3. initChannel((C) ctx.channel());
      4. }finally {
      5. ChannelPipeline pipeline = ctx.pipeline();
      6. if (pipeline.context(this) != null) {
      7. pipeline.remove(this);
      8. }
      9. }

       客户端连接

      起始调用链肯定是从

      ChannelFuture f = b.connect(HOST, PORT).sync();

      然后点点点到Bootstrap的doConnect方法

      1. private static void doConnect(
      2. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
      3. // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
      4. // the pipeline in its channelRegistered() implementation.
      5. final Channel channel = connectPromise.channel();
      6. channel.eventLoop().execute(new Runnable() {
      7. @Override
      8. public void run() {
      9. if (localAddress == null) {
      10. channel.connect(remoteAddress, connectPromise);
      11. } else {
      12. channel.connect(remoteAddress, localAddress, connectPromise);
      13. }
      14. connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
      15. }
      16. });
      17. }

      我们指定的channel是NioSocketChannel(没有实现connect方法),所以调用AbstractChannel

      的connect方法。

      1. public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
      2. return pipeline.connect(remoteAddress, promise);
      3. }

      pipeline是DefaultChannelPipeline,在上边pipeline初始化中讲过。点进去

      1. public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
      2. return tail.connect(remoteAddress, promise);
      3. }

      tail是什么,tail是一个(请转至上边查看继承图)。然后走到了AbstractChannelHandlerContext的connect方法

      1. public ChannelFuture connect(
      2. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
      3. // 精简后
      4. final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
      5. EventExecutor executor = next.executor();
      6. next.invokeConnect(remoteAddress, localAddress, promise);
      7. return promise;
      8. }

      它首先拿到了一个next,next是什么。 是从 DefaultChannelPipeline 内的双向链表的 tail 开始, 不断根据mask向前寻找第一个是 outbound 的 AbstractChannelHandlerContext。更直观一点

       紧接着调用next.invokeConnect但是HeadContext中没实现invokeConnect,所以仍然调用AbstractChannelHandlerContext.invokeConnect方法最后调用HeadContext的connect方法

      1. public void connect(
      2. ChannelHandlerContext ctx,
      3. SocketAddress remoteAddress, SocketAddress localAddress,
      4. ChannelPromise promise) {
      5. unsafe.connect(remoteAddress, localAddress, promise);
      6. }

      unsafe我们已经很熟悉了吧,在HeadContext构造方法中初始化了unsafe,不懂向上看

      1. HeadContext(DefaultChannelPipeline pipeline) {
      2. super(pipeline, null, HEAD_NAME, HeadContext.class);
      3. unsafe = pipeline.channel().unsafe();
      4. setAddComplete();
      5. }

      然后就来到了NioSocketChannel的doConnect方法

      1. protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
      2. // 代码简化
      3. boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
      4. }

       进入SocketUtils.connect后就看到了如何连接的。

       总结

      如果耐心看下来会有必然会有收获。如果哪里不正确,请大佬们指正

      参考自:yongshun/learn_netty_source_code: Netty 源码分析教程 (github.com)

      但是版本是4.0.33.Final 

    62. 相关阅读:
      Java程序员要掌握vue2知识
      移远4G模块调试笔记
      Java练习题-获取数组元素最大值
      CI/CD实战面试宝典:从构建到高可用性的全面解析
      U-App移动统计算力升级!支持跨应用、多事件的打包计算
      FreeRTOS 简单内核实现8 时间片轮询
      Linux账号管理:用户账号与用户组
      y109.第六章 微服务、服务网格及Envoy实战 -- 可观测应用之分布式跟踪(二十)
      使用wireshark分析tcp握手过程
      <能力清单>笔记与思考
    63. 原文地址:https://blog.csdn.net/wai_58934/article/details/127713297