• 2022最新 Netty底层数据交互源码剖析二


    Netty底层数据交互源码剖析

    上一章讲了服务端初始化、注册事件、绑定端口、eventloop线程池的来龙去脉;在上一节的基础上,讲解这一节:如何处理客户端连接事件;以及数据交互,话不多说、直接开车

    Netty服务端事件响应底层实现

    上文我们讲了eventLoop启动之后会有一个线程一直死循环执行调用select方法;那么客户端想要连接的话必然会触发accpet事件,也就是说必然会执行到eventLoop的run方法;所以这一章咱们就从这个方法开始看起!

    • select方法就不用看了,我们假设来了一个客户端连接,必然就会触发accept事件;直接看processKey方法

    接着就是看read方法,一看就是个重点方法!

    可以发现,Netty其实就是在帮我们写Nio的代码而已,我们自己写NIO代码时也会调用这个方法处理连接事件然后生成一个channel,之后客户端的所有读写操作就会在这个channel上进行!

    但是Netty肯定不能就这么了事了;

    可以看到,Netty做了封装,所以封装的逻辑肯定是蛮重要的!点进去看看

    在此之前可以先回想一下,服务端启动的时候也初始化了一个ServerSocketChannel,它是怎么做的呢?

    1. 初始化Channel
    2. 创建pipeline
    3. 设置感兴趣的事件
    4. 将channel注册到selector中

    跟着这个思路我们往下看看源码:

    很好,看到创建pipeline了,逻辑和之前的一样,就是初始化头尾节点双向链表!

    socketChannel就创建完毕了,并将之放到buf列表属性当中返回;

    接着就看fireChannelRead这个方法,根据经验可知,该方法时责任链方法,会调用每个handler中的channelRead方法!

    但是这有个细节,pipeline是谁的pipeline? 网上查看可以发现,这个pipeline其实是Server Socket Channel中的pipeline,所以调用的也就是ServerSocketChannel中的handler

    具体会调用到哪儿呢?就得看看上一章中serversocketchannek中的handler有哪些?

    所以咱们直接看这个handler中的fireChannelRead方法即可

    到这就只有一个疑问:childHandler是啥?

    还记得我们刚开始创建Bootstrap的时候赋的值嘛?

    所以我们完全可以大胆猜测一下,这个方法无非就是完善socketChannel,将handler加入到链表当中取;后续在这个channel完成数据读写的功能

    那就接着往下看

    channel创建好了之后,里面发生事件总得有线程来执行吧,这就轮到workerGroup来处理了,

    目前为止:SocketChannel 设置了一个感兴趣事件:OP_READ,然后初始化了pipeline,一切准备就绪之后,将其注册到一个EventLoop上,也就是workerGroup中得EventLoop

    然后后续得流程就和ServerSocketChannel注册到BossGroup中得EventLoop上是一样得了!在上文有完整的注册流程;

    也即是线程模型图中的第三步

    总结一下

    processSelect Keys将有连接事件的客户端请求调用accpet方法构建一个SocketChannel,然后将其封装成为NIOSocketChanel,这个Channel里面设置了感兴趣事件Read,以及创建并初始化了pipeline; 将pipeline完善后注册到workerGroup中的一个EventLoop上,之后该channel产生的事件就交给该EventLoop去实现了!比如后续客户端向服务端发送消息就会触发Read;

    那么现在就来看看Read事件触发后EventLoop是怎么处理的!逻辑还是在run方法中;

    这时候unsafe代表的就是NioSocketChannel了;接着往下:

    Os将客户端发送的数据拷贝到bytebuf中,接下来就会调用业务读数据了!

    相信看到这里大家都一目了然了,整个Netty核心在心里面都大概有个底了吧!

    以上就是Netty核心数据交互源码,主线流程也就走完了!也就是模型图中的345部分都看过一遍了!说到底Netty就是在帮我们写NIO的代码;关键内容还是他的主从线程模型以及数据交互!

    Netty高并发架构设计

    • 主从Reactor线程模型
    • NIO多路复用非阻塞
    • 无锁串行化设计思想
    • 支持高性能序列化协议
    • 零拷贝(直接内存的使用)
    • ByteBuf内存池设计
    • 灵活的TCP参数配置能力
    • 并发优化

    无锁串行化设计思想

    在大多数场景下,并行多线程处理可以提升系统的并发性能。但是,如果对于共享资源的并发访问处理不当,会带来严重的锁竞争,这最终会导致性能的下降。为了尽可能的避免锁竞争带来的性能损耗,可以通过串行化设计,即消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。NIO的多路复用就是一种无锁串行化的设计思想(理解下Redis和Netty的线程模型)

    为了尽可能提升性能,Netty采用了串行无锁化设计,在IO线程内部进行串行操作,避免多线程竞争导致的性能下降。表面上看,串行化设计似乎CPU利用率不高,并发程度不够。但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列-多个工作线程模型性能更优。

    Netty的NioEventLoop读取到消息之后,直接调用ChannelPipeline的fireChannelRead(Object msg),只要用户不主动切换线程,一直会由NioEventLoop调用到用户的Handler,期间不进行线程切换,这种串行化处理方式避免了多线程操作导致的锁的竞争,从性能角度看是最优的。

    DirectMemeory直接内存详解

    也就是堆外内存!

    什么是堆外内存?

    直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域,某些情况下这部分内存也

    会被频繁地使用,而且也可能导致OutOfMemoryError异常出现。Java里用DirectByteBuffer可以分配一块直接内存(堆外内存),元空间

    对应的内存也叫作直接内存,它们对应的都是机器的物理内存。

    为什么要使用堆外内存

    /**
     * 直接内存与堆内存的区别
     */
    public class DirectMemoryTest {
    
        public static void heapAccess() {
            long startTime = System.currentTimeMillis();
            //分配堆内存
            ByteBuffer buffer = ByteBuffer.allocate(1000);
            for (int i = 0; i < 100000; i++) {
                for (int j = 0; j < 200; j++) {
                    buffer.putInt(j);
                }
                buffer.flip();
                for (int j = 0; j < 200; j++) {
                    buffer.getInt();
                }
                buffer.clear();
            }
            long endTime = System.currentTimeMillis();
            System.out.println("堆内存访问:" + (endTime - startTime) + "ms");
        }
    
        public static void directAccess() {
            long startTime = System.currentTimeMillis();
            //分配直接内存
            ByteBuffer buffer = ByteBuffer.allocateDirect(1000);
            for (int i = 0; i < 100000; i++) {
                for (int j = 0; j < 200; j++) {
                    buffer.putInt(j);
                }
                buffer.flip();
                for (int j = 0; j < 200; j++) {
                    buffer.getInt();
                }
                buffer.clear();
            }
            long endTime = System.currentTimeMillis();
            System.out.println("直接内存访问:" + (endTime - startTime) + "ms");
        }
    
        public static void heapAllocate() {
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < 100000; i++) {
                ByteBuffer.allocate(100);
            }
            long endTime = System.currentTimeMillis();
            System.out.println("堆内存申请:" + (endTime - startTime) + "ms");
        }
    
        public static void directAllocate() {
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < 100000; i++) {
                ByteBuffer.allocateDirect(100);
            }
            long endTime = System.currentTimeMillis();
            System.out.println("直接内存申请:" + (endTime - startTime) + "ms");
        }
    
        public static void main(String args[]) {
            for (int i = 0; i < 10; i++) {
                heapAccess();
                directAccess();
            }
    
            System.out.println();
    
            for (int i = 0; i < 10; i++) {
                heapAllocate();
                directAllocate();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    从程序运行结果看出直接内存申请较慢,但访问效率高。在java虚拟机实现上,本地IO一般会直接操作直接内存(直接内存=>系统调用=>硬盘/网卡),而非直接内存则需要二次拷贝(堆内存=>直接内存=>系统调用=>硬盘/网卡)。

    直接内存源码分析
    public static ByteBuffer allocateDirect(int capacity) {
        return new DirectByteBuffer(capacity);
    }
    DirectByteBuffer(int cap) {                   // package-private
        super(-1, 0, cap, cap);
        boolean pa = VM.isDirectMemoryPageAligned();
        int ps = Bits.pageSize();
        long size = Math.max(1L, (long)cap + (pa ? ps : 0));
        //判断是否有足够的直接内存空间分配,可通过-XX:MaxDirectMemorySize=参数指定直接内存最大可分配空间,如果不指定默认为最大堆内存大小,
        //在分配直接内存时如果发现空间不够会显示调用System.gc()触发一次full gc回收掉一部分无用的直接内存的引用对象,同时直接内存也会被释放掉
        //如果释放完分配空间还是不够会抛出异常java.lang.OutOfMemoryError
       Bits.reserveMemory(size, cap);
    
        long base = 0;
        try {
            // 调用unsafe本地方法分配直接内存
            base = unsafe.allocateMemory(size);
        } catch (OutOfMemoryError x) {
            // 分配失败,释放内存
            Bits.unreserveMemory(size, cap);
            throw x;
        }
        unsafe.setMemory(base, size, (byte) 0);
        if (pa && (base % ps != 0)) {
            // Round up to page boundary
            address = base + ps - (base & (ps - 1));
        } else {
            address = base;
        }
        
        // 使用Cleaner机制注册内存回收处理函数,当直接内存引用对象被GC清理掉时,
        // 会提前调用这里注册的释放直接内存的Deallocator线程对象的run方法
        cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
        att = null;
    }
    
    
    // 申请一块本地内存。内存空间是未初始化的,其内容是无法预期的。
    // 使用freeMemory释放内存,使用reallocateMemory修改内存大小
    public native long allocateMemory(long bytes);
    
    // openjdk8/hotspot/src/share/vm/prims/unsafe.cpp
    UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
      UnsafeWrapper("Unsafe_AllocateMemory");
      size_t sz = (size_t)size;
      if (sz != (julong)size || size < 0) {
        THROW_0(vmSymbols::java_lang_IllegalArgumentException());
      }
      if (sz == 0) {
        return 0;
      }
      sz = round_to(sz, HeapWordSize);
      // 调用os::malloc申请内存,内部使用malloc这个C标准库的函数申请内存
      void* x = os::malloc(sz, mtInternal);
      if (x == NULL) {
        THROW_0(vmSymbols::java_lang_OutOfMemoryError());
      }
      //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize);
          return addr_to_java(x);
    UNSAFE_END
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    核心方法就是调用malloc方法分配内存,只是系统调用,会触发用户态转内核态!

    使用直接内存的优缺点:

    • 优点:

    不占用堆内存空间,减少了发生GC的可能

    java虚拟机实现上,本地IO会直接操作直接内存(直接内存=>系统调用=>硬盘/网卡),而非直接内存则需要二次拷贝(堆内 存=>直接内存=>系统调用=>硬盘/网卡)

    • 缺点:

    初始分配较慢

    没有JVM直接帮助管理内存,容易发生内存溢出。为了避免一直没有FULL GC,最终导致直接内存把物理内存耗完。我们可以 指定直接内存的最大值,通过-XX:MaxDirectMemorySize来指定,当达到阈值的时候,调用system.gc来进行一次FULL GC,间 接把那些没有被使用的直接内存回收掉。

    Netty零拷贝源码

    图:使用直接内存零拷贝与不使用零拷贝的对比

    Netty的接收和发送ByteBuf采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。

    如果使用传统的JVM堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才能写入Socket中。JVM堆内存的数据是不能直接写入Socket中的。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。

    可以看下netty的读写源码,比如read源码NioByteUnsafe.read()

    回到netty读数据的源码:

    ByteBuf内存池设计

    直接内存的缺点是什么?

    • 不方便回收
    • 分配较慢

    所以,一个ByteBuf来之不易,走也不容易;所以干脆将他留下来不就行了;这就是内存池诞生的初衷;既然来自不易,用完之后保存起来不就好了!

    随着JVM虚拟机和JIT即时编译技术的发展,对象的分配和回收是个非常轻量级的工作。但是对于缓冲区Buffer(相当于一个内存块),情况却稍有不同,特别是对于堆外直接内存的分配和回收,是一件耗时的操作。为了尽量重用缓冲区,Netty提供了基于ByteBuf内存池的缓冲区重用机制。需要的时候直接从池子里获取ByteBuf使用即可,使用完毕之后就重新放回到池子里去。下面我们一起看下Netty ByteBuf的实现:

    Netty怎么使用内存池的呢?

    那就继续看看分配直接内存的代码

    点进去发现线程缓存获取ByteBuf池化对象,

    ByteBuff扩容机制源码剖析

    扩容机制较为简单:

    从64字节开始,每次两倍递增扩容:64, 128, 256 ······

    当阈值到达4M之后,就不再是成倍扩容了,而是按照步长为4M慢慢扩容,每次加4M;

    • minNewCapacity:表用户需要写入的值大小
    • threshold:阈值,为Bytebuf内部设定容量的最大值
    • maxCapacity:Netty最大能接受的容量大小,一般为int的最大值

    源码解读:进入ByteBuf源码中,深入分析其扩容方法: idea源码进入:ByteBuf.writeByte()->AbstractByteBuf->calculateNewCapacity

    照例来一波总结

    总结

    继续上节的内容讲了当有连接建立时服务端底层做了哪些事情;无非就是生成socketChannel并初始化pipeline设置事件为读事件;最后将其注册到workerGroup中的其中一个EventLoop上,底层流程与上一节ServerSocketChannel一样的流程!

    以及当触发读事件时EventLoop做了哪些工作才保证了数据的正常到达,这里又涉及到了Netty直接内存的概念以及零拷贝原理;

    最后把整个netty的设计精髓做了一些专门的讲解以及源码级别剖析!

    最后贴一张整个Netty源码流程图!

  • 相关阅读:
    STM32 Cortex-M4 RTC实时时钟保姆级别总结
    谈谈我的「数字文具盒」 - NextCloud
    字节技术面都过了,薪资都谈好了20K*13结果还是被刷了,问HR原因是。。。
    LeetCode 刷题记录——从零开始记录自己一些不会的(二)
    网络模型与细节思维方式
    String字符串性能优化的几种方案
    MyBatis insert标签
    仓库管理流程详解(附作业流程图)
    java基于Android校园闲置资料共享系统uniapp小程序
    G. Good Key, Bad Key(暴力)
  • 原文地址:https://blog.csdn.net/m0_52255061/article/details/126411248