• 单线程、多线程Reactor模型


    前言

    反应器模式由Reactor反应器线程、Handlers处理器两大角色组成:

    1. Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器。
    2. Handlers处理器的职责:非阻塞的执行业务处理逻辑。完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等
    1. 单线程反应器

    IO多路复用使用了NIO模型。
    Reactor模式使用了IO多路复用的机制。

    /反应器
    class EchoReactor implements Runnable {
        Selector selector;
        ServerSocketChannel serverSocket;
        EchoReactor() throws IOException {
            //...获取选择器、开启serverSocket服务监听通道
            //Reactor初始化
            selector = Selector.open();
            serverSocket = ServerSocketChannel.open();
            serverSocket.socket().bind(new InetSocketAddress("127.0.0.1",8080));
            //非阻塞
            serverSocket.configureBlocking(false);
    
            //分步处理.第一步,注册serverSocket(监听)的accept事件
            SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    
            //将新连接处理器作为附件,绑定到sk选择键
            // 绑定AcceptorHandler新连接处理器到selectKey;类似对象的set方法
            sk.attach(new AcceptorHandler());
        }
        //轮询和分发事件
        public void run() {
            try {
                while (! Thread.interrupted()) {
                    // 返回注册在selector中等待IO操作(及有事件发生)channel的selectionKey
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        //反应器负责dispatch收到的事件
                        SelectionKey sk = it.next();
                        //分发
                        dispatch(sk);
                    }
                    selected.clear();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    
        void dispatch(SelectionKey sk) {
            Runnable handler = (Runnable) sk.attachment();
            //调用之前attach绑定到选择键的handler处理器对象
            if (handler!= null) {
                handler.run();
            }
        }
    
        // Handler:新连接处理器
        class AcceptorHandler implements Runnable {
            public void run() {
                try {
                    //处理连接
                    SocketChannel channel = serverSocket.accept();
                    if (channel!= null)
                    // 构造方法中绑定。处理真实的逻辑
                    new EchoHandler(selector, channel);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) throws IOException {
            new Thread(new EchoReactor()).start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    class EchoHandler implements Runnable {
        final SocketChannel channel;
        final SelectionKey sk;
        final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        static final int RECIEVING = 0, SENDING = 1;
        int state = RECIEVING;
    
        EchoHandler(Selector selector, SocketChannel c) throws IOException {
            channel = c;
            c.configureBlocking(false);
            //取得选择键,再设置感兴趣的IO事件
            //将新的SocketChannel传输通道,注册到了反应器Reactor类的同一个选择器中。
            // 这样保证了Reactor类和Handler类在同一个线程中执行
            sk = channel.register(selector, 0);
            //将Handler自身作为选择键的附件
            //Channel传输通道注册完成后,将EchoHandler自身作为附件,attach到了选择键中。
            // 这样,在Reactor类分发事件(选择键)时,能执行到EchoHandler的run方法
            sk.attach(this);
            //注册Read就绪事件
            sk.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }
    
        public void run() {
            try {
                if (state == SENDING) {
                    //写入通道
                    channel.write(byteBuffer);
                    //写完后,准备开始从通道读,byteBuffer切换成写入模式
                    byteBuffer.clear();
                    //写完后,注册read就绪事件
                    sk.interestOps(SelectionKey.OP_READ);
                    //写完后,进入接收的状态
                    state = RECIEVING;
                } else if (state == RECIEVING) {
                    //从通道读
                    int length = 0;
                    while ((length = channel.read(byteBuffer)) > 0) {
                        System.out.println(new String(byteBuffer.array(), 0, length));
                    }
                    //读完后,准备开始写入通道,byteBuffer切换成读取模式
                    byteBuffer.flip();
                    //读完后,注册write就绪事件
                    sk.interestOps(SelectionKey.OP_WRITE);
                    //读完后,进入发送的状态
                    state = SENDING;
                }
                //处理结束了,这里不能关闭select key,需要重复使用
                //sk.cancel();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    1.1 单线程反应器问题

    当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。在这种场景下,如果被阻塞的Handler不仅仅负责输入和输出处理的业务,还包括负责连接监听的AcceptorHandler处理器。这个是非常严重的问题

    2 多线程反应器
    1. 将负责业务处理的线程单独放入线程池中,即将业务处理的线程和连接监听、IO查询的线程隔离
    2. 如何服务器为多核CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程,即工作线程;同时,引入多个选择器,每一个SubReactor子线程负责一个选择器(类似多个单线程的多路复用)。这样,充分释放了系统资源的能力;也提高了反应器管理大量连接,提升选择大量通道的能力
    //多线程反应器
    class MultiThreadEchoServerReactor {
        ServerSocketChannel serverSocket;
        AtomicInteger next = new AtomicInteger(0);
    
        Selector bossSelector = null;
        //selectors集合,引入多个selector选择器
        Selector[] workSelectors = new Selector[2];
    
    
        Reactor bossReactor = null;
        //引入多个子反应器
        Reactor[] workReactors = null;
    
    
        MultiThreadEchoServerReactor() throws IOException {
    
            //初始化多个selector选择器
            bossSelector = Selector.open();// 用于监听新连接事件
            workSelectors[0] = Selector.open(); // 用于监听read、write事件
            workSelectors[1] = Selector.open(); // 用于监听read、write事件
    
            serverSocket = ServerSocketChannel.open();
            InetSocketAddress address =
                    new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                            NioDemoConfig.SOCKET_SERVER_PORT);
            serverSocket.socket().bind(address);
            serverSocket.configureBlocking(false);//非阻塞
    
    
            //bossSelector,负责监控新连接事件, 将 serverSocket注册到bossSelector
            SelectionKey sk = serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);
            //绑定Handler:新连接监控handler绑定到SelectionKey(选择键)
            sk.attach(new AcceptorHandler());
    
    
            //bossReactor反应器,处理新连接的bossSelector
            bossReactor = new Reactor(bossSelector);
            //第一个子反应器,一子反应器负责一个worker选择器
            Reactor workReactor1 = new Reactor(workSelectors[0]);
            //第二个子反应器,一子反应器负责一个worker选择器
            Reactor workReactor2 = new Reactor(workSelectors[1]);
            workReactors = new Reactor[]{workReactor1, workReactor2};
        }
    
        private void startService() {
            // 一子反应器对应一条线程
            new Thread(bossReactor).start(); //连接的事件
            new Thread(workReactors[0]).start();
            new Thread(workReactors[1]).start();
        }
    
        //反应器
        //连接|工作
        class Reactor implements Runnable {
            //每条线程负责一个选择器的查询
            final Selector selector;
    
            public Reactor(Selector selector) {
                this.selector = selector;
            }
    
            public void run() {
                try {
                    while (!Thread.interrupted()) {
                        阻塞1000毫秒,在1000毫秒后返回
                        selector.select(1000);
                        //选择器是构造方法传入的。
                        // 连接和工作有不同的选择器。work  boss 。
                        // 获取所有的有事件的。当是bossReactor的时候,有连接过来,就不为空了
                        Set<SelectionKey> selectedKeys = selector.selectedKeys();
                        if (null == selectedKeys || selectedKeys.size() == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> it = selectedKeys.iterator();
                        while (it.hasNext()) {
                            //Reactor负责dispatch收到的事件
                            SelectionKey sk = it.next();
                            //不同的选择器,执行附件里面不同的逻辑
                            dispatch(sk);
                        }
                        selectedKeys.clear();
                    }
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
    
    
            void dispatch(SelectionKey sk) {
                //取出附件,接下来去执行
                Runnable handler = (Runnable) sk.attachment();
                //调用之前attach绑定到选择键的handler处理器对象
                //执行逻辑
                if (handler != null) {
                    handler.run();
                }
            }
        }
    
    
        // Handler:新连接处理器
        // 将连接通道放到工作选择器中(这里面有实现)
        class AcceptorHandler implements Runnable {
            public void run() {
                try {
                    SocketChannel channel = serverSocket.accept();
                    Logger.info("接收到一个新的连接");
    
                    //连接成功了,
                    //接下来将通道和选择器绑定。
                    // 选择器可以不断地选择通道中所发生操作的就绪状态,返回注册过的感兴趣的那些IO事件
                    if (channel != null) {
                        int index = next.get();
                        Logger.info("选择器的编号:" + index);
                        Selector selector = workSelectors[index];
                        //连接的通道 放到选择器中
                        new MultiThreadEchoHandler(selector, channel);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (next.incrementAndGet() == workSelectors.length) {
                    next.set(0);
                }
            }
        }
    
    
        public static void main(String[] args) throws IOException {
            MultiThreadEchoServerReactor server =
                    new MultiThreadEchoServerReactor();
            server.startService();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    class MultiThreadEchoHandler implements Runnable {
        final SocketChannel channel;
        final SelectionKey sk;
        final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        static final int RECIEVING = 0, SENDING = 1;
        int state = RECIEVING;
        //引入线程池
        static ExecutorService pool = Executors.newFixedThreadPool(4);
    
        // 选择器,是工作选择器
        //通道是 连接完成的通道
        MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
            channel = c;
            channel.configureBlocking(false);
            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
            //仅仅取得选择键,后设置感兴趣的IO事件
            //通道和选择器绑定,之后就能获取感兴趣的事件了
            //通过select方法,选择器可以不断地选择通道中所发生操作的就绪状态,返回注册过的感兴趣的那些IO事件
            sk = channel.register(selector, 0);
            // 继续绑定事件,将来执行使用
            //将本Handler作为sk选择键的附件,方便事件dispatch
            sk.attach(this);
            //向sk选择键注册Read就绪事件
            sk.interestOps(SelectionKey.OP_READ);
            //唤醒 查询线程,使得OP_READ生效
            selector.wakeup();
            Logger.info("新的连接 注册完成");
    
        }
    
        public void run() {
            //异步任务,在独立的线程池中执行
            //提交数据传输任务到线程池
            //使得IO处理不在IO事件轮询线程中执行,在独立的线程池中执行
            //工作线程也是有限的,就几个,是非阻塞的,真正的去完成数据传输这种操作,还是新建线程去完成,
            //不然整个服务不就变成几个工作线程了,工作线程是来指派认为让线程执行的
            pool.execute(new AsyncTask());
        }
    
        //异步任务,不在Reactor线程中执行
        //数据传输与业务处理任务,不在IO事件轮询线程中执行,在独立的线程池中执行
        public synchronized void asyncRun() {
            try {
                if (state == SENDING) {
                    //写入通道
                    channel.write(byteBuffer);
    
                    //写完后,准备开始从通道读,byteBuffer切换成写模式
                    byteBuffer.clear();
                    //写完后,注册read就绪事件
                    sk.interestOps(SelectionKey.OP_READ);
                    //写完后,进入接收的状态
                    state = RECIEVING;
                } else if (state == RECIEVING) {
                    //从通道读
                    int length = 0;
                    while ((length = channel.read(byteBuffer)) > 0) {
                        Logger.info(new String(byteBuffer.array(), 0, length));
                    }
                    //读完后,准备开始写入通道,byteBuffer切换成读模式
                    byteBuffer.flip();
                    //读完后,注册write就绪事件
                    sk.interestOps(SelectionKey.OP_WRITE);
                    //读完后,进入发送的状态
                    state = SENDING;
                }
                //处理结束了, 这里不能关闭select key,需要重复使用
                //sk.cancel();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    
        //异步任务的内部类
        class AsyncTask implements Runnable {
            public void run() {
                MultiThreadEchoHandler.this.asyncRun();
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    注意
    使得IO处理不在IO事件轮询线程中执行,在独立的线程池中执行
    pool.execute(new AsyncTask());。工作线程也是有限的,就几个,是非阻塞的,真正的去完成数据传输这种操作,还是新建线程去完成,不然整个服务不就变成几个工作线程了,工作线程来查询连接是否有数据需要处理,然后用线程池去处理。

    引用:《Java高并发核心编程(卷1)》一书。做了更多的注释

  • 相关阅读:
    Threejs入门教程
    异常处理之EnhancedServiceNotFoundException
    深入解析Redis的LRU与LFU算法实现
    新版本Android Studio logcat日志过滤提示
    Panda3D设置游戏背景颜色和节点颜色
    vscode装的一些好用的插件和设置
    加固三防平板如何提高轨道交通系统的运营效率?
    【网络原理】UDP和TCP协议重点知识汇总
    C++入门学习(4)引用 (讲解拿指针比较)
    记录工作中常用的 JS 数组相关操作
  • 原文地址:https://blog.csdn.net/sbl19940819/article/details/126985799