• 基于Java NIO 写的一个简单版 Netty 服务端


    A Simple Netty Based On JAVA NIO

    基于Java NIO 写的一个简单版 Netty 服务端

    前置知识

    NIO

    • NIO 一般指 同步非阻塞 IO,同样用于**描述程序访问数据方式 **的还有BIO(同步阻塞)、AIO(异步非阻塞)
    • 同步异步指获取结果的方式,同步为主动去获取结果,不管结果是否准备好,异步为等待结果准备好的通知
    • 阻塞非阻塞是线程在结果没有到来之前,是否进行等待,阻塞为进行等待,非阻塞则不进行等待
    • NIO 主动地去获取结果,但是在结果没有准备好之前,不会进行等待。而是通过一个 多路复用器 管理多个通道,由一个线程轮训地去检查是否准备好即可。在网络编程中,多路复用器通常由操作系统提供,Linux中主要有 select、poll、epoll。同步非阻塞指线程不等待数据的传输,而是完成后由多路复用器通知,线程再将数据从内核缓冲区拷贝到用户空间内存进行处理。

    Java NIO

    • 基于 NIO 实现的网络框架,可以用少量的线程,处理大量的连接,更适用于高并发场景。于是,Java提供了NIO包提供相关组件,用于实现同步非阻塞IO
      • 核心三个类Channel、Buffer、Selector。Channel代表一个数据传输通道,但不进行数据存取,有Buffer类进行数据管理,Selector为一个复用器,管理多个通道

    Bytebuffer

    • 该类为NIO 包中用于操作内存的抽象类,具体实现由HeapByteBuffer、DirectByteBuffer两种
    • HeapByteBuffer为堆内内存,底层通过 byte[ ] 存取数据
    • DirectByteBuffer 为堆外内存,通过JDK提供的 Unsafe类去存取;同时创建对象会关联的一个Cleaner对象,当对象被GC时,通过cleaner对象去释放堆外内存

    各核心组件介绍

    NioServer

    为启动程序类,监听端口,初始化Channel

    • 下面为NIO模式下简单服务端处理代码
    // 1、创建服务端Channel,绑定端口并配置非阻塞
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.socket().bind(new InetSocketAddress(6666));
    serverSocketChannel.configureBlocking(false);
    
    // 2、创建多路复用器selector,并将channel注册到多路复用器上
    // 不能直接调用channel的accept方法,因为属于非阻塞,直接调用没有新连接会直接返回
    Selector selector = Selector.open();
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
    // 3、循环处理多路复用器的IO事件
    while(true){
    
        // 3.1、select属于阻塞的方法,这里阻塞等待1秒
        // 如果返回0,说明没有事件处理
        if (selector.select(1000) == 0){
            System.out.println("服务器等待了1秒,无IO事件");
            continue;
        }
        // 3.2、遍历事件进行处理
        Set selectionKeys = selector.selectedKeys();
        Iterator iterator = selectionKeys.iterator();
        while(iterator.hasNext()){
            SelectionKey key = iterator.next();
            // accept事件,说明有新的客户端连接
            if (key.isAcceptable()){
                // 新建一个socketChannel,注册到selector,并关联buffer
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                System.out.println("客户端连接:"+socketChannel.getRemoteAddress());
            }
            // read事件 (内核缓冲区的数据准备好了)
            if(key.isReadable()){
                SocketChannel channel = (SocketChannel)key.channel();
                ByteBuffer byteBuffer = (ByteBuffer)key.attachment();
                try {
                  // 将数据写进buffer
                    int readNum = channel.read(byteBuffer);
                    if (readNum == -1){
                        System.out.println("读取-1时,表示IO流已结束");
                        channel.close();
                        break;
                    }
                    // 打印buffer
                    byteBuffer.flip();
                    byte[] bytes = new byte[readNum];
                    byteBuffer.get(bytes, 0, readNum);
                    System.out.println("读取到数据:" + new String(bytes));
                } catch (IOException e) {
                    System.out.println("读取发生异常,广播socket");
                    channel.close();
                }
    
            }
            // write事件 (操作系统有内存写出了)
            if (key.isWritable()){
                SocketChannel channel = (SocketChannel)key.channel();
                // 读取read时暂存数据
                byte[] bytes = (byte[])key.attachment();
                if (bytes != null){
                    System.out.println("可写事件发生,写入数据: " + new String(bytes));
                    channel.write(ByteBuffer.wrap(bytes));
                }
                // 清空暂存数据,并切换成关注读事件
                key.attach(null);
                key.interestOps(SelectionKey.OP_READ);
            }
            iterator.remove();
        }
    }
    

    EventLoop

    处理 Channel 中数据的读写

    • 在上面的Server中,大量并发时单线程地处理读写事件会导致延迟,因此将读写处理抽取出来,可利用多线程实现高并发
    • 一个EventLoop会关联一个selector,只会处理这个selector上的Channel
    public class EventLoop2 implements Runnable{
    
    
        private final Thread thread;
        /**
         * 复用器,当前线程只处理这个复用器上的channel
         */
        public Selector selector;
        /**
         * 待处理的注册任务
         */
        private final Queue queue = new LinkedBlockingQueue<>();
    
        /**
         * 初始化复用器,线程启动
         * @throws IOException
         */
        public EventLoop2() throws IOException {
            this.selector = SelectorProvider.provider().openSelector();
            this.thread = new Thread(this);
            thread.start();
        }
    
        /**
         * 将通道注册给当前的线程处理
         * @param socketChannel
         * @param keyOps
         */
        public void register(SocketChannel socketChannel,int keyOps){
            // 将注册新的socketChannel到当前selector封装成一个任务
            queue.add(()->{
                try {
                    MyChannel myChannel = new MyChannel(socketChannel, this);
                    SelectionKey key = socketChannel.register(selector, keyOps);
                    key.attach(myChannel);
                } catch (Exception e){
                    e.printStackTrace();
                }
            });
            // 唤醒阻塞等待的selector线程
            selector.wakeup();
        }
    
        /**
         * 循环地处理 注册事件、读写事件
         */
        @Override
        public void run() {
            while (!thread.isInterrupted()){
                try {
                    int select = selector.select(1000);
                    // 处理注册到当前selector的事件
                    if (select == 0){
                        Runnable task;
                        while ((task = queue.poll()) != null){
                            task.run();
                        }
                        continue;
                    }
                    // 处理读写事件
                    System.out.println("服务器收到读写事件,select:" + select);
                    processReadWrite();
    
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 处理读写事件
         * @throws Exception
         */
        private void processReadWrite() throws Exception{
            System.out.println(Thread.currentThread() + "开始监听读写事件");
            // 3.2、遍历事件进行处理
            Set selectionKeys = selector.selectedKeys();
            Iterator iterator = selectionKeys.iterator();
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                MyChannel myChannel = (MyChannel)key.attachment();
                if(key.isReadable()){
                    // 将数据读进buffer
                    myChannel.doRead(key);
                }
                if (key.isWritable()){
                    myChannel.doWrite(key);
                }
                iterator.remove();
            }
        }
    }
    
    

    EventloopGroup

    一组EventLoop,轮训地为eventLoop分配Channel

    public class EventLoopGroup {
        private EventLoop2[] children = new EventLoop2[1];
    
        private AtomicInteger idx = new AtomicInteger(0);
    
        public EventLoopGroup() throws IOException {
            for (int i = 0; i < children.length; i++){
                children[i] = new EventLoop2();
            }
        }
    
        public EventLoop2 next(){
            // 轮训每一个children
            return children[idx.getAndIncrement() & (children.length - 1)];
        }
    
        public void register(SocketChannel channel,int ops){
            next().register(channel,ops);
        }
    }
    

    Channel

    封装了SocketChannel 和 Pipline,将从Channel读写的消息,沿着Pipline上的节点进行处理

    • 在上面EventLoop中,注册Channel到对应的Selector前,会进行封装,将自定义的Channel放在读写事件触发时会返回的SelectionKey里面
    • 同时提供了数据读写处理方法,读写事件触发时调用该方法,数据会沿着pipline上去处理
    public class MyChannel {
    
        private SocketChannel channel;
    
        private EventLoop2 eventLoop;
    
        private Queue writeQueue;
    
        private PipLine pipLine;
    
        /**
         * 一个channel关联一个eventLoop、一个pipLine、一个socketChannel、一个writeQueue
         * @param channel
         * @param eventLoop
         */
        public MyChannel(SocketChannel channel, EventLoop2 eventLoop) {
            this.channel = channel;
            this.eventLoop = eventLoop;
            this.writeQueue = new ArrayDeque<>();
            this.pipLine = new PipLine(this,eventLoop);
            this.pipLine.addLast(new MyHandler1());
            this.pipLine.addLast(new MyHandler2());
        }
    
        /**
         * 读事件处理
         * @param key
         * @throws Exception
         */
        public void doRead(SelectionKey key) throws Exception{
            try {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int readNum = channel.read(buffer);
                if (readNum == -1){
                    System.out.println("读取-1时,表示IO流已结束");
                    channel.close();
                    return;
                }
                // 转成可读状态
                buffer.flip();
                // 消息放入pipLine,交给头节点, 头节点开始传递
                pipLine.headContext.fireChannelRead(buffer);
    
            } catch (IOException e) {
                System.out.println("读取发生异常,广播socket");
                channel.close();
            }
        }
    
        /**
         * 真正地写出数据,关注写事件后,会触发
         * @param key
         * @throws IOException
         */
        public void doWrite(SelectionKey key) throws IOException{
            ByteBuffer buffer;
            while ((buffer =writeQueue.poll()) != null){
                channel.write(buffer);
            }
            // 回复读取状态
            key.interestOps(SelectionKey.OP_READ);
    
        }
    
        /**
         * 写出到队列
         * @param msg
         */
        public void doWriteQueue(ByteBuffer msg){
            writeQueue.add(msg);
        }
    
        /**
         * 从最后一个节点进行写出,写出到头节点是调用doWriteQueue
         * @param msg
         */
        public void write(Object msg){
            this.pipLine.tailContext.write(msg);
        }
    
        /**
         * 从最后一个节点进行flush,写出到头节点时调用doFlush
         */
        public void flush(){
            this.pipLine.tailContext.flush();
        }
    
        /**
         * 关注写事件,才能进行真正地写出
         */
        public void doFlush(){
            this.channel.keyFor(eventLoop.selector).interestOps(SelectionKey.OP_WRITE);
        }
    
    }
    
    

    Handler 和 HandlerContext

    handler 接口定义了可以扩展处理的消息,由开发人员实现具体的处理

    handlerContext 类封装了handler的实现类,将handler的上一个节点和下一个节点,让消息可以延者链表传递

    public interface Handler {
    
        /**
         * 读取数据处理
         * @param ctx
         * @param msg
         */
        void channelRead(HandlerContext ctx,Object msg);
    
        /**
         * 写出数据
         * @param ctx
         * @param msg
         */
        void write(HandlerContext ctx,Object msg);
    
        /**
         * 刷下数据
         * @param ctx
         */
        void flush(HandlerContext ctx);
    }
    
    public class HandlerContext {
    
        private Handler handler;
    
        MyChannel channel;
    
        HandlerContext prev;
    
        HandlerContext next;
    
        public HandlerContext(Handler handler, MyChannel channel) {
            this.handler = handler;
            this.channel = channel;
        }
    
        /**
         * 读消息的传递,从头节点开始往后传
         * @param msg
         */
        public void fireChannelRead(Object msg){
            HandlerContext next = this.next;
            if (next != null){
                next.handler.channelRead(next,msg);
            }
        }
    
        /**
         * 从尾节点开始往前传
         * @param msg
         */
        public void write(Object msg){
            HandlerContext prev = this.prev;
            if (prev != null){
                prev.handler.write(prev,msg);
            }
        }
    
        /**
         * 从尾节点开始往前传
         */
        public void flush(){
            HandlerContext prev = this.prev;
            if (prev != null){
                prev.handler.flush(prev);
            }
        }
    }
    
    

    Pipline

    本质是链表,包含了头尾节点的HandlerContext,提供方法给开发人员加节点

    public class PipLine {
    
        private MyChannel channel;
    
        private EventLoop2 eventLoop;
    
        public HandlerContext headContext;
    
        public HandlerContext tailContext;
    
        public PipLine(MyChannel channel, EventLoop2 eventLoop) {
            this.channel = channel;
            this.eventLoop = eventLoop;
            PipHandler headHandler = new PipHandler();
            this.headContext = new HandlerContext(headHandler,channel);
            PipHandler tailHandler = new PipHandler();
            this.tailContext = new HandlerContext(tailHandler,channel);
            // 构建链表
            this.headContext.next = this.tailContext;
            this.tailContext.prev = this.headContext;
        }
    
        public void addLast(Handler handler){
            HandlerContext curr = new HandlerContext(handler, channel);
    
            // 连接在倒数第二个后面
            HandlerContext lastButOne = this.tailContext.prev;
            lastButOne.next = curr;
            curr.prev = lastButOne;
    
            // 连接在最后一个前面
            curr.next = tailContext;
            tailContext.prev = curr;
    
        }
    
        public static class PipHandler implements Handler{
    
            @Override
            public void channelRead(HandlerContext ctx, Object msg) {
                System.out.println("接收"+(String) msg +"进行资源释放");
            }
    
            @Override
            public void write(HandlerContext ctx, Object msg) {
                System.out.println("写出"+msg.toString());
            }
    
            @Override
            public void flush(HandlerContext ctx) {
                System.out.println("flush");
            }
        }
    }
    
  • 相关阅读:
    华为OD 打印任务排序(100分)【java】A卷+B卷
    【每周研报复现】基于阻力支撑相对强度(RSRS)的市场择时
    React重新渲染指南
    文心一言与GPT-4全面对比——人工智能语言模型的新纪元
    Cholesterol-PEG-Acid,胆固醇-聚乙二醇-羧基保持在干燥、低温环境下
    [附源码]计算机毕业设计springboot惠农微信小程序论文
    Muduo网络库之Channel、EPollPoller与EventLoop类【深度解析】
    taro3.*中使用 dva 入门级别的哦
    MIT6.S081的gdb调试方法
    Python的Numpy库的ndarray对象常用构造方法及初始化方法
  • 原文地址:https://www.cnblogs.com/gg12138/p/18111254