• 【Netty 从成神到升仙系列 一】Netty 服务端的启动源码剖析(一)


    • 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,Java领域新星创作者。
    • 📝个人公众号:爱敲代码的小黄(回复 “技术书籍” 可获千本电子书籍)
    • 📕系列专栏:Java设计模式、数据结构和算法、Kafka从入门到成神、Kafka从成神到升仙、操作系统从入门到成神
    • 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
    • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
    • 🍂博主正在努力完成2022计划中:以梦为马,扬帆起航,2022追梦人

    Netty源码

    相信学过 Netty 的小伙伴都应该熟悉 JavaNIO,在 Java 中创建服务端和客户端的代码如下所示:

    服务端

    // 1. 创建一个 selector 对象
    Selector selector = Selector.open();
    
    ServerSocketChannel ssc = ServerSocketChannel.open(); // 创建FD-1
    ssc.configureBlocking(false); // 非阻塞模式
    
    // 2. 建立 selector 与 channel 的联系(注册)
    // SelectionKey:事件发生时,通过这个可以知道事件和那个channel的事件
    // 这个key,只关注 accept 事件
    SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT, null);
    
    // 3. 注册端口号
    ssc.bind(new InetSocketAddress(8080));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    客户端

    // 建立客户端的channel
    SocketChannel channel = SocketChannel.open();
    // 连接服务端的IP和端口
    channel.connect(new InetSocketAddress("localhost", 8080));
    // 发送消息
    channel.write(Charset.defaultCharset().encode("hello"));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    我们的 Netty 正是在 Java NIO 做的一层封装

    既然是封装,Netty 的源码中必然存在以上 服务端客户端 的代码

    一、Netty 服务端

    1. Netty 服务端启动代码

    public class TestNettyServer {
        public static void main(String[] args) {
            // 1. 服务器端的启动器,负责组装 netty 主键,启动服务器
            new ServerBootstrap()
                    // 2. BossEventLoop、WorkerEventLoop(selector、thread)
                    .group(new NioEventLoopGroup())
                    // 3. 选择服务器的IO模式
                    .channel(NioServerSocketChannel.class)
                    // 4. boss 负责处理连接 worker 负责处理读写
                    .childHandler(
                            // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
                            new ChannelInitializer<NioSocketChannel>() {
                                @Override
                                protected void initChannel(NioSocketChannel ch) throws Exception {
                                    // 6. 添加具体的 handler
                                    ch.pipeline().addLast(new LoggingHandler());
                                }
                            })
                    // 7. 绑定监听端口
                    .bind(8080);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • Netty 中获取选择器 SelectorNetty 中使用 NioEventloopGroup 中的 NioEventloop 封装了线程和选择器

    • 创建NioServerSocketChannel,该 Channel作为附件添加到 ServerSocketChannel

      // 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
      NioServerSocketChannel attachment = new NioServerSocketChannel();
      //注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
      SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
      
      • 1
      • 2
      • 3
      • 4
    • 绑定端口

    要剖析启动代码,我们直接从 bind 入手

    2. Bind

    选择器 Selector 的创建是在 NioEventloopGroup 中完成的。

    NioServerSocketChannelServerSocketChannel 的创建,ServerSocketChannel 注册到 Selector 上以及绑定的操作都由 bind 完成。

    所以,我们的启动入口:io.netty.bootstrap.AbstractBootstrap.bind

    public ChannelFuture bind(SocketAddress localAddress) {
            this.validate();
            return this.doBind((SocketAddress)ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }
    
    • 1
    • 2
    • 3
    • 4

    3. doBind

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // 负责NioServerSocketChannel和ServerSocketChannel的创建
        // ServerSocketChannel的注册工作
        // init由main线程完成,regisetr由NIO线程完成
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
    
        // 因为register操作是异步的
        // 所以要判断主线程执行到这里时,register操作是否已经执行完毕
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            
            // 执行doBind0绑定操作
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            // 如果register操作还没执行完,就会到这个分支中来
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            
            // 添加监听器,NIO线程异步进行doBind0操作
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
    						 // 执行doBind0绑定操作
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    3.1 init

    Channel channel = null;
    try {
        // 通过 channelFactory 创建 NIOServerSocketChannel
        channel = this.channelFactory.newChannel();
        this.init(channel);
    } catch (Throwable var3) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
        }
        return (new DefaultChannelPromise(new FailedChannel(),GlobalEventExecutor.INSTANCE)).setFailure(var3);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    3.1.1 创建 NIOServerSocketChannel

    我们看一下通过这个 channelFactory 怎么创建的 NIOServerSocketChannel,并如何实现 ServerSocketChannel.open()

    public T newChannel() {
        try {
            return (Channel)this.constructor.newInstance();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    很明显,利用的反射的原理创建的 Channel

    通过 DEBUG 我们可以得到当前 Channel 的地址:io.netty.channel.socket.nio.NioServerSocketChannel

    我们追进去看看,构造方法:

    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    // 构造方法
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
        return provider.openServerSocketChannel();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    所以,最终调用的是:SelectorProvider.provider().openServerSocketChannel()

    而这个调用,正好和我们 ServerSocketChannel ssc = ServerSocketChannel.open(); 的调用链路一模一样,我用图展示一下:

    在这里插入图片描述

    通过上图,我们可以明显的看到,Netty 在初始化的时候,实际上调用 NioServerSocketChannel 的构造方法,通过其实现了 ServerSocketChannel.open(),新建 FD

    到这里,我们的初始化 NioServerSocketChannel 告一段落,下面看一下 Register 操作

    3.1.2 添加 NIOServerSocketChannel 初始化 Handler

    我们继续往下追,可以看到有一个 this.init(channel); 方法,这个方法添加了一个 Handler

    p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = ServerBootstrap.this.config.handler();
            if (handler != null) {
                pipeline.addLast(new ChannelHandler[]{handler});
            }
    
            ch.eventLoop().execute(new Runnable() {
                public void run() {
                    pipeline.addLast(new ChannelHandler[]{new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
                }
            });
        }
    }});
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    我们暂时不解释这里的作用,我们只需要记住这里添加了一个 Handler,后面会进行调用(后面调用的时候会讲)。

    3.2 Register

    我们先总览一下 Register 的源码部分:

    ChannelFuture regFuture = this.config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    	 // 获取当前的eventLoop
        AbstractChannel.this.eventLoop = eventLoop;
    	 // 此处完成了由 主线程 到 NIO线程 的切换
        // eventLoop.inEventLoop()用于判断当前线程是否为NIO线程
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            // 向NIO线程中添加任务
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    // 该方法中会执行doRegister
                    // 执行真正的注册操作
                    register0(promise);
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    3.2.1 register0
    private void register0(ChannelPromise promise) {
        try {
            // 执行真正的注册逻辑
            doRegister();
            neverRegistered = false;
            registered = true;
    
            // 调用init中的initChannel方法
            pipeline.invokeHandlerAddedIfNeeded();
    
            safeSetSuccess(promise);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    我们的注册整体上主要分为 2 部分

    • 执行真正的注册逻辑,也就是:ssc.register(selector, SelectionKey.OP_ACCEPT, attach);
    • 让我们之前写的 Handler 执行

    doRegister

    • eventLoop().unwrappedSelector():我们的 Selector 存储的位置
    • this:将当前的 NIOServerSocketChannel 作为附件,便于之后的获取
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 注册
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    pipeline.invokeHandlerAddedIfNeeded:此方法会调用我们一开始定义的 Handler

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
    		  // 添加新任务,任务负责添加 handler
            // 该handler负责发生Accepet事件后建立连接
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    Register 主要实现了 ssc.register(selector, SelectionKey.OP_ACCEPT, attach); 的注册功能,并在创建时进行了线程切换,从 main线程NIO线程

    到这里,我们的 Register 也告一段落

    3.3 doBind0

    我们上面讲到了 safeSetSuccess(promise) ,向我们的 promise 设置成功的结果,并通过下面的 doBind0(regFuture, channel, localAddress, promise); 进行调用

    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // 
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {.
                    promise.setFailure(cause);
                } else {
                    promise.registered();
    
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    底层实现如下:

    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        assertEventLoop();
        boolean wasActive = isActive();
        try {
            // 注册端口
            doBind(localAddress);
        } 
        // 前面一系列操作后,我们的 NIOServerSocketChannel 是否可用
        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelActive();
                }
            });
        }
        
        safeSetSuccess(promise);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    注册端口的代码

    @SuppressJava6Requirement(reason = "Usage guarded by java version check")
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        // 根据当前 JDK 的版本是否大于 7
        if (PlatformDependent.javaVersion() >= 7) {
            // 调用ServerSocketChannel的bind方法,绑定端口
            // javaChannel() = ServerSocketChannel
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    到这里,我们已经完成了端口的注册,距离我们服务器的启动只差绑定 SelectionKey.OP_ACCEPT

    我们后续看一下:pipeline.fireChannelActive();

    pipeline.fireChannelActive:触发所有该 pipeline 上的事件

    我们 NettyHandler 信息如下:
    在这里插入图片描述

    除了我们自定义的 Handler,我们需要查看一下 Head 的代码

    最终实现代码如下:

    @Override
    protected void doBeginRead() throws Exception {
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
    
        readPending = true;
    
        final int interestOps = selectionKey.interestOps();
        // readInterestOp = 1 << 4 = 16
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    我们对比着 SelectionKey 的描述代码看一下:

    public static final int OP_READ = 1 << 0;
    public static final int OP_WRITE = 1 << 2;
    public static final int OP_CONNECT = 1 << 3;
    public static final int OP_ACCEPT = 1 << 4;
    
    • 1
    • 2
    • 3
    • 4

    可以看到,上述代码添加的事件正是 OP_ACCEPT 事件。

    至此,我们的 Netty 服务端就正式启动了

    4. 总结

    Netty 服务端的启动框架基本封装了 Java NIO 启动的部分源码。

    剩余的源码将在以后的系列中持续更新,喜欢的小伙伴不妨点个关注~

  • 相关阅读:
    我的pip python库-20220815
    linux 安装 wordpress
    【css】iconfont的使用
    UNIX环境高级编程-第二章
    Python之哈希表-遍历和有序性
    建立JDBC连接
    iEnglish马铁鹰:智能教育塑造未来学习新范式
    手写RPC框架--6.封装报文
    MLOps:掌握机器学习部署:Docker、Kubernetes、Helm 现代 Web 框架
    .NET 6学习笔记(1)——通过FileStream实现不同进程对单一文件的同时读写
  • 原文地址:https://blog.csdn.net/qq_40915439/article/details/125493995