• Netty的再次认知 | 多图理解Netty的线程模型


    前言:2022年是我工作中最不稳定的一年,今年也在准备教资考试,以至于忽略了很久的技术学习;我司的游戏架构是基于Netty设计的异步网络通信架构,我之前对Netty的认知仅停留在应用层面,借此机会,连载两篇文章来深入学习下Netty4.0的模型;
    第一篇: 本篇主要讲解Netty的线程模型,给一种直观的感受;
    第二篇:源码分析,用流程图讲解Netty组件的工作流程;
    本文基于Netty4,如有错误请指正

    关于Buffer和Channel的注意事项和细节
    1. ByteBuffer支持类型化的put和get操作;若操作类型不一致会出现BufferUnderflowException异常;

    2. 可以将一个普通的Buffer转换成只读的Buffer;

    3. Nio提供了MappedByteBuffer,可以让文件直接在对外内存中进行修改;

      public static void main(String[] args) throws Exception {
          RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw");
      
          FileChannel channel = randomAccessFile.getChannel();
      
          /*
              参数1:表示channel的操作,只读还是读写
              参数2:操作的起始位置
              参数3:操作的大小,如可操作5个字节
               */
          MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
      
          map.put(0, (byte)'H');
          map.put(3, (byte)'9');
          randomAccessFile.close();
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
    4. Nio支持通过多个Buffer(即Buffer数组)完成读写操作;即Scattering和Gathering;意思就是定义Buffer数组,现有API支持,直接写入数组中,若超过数组所有Buffer的容量则报错;

    NIO 零拷贝

    零拷贝:无需CPU拷贝;可以提升性能上的优势,可以更少的切换上下文,更少的CPU缓存等等;

    在这里插入图片描述

    传统IO:数据从硬盘上拷贝到内核缓冲区,再通过CPU拷贝到用户态内存空间,由用户态再拷贝到SocketBuffer中,最后写入网卡中;【4拷贝3切换】

    mmap:通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以攻下昂内核空间的数据;这样在进行文件网络传输时,就可以减少内核空间到用户空间的内存拷贝次数;但仍无法避免CPU拷贝;【3拷贝3切换】

    sendFile(2.1):Linux2.1版本提供了系统调用函数,数据不经过用户态,直接从内核缓存区进入到SocketBuffer,同时,由于和用户态完全无关,就减少了一次上下文切换;【3拷贝2切换】

    sendFile(2.4):Linux2.4版本做了修改,避免从内核缓冲区拷贝到SocketBuffer的操作,直到拷贝到网卡中,从而再减少一次数据拷贝;【3拷贝2切换】

    其中mmap适合小数据量的读写,sendFile适合大文件传输;

    相关API
    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("baidu.com", 9001));
        String fileName = "1.txt";
    
        FileChannel channel = new FileInputStream(fileName).getChannel();
    
        // linux下,调用一次transferTo可以完成传输
        // windows下,调用一次transferTo只能发送8mb数据,需要分段传输
        channel.transferTo(0, channel.size(), socketChannel);
        channel.close();
        socketChannel.close();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    原生NIO存在的问题
    1. NIO的类库和API繁杂,使用麻烦;需要熟练掌握Selector,ServerSocketChannel,SocketChannel,ByteBuffer等;
    2. 需要具备其他的额外技能;要熟悉Java多线程编程,因为BIO编程涉及到Reactor模式,需要对网络编程非常熟练;
    3. 开发工作量以及难度非常大,例如断线重连,网络闪断,半包读写,失败缓存,网络拥塞和异常流的处理等;
    4. JDK NIO的Bug,Epoll Bug,它会导致Selector空轮询,最终导致CPU 100%;
    Netty介绍
    1. Netty是由JBOSS提供的一个Java开源框架。Netty提供异步的,基于事件驱动的网络应用程序框架,用以快速开发高性能,高可靠性的网络IO程序;
    2. Netty可以帮助你快速,简单的开发出一个网络应用,相当于简化和流程化了NIO的开发过程;
    3. Netty是目前最流行的NIO框架,Netty在互联网领域,大数据分布式计算领域,游戏或行业等;Elasticsearch,Dubbo框架内部都采用了Netty;
    Netty的优点

    Netty对JDK自带的NIO的API进行了封装,解决了上述问题;

    1. 设计优雅:适用于各种传输类型的统一API阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型-单线程,一个或多个线程池;
    2. 使用方便:详细记录的Javadoc,用户指南和示例;没有其他依赖项;
    3. 高性能,吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制;
    4. 安全:完整的SSL/TLS和StartTLS支持;
    5. 社区活跃,不断更新;
    传统IO服务模型

    在这里插入图片描述

    模型特点
    1. 采用阻塞IO模型获取输入的数据;
    2. 每个连接都需要独立的线程完成数据的输入,业务处理以及数据返回;
    问题分析
    1. 当并发数量大时,会创建大量的线程,占用大量的系统资源;
    2. 连接创建后,若当前线程没有可读数据,该线程会阻塞在read操作上,造成线程资源的浪费;
    Reactor模式
    解决传统模型的痛点
    1. 基于IO复用模型,多个连接共享一个阻塞对象,程序只需要在一个阻塞对象等待,无需阻塞等待所有的连接。当某个连接有数据处理时,操作系统通知应用程序线程从阻塞状态返回,开始进行业务处理;
    2. 基于线程池复用线程资源,不必再为每个连接创建一个线程;当一个连接处理完毕后,可以将该线程给下一个连接使用;
    Reactor模式设计

    在这里插入图片描述

    通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动);

    服务器端程序处理传入的多个请求并将他们同步分派到相应处理线程;

    使用IO多路复用监听事件,当收到事件后就分发给某个线程;

    单Reactor单线程

    在这里插入图片描述

    采用select处理客户端的请求事件,然后通过Dispatch分发请求进行处理;如果是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象,由该handler对象进行后续业务处理;

    如果不是建立连接的事件,则由Reactor将事件分发到连接对应的Handler来处理整个业务;

    优点:

    1. 模型简单,没有多线程,进程通信,竞争的问题,全部都在一个线程中完成;

    缺点:

    1. 性能问题,只有一个线程,无法发挥多核CPU的性能;Handler在处理某个连接的事件时可能阻塞其他连接的响应;
    2. 可靠性问题,线程意外终止或者进入死循环将会导致整个通信模块不可用,造成节点故障;

    使用场景:

    客户端数量有限,业务处理非常快,例如Redis;

    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        // 不能将同一个channel注册两次不同类型
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChannel.bind(new InetSocketAddress(80));
        System.out.println("绑定成功");
        Handler handler = new Handler();
        // 循环处理selector
        while (true) {
            // 等待select
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            // 分派 dispatch
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
                    SocketChannel accept = channel.accept();
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_READ);
                } else if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    handler.doHandle(channel);
                } else {
                    System.out.println("无法处理 channel op"+selectionKey.interestOps());
                }
                iterator.remove();
            }
        }
    }
    
    // 定义处理Handler
    static class Handler {
    
        public void doHandle(SocketChannel channel) throws Exception {
            // 分配空间
            ByteBuffer allocate = ByteBuffer.allocate(1024);
            // 读取数据
            channel.read(allocate);
            // 业务处理-打印
            System.out.println(new String(allocate.array()));
            allocate.clear();
            allocate.put("Hello".getBytes());
            allocate.flip();
            // 响应数据
            channel.write(allocate);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    单Reactor多线程

    在这里插入图片描述

    采用select处理客户端的请求事件,然后通过Dispatch分发请求进行处理;如果是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象,由该handler对象进行后续业务处理;

    如果不是建立连接的事件,则由Reactor将事件分发到连接对应的Handler来处理,Handler只负责响应事件,具体的业务处理在read读取数据后交给后面的线程池去处理;

    优点:可以充分利用CPU多核的处理能力;

    缺点:多线程数据共享和访问比较复杂,reactor所有的事件监听与响应仍在单线程中运行,高并发下容易出现瓶颈;

    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        // 不能将同一个channel注册两次不同类型
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChannel.bind(new InetSocketAddress(80));
        System.out.println("绑定成功");
        // SOCKET与HANDLE映射关系
        Map<Integer, Handler> map = new HashMap<>();
        ExecutorService workGroup = Executors.newFixedThreadPool(2);
        // 循环处理selector
        while (true) {
            // 等待select
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            // 分派 dispatch
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
                    SocketChannel accept = channel.accept();
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_READ);
                } else if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    Handler handler = map.get(channel.hashCode());
                    if (handler == null) {
                        handler = new Handler(channel, workGroup);
                    }
                    handler.doHandle();
                    map.put(channel.hashCode(), handler);
                } else {
                    System.out.println("无法处理 channel op"+selectionKey.interestOps());
                }
                iterator.remove();
            }
        }
    }
    
    // 定义处理Handler
    static class Handler {
        SocketChannel socketChannel;
        ExecutorService workGroup;
    
        public Handler(SocketChannel socketChannel, ExecutorService workGroup) {
            this.socketChannel = socketChannel;
            this.workGroup = workGroup;
            System.out.println("创建了socket"+socketChannel);
        }
    
        public void doHandle() throws Exception {
            // 分配空间
            ByteBuffer allocate = ByteBuffer.allocate(1024);
            // 读取数据
            socketChannel.read(allocate);
            // 业务处理-打印
            workGroup.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " === " +new String(allocate.array()));
                allocate.clear();
                allocate.put("Hello".getBytes());
                allocate.flip();
                // 响应数据
                try {
                    socketChannel.write(allocate);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    主从Reactor多线程

    在这里插入图片描述

    Reactor主线程负责建立连接,当连接建立后,将其交给Reactor子线程去处理;其可以对应多个Reactor子线程;

    Reactor子线程将连接加入队列中进行监听,并创建handler进行各种事件的处理;handler通过读取数据,将业务逻辑交给worker线程池处理,最终由handler去响应结果;

    worker线程池分配独立的worker线程去处理业务,并返回结果;

    优点:

    1. 父线程与子线程的数据交互简单职责明确,父线程只需要接收新的连接,子线程完成后续业务处理;
    2. 父线程与子线程数据交互舰导弹,Reactor主线程只需要把新的连接交给子线程,子线程 无需返回数据;

    缺点:编程复杂度高

    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        // 不能将同一个channel注册两次不同类型
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChannel.bind(new InetSocketAddress(80));
        System.out.println("绑定成功");
        SubReactorFactory subReactorFactory = new SubReactorFactory(2, 8);
        // 循环处理selector
        while (true) {
            // 等待select
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            // 分派 dispatch
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
                    SocketChannel accept = channel.accept();
                    accept.configureBlocking(false);
                    subReactorFactory.dispatch(accept);
                } else {
                    System.out.println("无法处理 channel op" + selectionKey.interestOps());
                }
                iterator.remove();
            }
        }
    }
    
    // 工厂类,用于分配boss线程和SubReactor
    static class SubReactorFactory {
        ExecutorService bossGroup;
        ExecutorService workGroup;
        int thread;
        SubReactor[] subReactors;
    
        public SubReactorFactory(int thread, int workGroupNum) {
            this.bossGroup = Executors.newFixedThreadPool(thread);
            this.workGroup = Executors.newFixedThreadPool(workGroupNum);
            this.thread = thread;
            subReactors = new SubReactor[thread];
            for (int i = 0; i < thread; i++) {
                subReactors[i] = new SubReactor(workGroup);
            }
        }
    
        public void dispatch(SocketChannel channel) {
            SubReactor subReactor = subReactors[channel.hashCode() % thread];
            bossGroup.submit(() -> {
                try {
                    subReactor.register(channel);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    
    }
    
    // 拥有独立的Selector
    static class SubReactor {
        Selector selector;
        Map<Integer, Handler> map = new HashMap<>();
        ExecutorService workGroup;
    
    
        public SubReactor(ExecutorService workGroup) {
            try {
                selector = Selector.open();
                this.workGroup = workGroup;
            } catch (Exception e) {
                e.printStackTrace();
            }
            new Thread(() -> {
                try {
                    this.handler();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    
        public void register(SocketChannel channel) throws IOException {
            channel.register(selector, SelectionKey.OP_READ);
            selector.wakeup();
        }
    	// 这里之所以用select(long timeout)是因为有消息过来并未出发select返回;
        public void handler() throws Exception {
            while (true) {
                // 这里可能会出现性能瓶颈
                if (selector.select(5000) == 0) {
                    System.out.println("selector无事件 "+ this);
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                // 分派 dispatch
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if (selectionKey.isReadable()) {
                        // 不卡主线程,提高并发
                        SocketChannel channel = (SocketChannel) selectionKey.channel();
                        Handler handler = map.get(channel.hashCode());
                        if (handler == null) {
                            handler = new Handler(channel, workGroup);
                        }
                        Handler finalHandler = handler;
                        map.put(channel.hashCode(), finalHandler);
                        finalHandler.doHandle();
                    } else {
                        System.out.println("无法处理 channel op" + selectionKey.interestOps());
                    }
                    iterator.remove();
                }
            }
        }
    }
    
    // 定义处理Handler
    static class Handler {
        SocketChannel socketChannel;
        ExecutorService workGroup;
    
        public Handler(SocketChannel socketChannel, ExecutorService workGroup) {
            this.socketChannel = socketChannel;
            this.workGroup = workGroup;
            System.out.println("创建了socket" + socketChannel);
        }
    
        public void doHandle() throws Exception {
            // 分配空间
            ByteBuffer allocate = ByteBuffer.allocate(1024);
            // 读取数据
            socketChannel.read(allocate);
            // 业务处理-打印
            workGroup.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " === " + new String(allocate.array()));
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                allocate.clear();
                allocate.put("Hello".getBytes());
                allocate.flip();
                // 响应数据
                try {
                    socketChannel.write(allocate);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    Reactor模式的优点
    1. 响应快,不必为单个同步连接所阻塞;
    2. 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的开销;
    3. 扩展性好,可以方便的通过增加Reactor线程数来充分利用CPU资源;
    4. 复用性好,Reactor本身与具体的业务逻辑无关;
    Netty模型
    • Netty专门有两个线程池去处理连接和读写;BossGroup处理客户端连接,WorkGroup处理channel的网络读写;
    • BossGroup和WorkGroup都是NioEventLoopGroup;
    • NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通讯;
    • NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop;
    • 每个Boss NioEventLoop循环的步骤:
      • 轮询Accept事件;
      • 处理accept事件,与client建立连接,生成NioScoketChannel,并将其注册到某个Worker NioEventLoop上的selector;
      • 处理任务队列的任务,即runAllTasks;
    • 每个Worker NioEventLoop循环的步骤:
      • 轮询read,write事件
      • 处理I/O事件,即read,write事件,在对应的NioSocketChannel上处理;
      • 处理任务队列的任务,即runAllTasks;
    • 每个Worker NioEventLoop处理业务时,会使用pipeline(管道),pipeline中包含很多channel,即通过pipeline可以获取到对应通道,管道中维护了许多处理器;

    在这里插入图片描述

    Netty线程模型

    当不指定NioEventLoopGroup中的线程数时,则取java启动参数io.netty.eventLoopThreads,若没有配置则是cpu数*2;

    由于EventLoop是一个单线程线程池,所以它可以执行定时任务等其他线程任务;

    NioEventLoop内部采用串行化设计,从消息的读取->解码->编码->发送,始终由IO线程NioEventLoop负责;

    每个NioEventLoop的selector上可以注册监听多个NioChannel;

    每个NioChannel只会绑定在唯一的NioEventLoop上;

    每个NioChannel都绑定有一个自己的ChannelPipeline上;

    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    自定义类型:结构体,声明,变量初始化,结构体内存对齐。
    Self-adaptive Differential Evolution Algorithm for Numerical Optimization
    可扩展标记语言——XML
    GreenPlum的学习心得和知识总结(三)|Greenplum数据库快速入门
    一文理解虚拟机栈
    PhpStorm环境配置与应用
    图论_2。
    netty群聊客户端服务器及心跳检测
    哪个券商支持网格交易功能,条件单功能强大,交易手续费低
    在线电子书阅读小程序,微信小程序电子书阅读,微信小程序小说阅读器毕业设计作品
  • 原文地址:https://blog.csdn.net/qq_37419449/article/details/127415384