• 【Netty 从成神到升仙系列 三】Netty 凭什么成为国内最流行的网络通信框架?


    • 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,Java领域新星创作者。
    • 📝个人公众号:爱敲代码的小黄(回复 “技术书籍” 可获千本电子书籍)
    • 📕系列专栏:Java设计模式、数据结构和算法、Kafka从入门到成神、Kafka从成神到升仙、操作系统从入门到成神
    • 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
    • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
    • 🍂博主正在努力完成2022计划中:以梦为马,扬帆起航,2022追梦人

    Netty 源码

    一、源码构成

    在这里插入图片描述

    二、Netty对三种I/O的支持

    在这里插入图片描述
    首先,我们看下这几种 I/O 的实现。

    BIO(阻塞IO):食堂排队打饭模式,持队在窗口,打好才走

    NIO(非阻塞IO):点单、等待被叫模式,等待被叫,好了自己去端

    AIO(异步IO):包厢模式,点单后菜直接被端上桌。

    BIO的实现:

    // 如果是BIO的话
    EventLoopGroup bossGroup = new OioEventLoopGroup();
    EventLoopGroup workerGroup = new OioEventLoopGroup();
    b.group(bossGroup, workerGroup).channel(OioServerSocketChannel.class)
    
    • 1
    • 2
    • 3
    • 4

    其余实现也如上所示,我们 Linux 下的环境基本都是 NIO 的 I/O 实现

    1、三种模式如何切换

    我们可以看到,上面我们 Netty 实际支持三种 I/O 模式,那么这三种 I/O 是怎么样进行的切换呢?

    方法的执行是在:

    b.group(bossGroup, workerGroup)
    // 主要在下面这一行!!!
    .channel(NioServerSocketChannel.class)
    
    • 1
    • 2
    • 3

    我们观察一下,.channel 方法做了什么

    public B channel(Class<? extends C> channelClass) {
        	  // 反射工厂的实现
            return channelFactory(new ReflectiveChannelFactory<C>(  
                    ObjectUtil.checkNotNull(channelClass, "channelClass")
            ));
        }
    
    public ReflectiveChannelFactory(Class<? extends T> clazz) {
            // 校验当前传入的clazz是否为空
            ObjectUtil.checkNotNull(clazz, "clazz");
            try {
                // 通过反射获取对应类的无参构造器
                this.constructor = clazz.getConstructor();
            }
        }
    // 上面获取完无参构造器之后,这里会通过无参构造器调用 newInstance 得到一个类的实例
    public T newChannel() {
        try {
            //反射创建channel
            return constructor.newInstance();
        } 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    简单描述一下,通过 泛型+反射+工厂 实现 IO 模式切换

    三、Netty 如何支持 Reactor 模型

    Reactor 模型通过注册监听事件的方式,将阻塞模式修改为非阻塞模式,大大的提升了效率。

    在这里插入图片描述
    在这里插入图片描述

    真正去执行的是 EventLoop,而我们的 EventLoopGroup 则是多个 EventLoop 的集合

    1. EventLoop 是线程嘛

    首先,我们看一下 EventLoop 的继承关系:

    在这里插入图片描述

    可以明显的看出,我们的 EventLoop 最终继承 ScheduledExecutorService 方法,也就是我们经常经常使用的线程池中的定时线程。

    2. 如何实现三种模式的 Reactor 模型

    我们知道,一般 Reactor 模型,有三种:

    2.1 单线程 Reactor 模式

    在这里插入图片描述
    EventLoopGroup bossGroup = new NioEventLoopGroup(1); //有参的构造方法,创建1个线程

    使用上述代码即可创建单线程的 Reactor 模型。

    但具体如何实现,我们去看下源码:

    // nThreads 传递的入参
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        }
    
    • 1
    • 2
    • 3
    • 4

    在源码中明确表明了,如果当前你传入的入参为 **0(不传入参)**时,当前线程为 DEFAULT_EVENT_LOOP_THREADS,如果你传递了入参,则按照你入参的个数申请线程。

    PS:

    DEFAULT_EVENT_LOOP_THREADS 的大小在源码中定义如下:

    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
    • 1
    • 2

    这里的 NettyRuntime.availableProcessors() 最终调用的方法为:Runtime.getRuntime().availableProcessors())

    有兴趣的读者可以自己去看下自己电脑的线程,博主的线程如下:

    • IDEA的线程
      在这里插入图片描述
    • 鲁大师的面板
      在这里插入图片描述

    2.2 多线程 Reactor 模式

    在这里插入图片描述
    如果我们传递无入参的话,创建的就是多线程的 Reactor 模式

    这种相对于单线程来说,其实提升并不是很大,简单而言就是多个线程的堆积。

    比如

    • 原来一个线程完成接受、读取、解码、计算、加码、发送整个流程
    • 现在多个线程一起完成接受、读取、解码、计算、加码、发送整个流程

    相当于复制了几份而已

    2.3 主从 Reactor 模式

    在这里插入图片描述
    而我们的主从 Reactor 模式则打破了原有的架构,采用了一个新的架构进行数据的接受和发送

    我们创建两个线程组,一个线程组负责接受客户端的消息,另外一个线程组负责读取、解码、计算、加码、发送整个流程。

    这样,我们每个线程都有自己要做的事情并且由于接受客户端的消息很快,我们的 mainReactor 线程组会比原来接受更多的客户端消息

    2.3.1 主从模式如何实现

    Netty 中只需要指定两个 EventLoopGroup 即可,如下:

    EventLoopGroup bossGroup = new NioEventLoopGroup();  //无参的构造方法: 线程组,多个线程
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
    
    • 1
    • 2
    • 3
    • 4

    我们的 bossGroup 就是负责接受数据的线程组,而我们的 workerGroup 就是负责处理数据的线程组

    2.3.2 主从模式源码实现
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        // 这里会设置父线程组(接受数据的)
        super.group(parentGroup);
        ObjectUtil.checkNotNull(childGroup, "childGroup");
        this.childGroup = childGroup;
        return this;
    }
    // 类的全路径:io.netty.bootstrap.AbstractBootstrap
    volatile EventLoopGroup group;
    public B group(EventLoopGroup group) {
        ObjectUtil.checkNotNull(group, "group");
        this.group = group;
        return self();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    经过上述操作,我们传入的 bossGroup 已经被赋值到 io.netty.bootstrap.AbstractBootstrap.group 了。

    我们看上述的图片,可以看出来,这个 bossGroup 主要做的是接受客户端的数据连接,但口说无凭,源码中在哪实现了呢

    主要在于 bind() 这个方法:

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();//这个方法比较重要
    }
    final ChannelFuture initAndRegister() {
        // 在这里的config().group()拿出我们的 EventLoopGroup 进行注册
        ChannelFuture regFuture = config().group().register(channel);
        return regFuture;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    而这里的 EventLoopGroup 我们可以看到,他正是我们之前设置的 bossGroup,也就是 io.netty.bootstrap.AbstractBootstrap.group 这个变量

    另外,我们的从模式解决读事件的方法,可以直接去 io.netty.bootstrap.ServerBootstrap.childGroup 这个变量去找

    3. Netty 给 Channel 分配 NIOEventLoop 的规则

    我们上面可以看到,一个 EventLoopGroup 包括多个 EventLoop,那么我们处理数据,Netty 是如何分配这些 EventLoop 的呢?

    在源码 io.netty.util.concurrent.MultithreadEventExecutorGroup.next() 方法中

    @Override
    public EventExecutor next() {
        return chooser.next(); //chooser是一个选择器,一般的实现会使用策略模式
    }
    
    • 1
    • 2
    • 3
    • 4

    利用 chooser 选择器,选择不同的策略执行不同的 EventLoop

    // 选择器的选择
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        //根据待绑定的executor是否是2的幂次方,做出不同的选择
        // 这里的二次方后面会讲到
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    我们点进去发现,这里的策略一共有两种

    • GenericEventExecutorChooser:取模轮询
    • PowerOfTwoEventExecutorChooser:幂等运算(数组长度必须是2的幂次方

    3.1 GenericEventExecutorChooser

    这里其实没什么好说的,就简单的取模、

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        // 原子递增
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;
    
        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }
    
        @Override
        public EventExecutor next() {
            //递增、取模,取正值,不然可能是负数,另外:有个非常小的缺点,当idx累加成最大值后,有短暂的不公平:
            //1,2,3,4,5,6,7,0,7 7 7 
            // 当我们的 idx.getAndIncrement() 到达最大值 Integer.MAX_VALUE 时,会不再增加,也就一直是
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    3.2 PowerOfTwoEventExecutorChooser

    这里利用的是 & 的知识点

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;
    
        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }
    
        @Override
        //executors总数必须是2的幂次方(2,4,8...等)才会用,&运算效率更高,同时当idx累加成最大值之后,相比较通用的方式(GenericEventExecutorChooser),更公平
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    读者如果阅读过 HashMap 的源码,在 put 的过程中,采用了 & 的方式去进行数组的确定。

    public V put(K key, V value) {
        return putVal(hash(key), key, value, false, true);
    }
    
    final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        if ((tab = table) == null || (n = tab.length) == 0){
            n = (tab = resize()).length;
        }
        if ((p = tab[i = (n - 1) & hash]) == null){
            tab[i] = newNode(hash, key, value, null)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    其中,tab[i = (n - 1) & hash] 利用 (n - 1) & hash 将当前的数组均匀的分配到每个数组上,其实效果和我们上面的取模是一样的。

    主要是性能上的提高

    • 对于取模运算来说,我们计算机需要去进行不断的取余计算,如果数据过大,其实会耗费一些性能。
    • 但对于 & 运算,直接操作二进制,这种效率较高

    这里就不介绍为什么 (n - 1) & hash 等价于 hash % n 了,有兴趣的读者可以翻一下以前介绍 HashMap 的文章

    4. Netty 如何保证跨平台性

    我们知道,对于不同的平台,Netty 具有不同的实现,如下:
    在这里插入图片描述

    那这种是如何实现的呢,毕竟我们使用的都是同一套代码

    当我们创建 EventLoopGroup 时,我们会有这么一个实现:

    public NioEventLoopGroup(int nThreads, Executor executor) {
        //默认selector,最终实现类似:https://github.com/frohoff/jdk8u-jdk/blob/master/src/macosx/classes/sun/nio/ch/DefaultSelectorProvider.java
        //basic flow: 1 java.nio.channels.spi.SelectorProvider 2 META-INF/services 3 default
        this(nThreads, executor, SelectorProvider.provider());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这里面的 SelectorProvider.provider() 就是我们所选择的实现

    • 最终的实现是在我们下载的JDK里面,所以不同平台的JDK默认有不同的实现
    public static SelectorProvider provider() {
        // 防止多个线程并发访问
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                        	 // 这里是通过SPI去查询文件是否配置
                           // String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
                            if (loadProviderFromProperty())
                                return provider;
                        	// META-INF/services
                        // ServiceLoader sl =
                ServiceLoader.load(SelectorProvider.class,
                                   ClassLoader.getSystemClassLoader());
                            if (loadProviderAsService())
                                return provider;
                        	// 默认会用下面这个
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }
    // 类的路径:java.nio.channels.spi.SelectorProvider.DefaultSelectorProvider
    public static SelectorProvider create() {
        return new WindowsSelectorProvider();
    }
    // 博主安装的是 Window 的JDK版本,如果是MAC的,下面的将会是这个样子
    public static SelectorProvider create() {
        return new sun.nio.ch.KQueueSelectorProvider();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    四、Netty 对粘包和半包的支持

    1. 什么是粘包、半包

    在这里插入图片描述
    我们的客户端向服务端发送 ABC DED,但我们的服务端收到的消息却不一样,具体情况如下:

    • 没有问题的情况下:服务端收到 ABC DEF
    • 一次收到了:服务端收到 ABCDEF (TCP粘包)
    • 多次收到:第一次 AB,第二次 CD,第三次 EF(TCP拆包)

    我们这里实际测试一下,有的读者可能还是不太清楚:

    首先,我们的服务端核心代码如下:

    public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    
        private AtomicInteger counter = new AtomicInteger(0);
    
        /*** 服务端读取到网络数据后的处理*/
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf in = (ByteBuf)msg;
            String request = in.toString(CharsetUtil.UTF_8);
            System.out.println("Server Accept["+request
                    +"] and the counter is:"+counter.incrementAndGet());
            String resp = "Hello,"+request+". Welcome to Netty World!"
                    + System.getProperty("line.separator");
            ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    当服务端收到客户端的消息时,会执行 channelRead0 这个方法,我使用了一个 counter 原子变量来记录服务端当前收到的消息数量。

    客户端核心代码:

    /**
     * 类说明:粘包/半包问题展示
     */
    public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
        private AtomicInteger counter = new AtomicInteger(0);
    
        /*** 客户端读取到网络数据后的处理*/
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
                    +"] and the counter is:"+counter.incrementAndGet());
        }
    
        /*** 客户端被通知channel活跃后,做事*/
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ByteBuf msg = null;
            String request = "ABC,DEF,GHI,JKL,MNO"
                    + System.getProperty("line.separator");
            //发送100次
            for(int i=0;i<100;i++){
                msg = Unpooled.buffer(request.length());
                msg.writeBytes(request.getBytes());
                ctx.writeAndFlush(msg);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    我们可以看到,我们的客户端会发送 100ABC,DEF,GHI,JKL,MNO 的字符串

    正常来说,我们的服务端会收到 100 条消息,我们运行一下看看实际情况

    服务端的截图:

    • 收到的第一条消息:
      在这里插入图片描述
    • 收到的第二条消息
      在这里插入图片描述

    这正是出现的粘包、半包问题

    2. 为什么会出现这个问题?

    当我们的客户端向服务端发送数据时,并不是立即发送

    主要在于:socket会有缓存区,当缓存区达到一定的条件时,才会进行发送至服务端。

    当然,本质还是在与:对于TCP来说,TCP是一个流式的协议,消息无边界

    3. 如何避免粘包、半包

    既然出现问题的原因在于消息无边界,那么只要我们找到当前客户端发送消息的边界就可以了。
    在这里插入图片描述

    3.1 短连接(不推荐)

    当我们的客户端和服务端建立连接后,客户端发送一次消息就立即断开,这样我们的消息仅仅只发一次,服务端也只会收到一次。

    这种一般不太实用,总不能我客户端每一次都要和服务端重新建立连接吧。

    3.2 固定长度(不推荐)

    我们客户端和服务端发送消息规定一个长度,比如我现在想发送 ABCDEF 100 次,那么我规定的长度就是 6,一次6个字符为一次调用。

    Netty 对长度的支持为:FixedLengthFrameDecoder,自动为你分割服务端当前收到的消息

    但这种比较浪费空间,比如你当前的长度设置成 100,但最后一次消息分割只有 10,浪费掉了 90 的空间。

    3.3 分隔符(推荐)

    我们客户端和服务端发送消息规定一个换行符,比如我现在想发送 ABCDEF 100 次,那么我们每一次发送后面都加一个 换行符

    Netty 对换行符的支持为:LineBasedFrameDecoder,自动为你分割服务端当前收到的消息。

    当然,如果你不想局限于 换行符,也可以使用自定义的:DelimiterBasedFrameDecoder

    但这种的缺点在于有的分隔符需要进行转义,代码写起来较为复杂。

    3.4 消息头和消息体(推荐)

    参考我们的 HTTP 请求
    分别以下几方面:

    • 魔数:4 个字节的魔数
    • 版本:当前的版本号
    • 序列化方式:JDK或者JSON
    • 指令类型:
    • 请求序号:唯一性
    • 对齐填充
    • 获取内容的字节数组
    • 正文长度
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
            // 1. 4 个字节的魔数
            out.writeBytes(new byte[]{1, 2, 3, 4});
            // 2. 1 字节的版本
            out.writeByte(1);
            // 3. 1 字节的序列化方式:0 JDK  1 JSON
            out.writeByte(0);
            // 4. 1 指令类型
            out.writeByte(msg.getMessageType());
            // 5. 4 请求序号
            out.writeInt(msg.sequenceID);
            // 对齐充填充
            out.writeByte(0xff);
            // 6. 获取内容的字节数组
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(msg);
            byte[] bytes = bos.toByteArray();
            // 7. 正文长度
            int len = bytes.length;
            out.writeInt(len);
            // 8. 写入内容
            out.writeBytes(bytes);
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
  • 相关阅读:
    打败 二叉树!
    通过 saltstack 批量更新 SSL 证书
    core dump(介绍,status中的core dump标志,应用--调试),ulimit命令
    图文手把手教程--ESP32 MQTT连接腾讯云物联网平台及OTA固件升级
    Mysql数据库基础和增删改查操作
    设置ZIP文件打开密码的两种方法
    信奥中的数学:质数与合数
    添加IDEA到右键打开里面
    PyTorch深度学习实践1——线性回归和Logistic回归
    基于单片机的指纹密码锁系统
  • 原文地址:https://blog.csdn.net/qq_40915439/article/details/126494466