• Netty入门指南之Reactor模型


    作者简介:☕️大家好,我是Aomsir,一个爱折腾的开发者!
    个人主页Aomsir_Spring5应用专栏,Netty应用专栏,RPC应用专栏-CSDN博客
    当前专栏Netty应用专栏_Aomsir的博客-CSDN博客

    参考文献

    前言

    在我们之前的文章中,我们详细地探讨了Java NIO和Selector的相关内容,这为我们进一步的学习打下了坚实的基础。从本篇文章开始,我们将深入学习并理解Reactor模型

    单线程Reactor模型

    在前两篇文章,我们使用Selector去监控Channel的ACCEPT事件、WRITE事件、READ事件等等,监听到以后就在当前线程进行处理,这已经是一个单线程的Reactor的模型,Selector来进行分发,起到一个多路复用器的作用,但是这个还远远不够,怎么能只让一个线程来同时处理ACCEPT、WRITE和READ,所以就有了我们后面的主从式
    在这里插入图片描述

    主从式Reactor模型

    谈起主从架构,也就是master-slave,它是主节点做一部分内容,从节点做另外一部分的内容,主从都干活,但是干的活内容不一样,比如我们常见的MySQLRedis,它们的读写分离就是主从式架构。
    还有一种架构是主备架构(Master-Backup),这种架构就是主挂了以后,从起作用,干的活是一样的,比如Redis中的哨兵机制

    结合如下图例更为详细的理解主从式Reactor模型,我们的Boss和Worker都是不同的线程,甚至在实战过程中会是不同的服务器。Boss线程主要用于接收Accept请求,去与客户端建立SocketChannel连接,Worker线程主要去处理实际的读写操作。我们需要把单线程Reactor模型中的sc.register(selector, SelectionKey.OP_READ)转移到Worker线程中去。
    在这里插入图片描述

    多线程知识扫盲

    在接下来的学习中,我们将使用NIO和Selector来实现一个主从Reactor模型。这需要我们具备一定的多线程知识,因此这里我会为你简单介绍一下Java中的多线程。

    在Java中,我们通常通过Thread类来创建和管理新的线程。在实际开发中,我们可以创建一个新的类,让它继承Thread类,并重写其run方法。在这个run方法中,我们可以编写自己的多线程任务逻辑。但是,Java的类只能单继承,这在某些情况下可能会对我们的系统设计造成限制。

    因此,Java还为我们提供了另一种创建线程的方式,即通过实现Runnable接口。我们可以自定义一个类,让它实现Runnable接口,并重写其run方法,在这个方法中编写我们的多线程任务逻辑。然后,我们可以将这个Runnable实现类的对象传递给Thread类的构造方法,从而创建Thread类的对象。这种方式的优点是,我们不再需要直接继承Thread类来实现多线程任务,而是可以将任务逻辑封装在实现了Runnable接口的类中。这样,我们的类就可以在保持多线程功能的同时,也能继承其他类,从而提供更大的设计灵活性。

    由于Java的Thread类实现了Runnable接口,我们可以在设计系统时,采用以下策略:在Runnable的实现类中,添加一个Thread类型的属性,并提供一个register方法。在这个register方法中,我们可以初始化Thread属性,直接将当前类对象(Runnable实现类)传入Thread构造方法进行初始化,然后启动线程。这样我们就可以直接在Runnable内部直接进行线程任务逻辑等,而外部只需要提供一个Runnable接口实现类,线程的创建和启动等都在Runnable接口内部进行操作,封装度更高也更灵活

    ⚠️注意

    • 启动多线程任务是通过Thread#start()方法,而不是通过Thread#run()方法,调用start方法以后,CPU的时间片也不会立马分配给这个线程
    • 除了Thread#run()和Runnable#run()方法内的代码是属于多线程的,其余的都是main线程,包括Runnable实现类中的自定义方法
    • CPU时间片不一定会等待主线程某个方法完全执行才切换给别的线程,但它一定会等一个代码块执行完,比如if
    public class MyThread extends Thread{
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                System.out.println("线程任务逻辑" + i);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    public class MyRunnable implements Runnable{
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                System.out.println("线程任务逻辑" + " " + i);
            }
        }
    }
    
    public class RunnableTest {
        public static void main(String[] args) {
            // 创建任务对象
            MyRunnable myRunnable = new MyRunnable();
    
            // 创建线程对象,并将任务传递进去
            Thread t1 = new Thread(myRunnable);
    
            // 启动线程
            t1.start();
    
            for (int i = 0; i < 100; i++) {
                System.out.println("main线程" + " " + i);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    Worker线程

    这是我们的Worker线程,用于处理客户端与服务端的读写,在我们这个案例中,所有的读写都交给这些worker线程,主线程不管具体的写。如果是单核CPU,那么时间片会不停的在这些时间片时间轮转,而如果是多核CPU,那直接主线程用于处理连接,多个worker线程用于处理具体的读写

    在下面这个Worker模型中,我们Worker是一个Runnable实现类,其中包含一个Thread类型的属性,在register方法中对它进行初始化,将实现类本类对象传入,代表后面这个thread对象调用的任务是实现类中重写的run方法的逻辑。

    ⚠️注意

    • 主线程和Worker线程维护不同的selector,以免出现污染,主线程的selector监控ServerSocketChannel的ACCEPT事件,Worker线程的selector监控注册在对应线程上的SocketChannel的READ/WRITE事件
    • register方法属于主线程,如果等初始化完还没将SocketChannel注册到这个线程的Selector上,就去执行Worker线程的run方法,那selector就会成为阻塞状态,当CPU时间片切换回主线程,就会注册不上,成为一个死锁状态。
    • 为了解决上面这个问题,我们需要将注册这部分的代码放在任务队列里进行传递,但是阻塞问题还是存在,所以我们将selector唤醒,不让其阻塞。当时间片切换到Worker线程,select方法就不会阻塞,运行循环下面的代码,将注册的代码取出来运行,然后处理读写

    ☹️难点

    • 时间片在sc未注册到selector时就切换给worker线程导致selector阻塞,然后导致阻塞
    • 使用任务队列传递代码同时需要唤醒selector
    • 思路的转变
    public class Worker implements Runnable {
        private static final Logger log = LoggerFactory.getLogger(Worker.class);
    
        // 一个线程对应一个selector,以免污染
        private Selector selector;
    
        // 线程Thread对象
        private Thread thread;
    
        // 线程名
        private String name;
    
        // 通过volatile进行线程同步
        private volatile boolean isCreated;
    
        // 任务队列
        private ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue();
    
        // 构造器
        public Worker(String name) {
            this.name = name;
        }
    
        // 线程任务(此段还属于主线程!!!)
        public void register(SocketChannel sc) throws IOException,InterruptedException {
            log.debug("worker register invoke...");
    
            // 对于一个Runnable对象,被调用register后
            // isCreated标志位就会被置为true
            // 注意:CPU等这个if代码块执行结束才有可能被调度到worker线程
            if (!isCreated) {
                thread = new Thread(this, name);
    
                // 调了start,不会立马分配资源(除非抢夺)
                thread.start();
                selector = Selector.open();
                isCreated = true;
            }
    
            // 模拟此处时间片分给worker线程
            // worker线程进入run方法,开始阻塞监听
            // selector就会一直阻塞在select方法上,时间片切换回主线程也无法注册
            // Thread.sleep(1000);
    
            // 任务队列:将main线程中注册的代码传递给worker线程
            runnables.add(() -> {
                try {
                    sc.register(selector, SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    throw new RuntimeException(e);
                }
            });
    
            // 唤醒阻塞在select方法上的worker线程
            // 这样时间片切换到worker线程就直接跳过select方法
            selector.wakeup();
        }
    
        /**
         * 线程任务:实际处理读写操作
         */
        @Override
        public void run() {
            while (true) {
                log.debug("worker run method invoke...");
                try {
                    // 阻塞监听SocketChannel的OP_READ
                    selector.select();
    
                    // 从任务队列中取出任务执行
                    Runnable poll = runnables.poll();
                    if (poll != null) {
                        // 执行注册的步骤
                        poll.run();
                    }
    
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey scKey = iterator.next();
                        iterator.remove();
    
                        if (scKey.isReadable()) {
                            SocketChannel sc = (SocketChannel) scKey.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(30);
    
                            int read = sc.read(buffer);
                            if (read == -1) {
                                scKey.cancel();
                                break;
                            }
    
                            buffer.flip();
                            String result = Charset.defaultCharset().decode(buffer).toString();
                            System.out.println("result = " + result);
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102

    Boss线程

    Boss线程用来监听ServerSocketChannel的ACCEPT事件,监听到了以后将其传递给Worker线程去注册和监听处理,注意线程池只有两个Worker线程,为了保证每一个新进来的SocketChannel都被注册到与前一个线程不同的线程上,这里使用AtomicInteger原子操作类来处理

    public class ReactorBossServer {
    
        private static final Logger log = LoggerFactory.getLogger(ReactorBossServer.class);
    
        public static void main(String[] args) throws Exception{
            log.debug("boss thread start...");
    
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.bind(new InetSocketAddress(8000));
    
            Selector selector = Selector.open();
            ssc.register(selector, SelectionKey.OP_ACCEPT);
    
            // 模拟任务池,将任务对象进行创建
            Worker[] workers = new Worker[2];
            for (int i = 0; i < workers.length; i++) {
                // Worker worker = new Worker("worker1");
                workers[i] = new Worker("worker - " + i);
            }
    
            // 原子操作类
            AtomicInteger index = new AtomicInteger();
    
            while (true) {
                // 阻塞等待Channel的事件的触发
                selector.select();
    
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey sscSelectionKey = iterator.next();
                    iterator.remove();
    
                    // 如果是ACCEPT请求则进行处理,交给worker线程处理
                    if (sscSelectionKey.isAcceptable()) {
                        SocketChannel sc = ssc.accept();
                        sc.configureBlocking(false);
    
                        log.debug("boss invoke worker register...");
    
                        // hash取模  x%2 = 0|1
                        // 通过原子类确保sc每次进来注册给不同的worker
                        workers[index.getAndIncrement() % workers.length].register(sc);
    
                        log.debug("boss invoke worker register...");
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    客户端

    public class MyClient {
    
        private static final Logger log = LoggerFactory.getLogger(MyClient.class);
    
        public static void main(String[] args) throws Exception{
            // 1、创建客户端channel,并连接服务端
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress(8000));
            socketChannel.write(Charset.defaultCharset().encode("hello\n"));
            System.out.println("-------------------------------------");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    总结

    这篇文章的阅读绝对值得你的精心研读。Netty的基础建立在NIO之上,如果单纯的学习Netty,你可能只会看到一堆的API,而无法深入理解其背后的设计原则和工作机制。

    然而,本篇文章从设计理念到编码实践,详细剖析了Reactor模型,为Netty的学习铺平了道路。这不仅帮助你理解Netty的运作方式,更能让你洞察其背后的设计哲学,使你在学习时不仅知其然,更能知其所以然。因此,这篇文章对于深化你对Netty的理解,研究其内部工作原理,无疑具有极大的价值

  • 相关阅读:
    Python基础入门篇【42】--python中的内置库os与sys模块
    JPA 如何修改 联表查询返回的Map
    归并排序——
    如何使用ChatGPT辅助写论文、数据分析、AI绘图?【附学习资料】
    一般乘法器设计,verilog code
    unittest使用简述
    idea 创建spring boot工程
    欠酸洗,异物压入,斑迹和脏污 学习笔记
    【Gradle构件工具深度学习】
    武汉星起航:做亚马逊最常见的错误思想
  • 原文地址:https://blog.csdn.net/qq_43266723/article/details/134386657