• Netty深入浅出(无处不在的IO)


    为什么要有Netty

    Netty是为了解决网络编程的复杂性和提供易于使用、高性能和可扩展的框架而开发的。它通过提供一组可重用的组件来处理网络通信的低级细节,例如套接字管理、线程和缓冲,简化了开发网络应用程序的过程。这使开发人员可以专注于应用程序逻辑而不是网络编程的复杂性。此外,Netty支持各种协议和传输机制,使其成为构建各种网络应用程序的多功能选择。

    Java中的IO模型

    Netty是一个Java编写的网络IO库,Netty在其底层仍然使用Java I/O库,如java.nio包。它使用了Java NIO(New I/O)的一些特性,例如非阻塞通道(Channel)、选择器(Selector)等,以实现高性能的网络通信

    三种通信模型

    • BIO (Blocking I/O): 同步阻塞I/O模式,数据的读取写入必须阻塞在一个线程内等待其完成。在活动连接数不是特别高(小于单机1000)的情况下,这种模型是比较不错的。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求。

    • NIO (New I/O): NIO是一种同步非阻塞的I/O模型,在Java 1.4 中引入了NIO框架。NIO提供了与传统BIO模型中的 SocketServerSocket 相对应的 SocketChannelServerSocketChannel 两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。

    • AIO (Asynchronous I/O): AIO 也就是 NIO 2。在 Java 7 中引入了 NIO 的改进版 NIO 2,它是异步非阻塞的IO模型。异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。AIO 是异步IO的缩写,虽然 NIO 在网络操作中,提供了非阻塞的方法,但是 NIO 的 IO 行为还是同步的。对于 NIO 来说,我们的业务线程是在 IO 操作准备好时,得到通知,接着就由这个线程自行进行 IO 操作,IO操作本身是同步的。

    NIO

    示例代码
    • 服务端
    1. import java.io.IOException;
    2. import java.net.InetSocketAddress;
    3. import java.nio.ByteBuffer;
    4. import java.nio.channels.SelectionKey;
    5. import java.nio.channels.Selector;
    6. import java.nio.channels.ServerSocketChannel;
    7. import java.nio.channels.SocketChannel;
    8. import java.util.Iterator;
    9. import java.util.Set;
    10. public class NIOServer {
    11. public static void main(String[] args) throws IOException {
    12. // 创建ServerSocketChannel
    13. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    14. serverSocketChannel.socket().bind(new InetSocketAddress(8080));
    15. serverSocketChannel.configureBlocking(false); // 设置为非阻塞模式
    16. Selector selector = Selector.open();
    17. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    18. System.out.println("Server is listening on port 8080...");
    19. while (true) {
    20. int readyChannels = selector.select();
    21. if (readyChannels == 0) {
    22. continue;
    23. }
    24. Set selectedKeys = selector.selectedKeys();
    25. Iterator keyIterator = selectedKeys.iterator();
    26. while (keyIterator.hasNext()) {
    27. SelectionKey key = keyIterator.next();
    28. if (key.isAcceptable()) {
    29. SocketChannel clientChannel = serverSocketChannel.accept();
    30. clientChannel.configureBlocking(false);
    31. clientChannel.register(selector, SelectionKey.OP_READ);
    32. System.out.println("Accepted connection from " + clientChannel.getRemoteAddress());
    33. } else if (key.isReadable()) {
    34. SocketChannel clientChannel = (SocketChannel) key.channel();
    35. ByteBuffer buffer = ByteBuffer.allocate(1024);
    36. int bytesRead = clientChannel.read(buffer);
    37. if (bytesRead == -1) {
    38. clientChannel.close();
    39. System.out.println("Client disconnected.");
    40. } else if (bytesRead > 0) {
    41. buffer.flip();
    42. while (buffer.hasRemaining()) {
    43. clientChannel.write(buffer); // 回显客户端发送的数据
    44. }
    45. buffer.clear();
    46. }
    47. }
    48. keyIterator.remove();
    49. }
    50. }
    51. }
    52. }
    • 客户端
    1. import java.io.IOException;
    2. import java.net.InetSocketAddress;
    3. import java.nio.ByteBuffer;
    4. import java.nio.channels.SocketChannel;
    5. public class NIOClient {
    6. public static void main(String[] args) throws IOException {
    7. SocketChannel socketChannel = SocketChannel.open();
    8. socketChannel.connect(new InetSocketAddress("localhost", 8080));
    9. String message = "Hello, NIO Server!";
    10. ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
    11. socketChannel.write(buffer); // 发送消息给服务器
    12. ByteBuffer responseBuffer = ByteBuffer.allocate(1024);
    13. int bytesRead = socketChannel.read(responseBuffer); // 读取服务器的响应
    14. if (bytesRead != -1) {
    15. responseBuffer.flip();
    16. byte[] bytes = new byte[responseBuffer.remaining()];
    17. responseBuffer.get(bytes);
    18. String response = new String(bytes);
    19. System.out.println("Received from server: " + response);
    20. }
    21. socketChannel.close(); // 关闭客户端连接
    22. }
    23. }

    AIO

    示例代码
    1. import java.io.IOException;
    2. import java.net.InetSocketAddress;
    3. import java.nio.ByteBuffer;
    4. import java.nio.channels.AsynchronousChannelGroup;
    5. import java.nio.channels.AsynchronousServerSocketChannel;
    6. import java.nio.channels.AsynchronousSocketChannel;
    7. import java.nio.channels.CompletionHandler;
    8. import java.util.concurrent.Executors;
    9. public class AIOTimeServer {
    10. public static void main(String[] args) throws IOException {
    11. int port = 8080;
    12. AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(10));
    13. final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(group);
    14. serverChannel.bind(new InetSocketAddress(port));
    15. System.out.println("Server is listening on port " + port);
    16. serverChannel.accept(null, new CompletionHandler() {
    17. @Override
    18. public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
    19. serverChannel.accept(null, this); // 接受下一个连接
    20. String response = "Current time: " + System.currentTimeMillis();
    21. ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
    22. clientChannel.write(buffer, null, new CompletionHandler() {
    23. @Override
    24. public void completed(Integer result, Void attachment) {
    25. try {
    26. clientChannel.close();
    27. } catch (IOException e) {
    28. e.printStackTrace();
    29. }
    30. }
    31. @Override
    32. public void failed(Throwable exc, Void attachment) {
    33. exc.printStackTrace();
    34. try {
    35. clientChannel.close();
    36. } catch (IOException e) {
    37. e.printStackTrace();
    38. }
    39. }
    40. });
    41. }
    42. @Override
    43. public void failed(Throwable exc, Void attachment) {
    44. exc.printStackTrace();
    45. }
    46. });
    47. try {
    48. group.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.SECONDS);
    49. } catch (InterruptedException e) {
    50. e.printStackTrace();
    51. }
    52. }
    53. }

    为什么Netty依旧使用NIO的API?

    Netty 不看重 Windows 上的使用,在 Linux 系统上,AIO 的底层实现仍使用 EPOLL(后续会讲),没有很好实现 AIO,因此在性能上没有明显的优势,而且被 JDK 封装了一层不容易深度优化。

    Linux的IO模型

    Java的I/O模型是在Java编程语言层面的抽象,而Linux的I/O模型是操作系统内核层面的实现。因此,虽然它们有一些相似之处,但并不是完全相同的概念。

    LINUX五种IO模型

    同步和异步:同步和异步是针对应用程序和内核的交互而言的,同步指的是用户进程触发IO 操作并等待或者轮询的去查看IO 操作是否就绪,而异步是指用户进程触发IO 操作以后便开始做自己的事情,而当IO 操作已经完成的时候会得到IO 完成的通知。

    阻塞和非阻塞:阻塞和非阻塞是针对于进程在访问数据的时候,根据IO操作的就绪状态来采取的不同方式,说白了是一种读取或者写入操作方法的实现方式,阻塞方式下读取或者写入函数将一直等待,而非阻塞方式下,读取或者写入方法会立即返回一个状态值。

    1. 阻塞式IO

    2. 非阻塞式IO

    3. IO多路复用

    4. 信号驱动

    5. 异步IO

    前面四种IO模型实际上都属于同步IO,只有最后一种是真正的异步IO,因为无论是多路复用IO还是信号驱动模型,IO操作的第2个阶段都会引起用户线程阻塞,也就是内核进行数据拷贝的过程都会让用户线程阻塞。

    SELECT/POLL/EPOLL

    selectpollepoll都是多路复用的实现机制,它们用于管理多个I/O通道的并发事件。这些机制在操作系统中允许应用程序监视多个文件描述符,以确定哪些文件描述符已准备好进行I/O操作,从而避免了频繁的轮询。

    • select

      它仅仅知道了,有I/O事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以select具有O(n)的无差别轮询复杂度,同时处理的流越多,无差别轮询时间就越长。内核需要将消息传递到用户空间,都需要内核拷贝。

    • poll

      poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态, 但是它没有最大连接数的限制,原因是它是基于链表来存储的。

    • epoll基于操作系统支持的I/O通知机制

      epoll通过内核和用户空间共享一块内存来实现的。

            1.水平触发(LT):基于管道轮询的机制。管道中有100k的数据,只读了60k,中断了之后,还是能收到这个管道有数据的通知的。后续还是能轮询到这个管道有数据。

            2.边缘触发(ET):基于管道状态改变的机制。只处理状改变的管道。管道中有100k的数据,只读了60k,中断了,是读不了剩下的40k。只有新数据进入这个管道时,这个管道状态再次变成“有数据”的状态,才会发送通知。

    epoll 工作在 ET 模式的时候,必须使用非阻塞套接口,以避免由于一个文件描述符的阻塞读/写操作把处理多个文件描述符的任务饿死。

    epoll_create 开辟空间获得文件描述符

    epoll_ctl 添加socket文件描述符到空间内

    epoll_wait 获取有事件的socket

    用户态与内核态

    I/O(Input/Output,输入/输出)和用户态与内核态之间存在密切的关系,特别是在操作系统中。用户态和内核态是操作系统中的两个不同特权级别,它们用于管理和保护计算机系统的资源。以下是关于I/O、用户态和内核态之间的关系的重要信息:

    1. I/O操作涉及用户态和内核态

      • I/O操作包括从应用程序到外部设备(如磁盘、网络、键盘、显示器等)的数据传输。这些操作通常涉及到用户态和内核态之间的切换。
      • 当应用程序需要执行I/O操作时,它会调用操作系统提供的I/O系统调用(例如,读取文件或发送数据包)。这些系统调用是在用户态执行的。
      • 操作系统内核负责管理系统资源和I/O设备,因此在执行I/O操作时,控制必须从用户态切换到内核态,以便内核可以直接访问硬件资源。
    2. 用户态和内核态的切换

      • 用户态和内核态是不同的CPU执行模式。在用户态下,应用程序只能访问有限的资源,而在内核态下,操作系统内核可以访问系统的全部资源。
      • 当应用程序需要执行需要特权访问的操作,例如执行系统调用或访问设备驱动程序,它必须通过软中断或异常将控制权切换到内核态。这个切换是由操作系统的内核来处理的。
      • 用户态到内核态的切换会涉及一些开销,因为需要保存和恢复CPU寄存器、切换堆栈等操作。因此,频繁的切换会影响性能。
    3. 内核态的I/O处理

      • 一旦控制切换到内核态,操作系统内核就可以执行I/O操作。它会管理设备驱动程序、缓冲区、中断处理等细节,以确保数据的正确传输。
      • 内核还会维护I/O请求队列,以有效地处理多个I/O请求。
    4. 异步I/O和用户态I/O

      • 异步I/O是一种I/O模型,允许应用程序继续执行其他任务,而不必等待I/O操作完成。在这种情况下,通常使用异步I/O库来管理I/O操作,而不需要频繁地切换用户态和内核态。
      • 异步I/O通常通过回调函数或事件通知机制来处理I/O完成事件。

    I/O操作涉及用户态和内核态之间的切换,因为操作系统内核必须管理和控制I/O设备。这个切换是操作系统的核心功能之一,用于确保计算机系统的稳定性、安全性和性能。不同的I/O模型可以影响用户态和内核态之间的切换方式和频率。说到内核态切换,下面不得不介绍的就是零拷贝。

    Zero Copy(零拷贝)

    Netty与零拷贝(Zero-Copy)之间有密切的关系,因为Netty是一个网络应用框架,专门设计用于高性能的网络通信,而零拷贝是一项技术,可以用于提高数据传输的效率,特别是在网络通信中。

    以下是Netty与零拷贝的关系和如何在Netty中利用零拷贝技术的一些重要信息:

    Netty的高性能特性:Netty被设计为高性能的网络应用框架,它旨在处理大量并发连接和高吞吐量的网络通信。为了实现这一目标,Netty采用了多种性能优化技术,其中之一就是零拷贝。

    零拷贝是一种优化技术,旨在减少数据在内存之间的复制次数。传统的数据传输通常涉及将数据从一个缓冲区复制到另一个缓冲区,这会引入额外的CPU和内存开销。零拷贝技术通过操作系统或硬件支持,允许数据在不复制的情况下从一个地方传输到另一个地方,从而提高了数据传输的效率。
       
    `ByteBuf`是Netty的自定义缓冲区类型,它支持零拷贝和引用计数等特性。在Netty中,`ByteBuf`可以在数据传输时直接暴露底层数据,而不需要进行数据复制,从而减少了CPU和内存开销。

    零拷贝的实现

    传统IO

    read:将数据从磁盘通过DMA读取到内核缓存区中,在拷贝到用户缓冲区

    write: 先将数据写入到socket缓冲区中,经过DMA写入网卡设备

    在这里插入图片描述

    4次切换,4次拷贝

    虚拟内存 mmap

    1.虚拟内存空间可以远远大于物理内存空间
    2.多个虚拟内存可以指向同一个物理地址


    正是多个虚拟内存可以指向同一个物理地址,可以把内核空间和用户空间的虚拟地址映射到同一个物理地址,这样的话,就可以减少IO的数据拷贝次数。用户态可以直接访问内核态的数据。

    4次切换,3次拷贝

    sendFile

    sendfile表示在两个文件描述符之间传输数据,它是在操作系统内核中操作的,避免了数据从内核缓冲区和用户缓冲区之间的拷贝操作。

    在这里插入图片描述

    2次切换,3次拷贝

    sendfile + DMA scatter/gather实现的零拷贝

    linux2.4版本后,对sendfile做了优化升级,引入SG-DMA技术,其实就是对DMA拷贝加入了scatter/gather操作,它可以直接从内核空间缓冲区中将数据读取到网卡,这样的话还可以省去CPU拷贝。

    在这里插入图片描述

    2次切换,2次拷贝,CPU全程不参与数据搬运

    直接内存 Direct ByteBuf

    Netty通常使用直接内存(Direct Memory)来提高性能。直接内存是一种特殊的内存分配方式,不同于Java堆内存。

    1. ByteBuf与直接内存:Netty中的ByteBuf是一个用于处理字节数据的缓冲区抽象。ByteBuf可以使用直接内存分配,这称为"Direct ByteBuf"。直接内存分配意味着ByteBuf中的数据存储在堆外内存,而不是在Java堆中。

    2. 减少内存复制:在进行网络数据传输时,数据通常需要从应用程序的缓冲区复制到操作系统内核缓冲区,然后再从内核缓冲区复制到网络适配器。使用直接内存,可以在这些步骤中减少或消除数据复制,提高了性能。

    3. 零拷贝:直接内存可以与零拷贝相结合,使数据可以在应用程序和操作系统之间进行高效的传输。

    4. 缓冲区池:Netty通常使用池化的ByteBuf来管理直接内存的分配和释放。这种方式可以避免频繁地分配和释放直接内存,提高了内存管理的效率。

    5. 内存管理控制:Netty提供了一些工具和机制,帮助开发者有效地管理直接内存,包括手动释放、自动回收等。

    示例代码
    1. import io.netty.bootstrap.ServerBootstrap;
    2. import io.netty.channel.ChannelHandlerContext;
    3. import io.netty.channel.ChannelInboundHandlerAdapter;
    4. import io.netty.channel.ChannelInitializer;
    5. import io.netty.channel.nio.NioEventLoopGroup;
    6. import io.netty.channel.socket.SocketChannel;
    7. import io.netty.channel.socket.nio.NioServerSocketChannel;
    8. import io.netty.buffer.ByteBuf;
    9. import io.netty.buffer.Unpooled;
    10. public class NettyDirectMemoryExample {
    11. public static void main(String[] args) throws InterruptedException {
    12. // 创建两个EventLoopGroup,一个用于接受客户端连接,一个用于处理客户端请求
    13. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    14. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    15. try {
    16. // 创建ServerBootstrap
    17. ServerBootstrap serverBootstrap = new ServerBootstrap();
    18. serverBootstrap.group(bossGroup, workerGroup)
    19. .channel(NioServerSocketChannel.class)
    20. .childHandler(new ChannelInitializer() {
    21. @Override
    22. protected void initChannel(SocketChannel ch) {
    23. ch.pipeline().addLast(new EchoServerHandler());
    24. }
    25. });
    26. // 绑定端口并启动服务器
    27. serverBootstrap.bind(8080).sync().channel().closeFuture().sync();
    28. } finally {
    29. bossGroup.shutdownGracefully();
    30. workerGroup.shutdownGracefully();
    31. }
    32. }
    33. // 自定义ChannelHandler处理客户端消息
    34. static class EchoServerHandler extends ChannelInboundHandlerAdapter {
    35. @Override
    36. public void channelRead(ChannelHandlerContext ctx, Object msg) {
    37. ByteBuf in = (ByteBuf) msg;
    38. ByteBuf out = Unpooled.directBuffer(); // 创建Direct ByteBuf
    39. try {
    40. out.writeBytes(in); // 将接收到的数据写入Direct ByteBuf
    41. ctx.write(out); // 写入回应数据到客户端
    42. ctx.flush();
    43. } finally {
    44. in.release(); // 释放接收缓冲区
    45. out.release(); // 释放Direct ByteBuf
    46. }
    47. }
    48. @Override
    49. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    50. cause.printStackTrace();
    51. ctx.close();
    52. }
    53. }
    54. }

    使用直接内存需要谨慎管理,以避免内存泄漏和其他问题。

  • 相关阅读:
    WorkPlus Meet白板和文档共享功能上线,私有化视频会议全新升级
    2021年12月电子学会图形化二级编程题解析含答案:绘制多边形
    视频集中存储/直播点播平台EasyDSS点播文件分类功能新升级
    Java基于springboot+vue药店实名制买药系统 前后端分离
    如何给在 SAP Business Application Studio 里开发的 OData 服务准备测试数据试读版
    【线性代数】MIT Linear Algebra Lecture 4: Factorization into A = LU
    个人怎么给短视频配音?三个简单的小技巧,配音原来并不难
    【HDU No. 5057】序列操作 Argestes and Sequence
    C# 找出数组中只出现了一次的数字
    No module named ‘win32file‘
  • 原文地址:https://blog.csdn.net/qq_44025894/article/details/133719355