• 学习 Java 的多线程开发


    在介绍线程 Thread 之前,我们必须先搞清楚程序 Program 和进程 Process 这两个概念。

    • 程序 Program:一组代码的集合,用于解决特定的问题。相当于面向对象概念中的类 Class。
    • 进程 Process:由程序生成的执行实例,一个程序可以生成多个进程,并同时执行。相当于面向对象概念中的对象 Object。每个进程由以下两部分组成:
      • 一块内存空间 Memory Space:相当于对象的变量,不同进程的内存空间也不同,彼此无法看到对方的内存空间。
      • 一个或多个线程 Thread:线程表示从某个起始点(例如 main)开始,到目前为止所有函数的调用路径,以及这些调用路径上所使用的局部变量。当然,除了在主存储器中记录程序的执行状态之外,CPU 内部的寄存器(如程序计数器、堆栈指针、程序状态字等)也需要一起记录。因此线程又由以下两项组成:
        • 栈 Stack,记录函数调用路径以及这些函数使用的局部变量;
        • 当前 CPU 的状态。

    根据上述描述,我们总结出线程的重点如下:

    • 一个进程可以有多个线程。
    • 同一个进程内的线程使用相同的内存空间,但这些线程各自拥有自己的栈。换句话说,线程可以通过引用访问相同的对象,但局部变量是各自独立的。
    • 操作系统会根据线程的优先级以及已经使用的 CPU 时间,在不同的线程之间进行切换,使得每个线程都有机会被执行。

    如何生成线程

    Java 使用java.lang.Thread类来表示线程。Thread 类有两个构造函数:

    • Thread()
    • Thread(Runnable)

    第一个构造函数没有参数,第二个构造函数需要一个Runnable对象作为参数。Runnable是一个接口,在java.lang中定义,其声明为:

    public interface Runnable {
    	public void run();
    }
    
    • 1
    • 2
    • 3

    使用Thread()生成的线程,其入口是Thread类中的run()方法;使用Thread(Runnable)生成的线程,其入口是Runnable对象中的run()方法。当run()方法执行完毕时,该线程也就结束了,与main()方法结束具有相同的效果。使用示例如下:

    public class ThreadExample1 extends Thread {
        @Override
        public void run() { // 重写 Thread 的 run()
            System.out.println("这是线程的起点。");
    
            for (; ; ) { // 无限循环打印消息
                System.out.println("用户创建的线程");
            }
        }
    
        public static void main(String[] argv) {
            Thread t = new ThreadExample1(); // 生成线程对象
            t.start(); // 开始执行 t.run()
            
            for (; ; ) {
                System.out.println("主线程");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    运行上述程序后,屏幕上会不断打印出"用户创建的线程"或"主线程"。使用Runnable的写法如下:

    public class ThreadExample2 implements Runnable {
        @Override
        public void run() { // 实现 Runnable 的 run()
            System.out.println("这是线程的起点。");
            
            for (; ; ) { // 无限循环打印消息
                System.out.println("用户创建的线程");
            }
        }
    
        public static void main(String[] argv) {
            Thread t = new Thread(new ThreadExample2()); // 生成线程对象
            t.start(); // 开始执行 Runnable.run();
            
            for (; ; ) {
                System.out.println("主线程");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    线程的优先级与影响资源的相关方法

    Thread.setPriority(int)可以设置Thread的优先级,数字越大优先级越高。Thread定义了三个相关的常量:

    public static final int MAX_PRIORITY 10
    public static final int MIN_PRIORITY 1
    public static final int NORM_PRIORITY 5
    
    • 1
    • 2
    • 3

    要提醒的是,优先级高的 Thread 其占有 CPU 的机会比较高,但优先级低的也都会有机会被执行到。其他关于 Thread 执行的方法有:

    • yield():先让给别的 Thread 执行
    • sleep(int time):休息 time 毫秒(1/1000 秒)
    • join():调用ThreadA.join()的线程会等到 ThreadA 结束后,才能继续执行

    你可以执行下面的程序,看看yield()的效果。

    public class ThreadExample1 extends Thread {
        @Override
        public void run() { // overwrite Thread's run()
            System.out.println("Here is the starting point of Thread.");
            
            for (; ; ) { // infinite loop to print message
                System.out.println("用户创建的线程");
                yield();
            }
        }
    
        public static void main(String[] argv) {
            Thread t = new ThreadExample1(); // 生成 Thread 对象
            t.start(); // 开始执行 t.run()
            
            for (; ; ) {
                System.out.println("主线程");
                yield();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    观看join()的效果

    public class JoinExample extends Thread {
        String myId;
    
        public JoinExample(String id) {
            myId = id;
        }
    
        @Override
        public void run() { // overwrite Thread's run()
            for (int i = 0; i < 500; i++) 
                System.out.println(myId + " Thread");
        }
    
        public static void main(String[] argv) {
            Thread t1 = new JoinExample("T1"); // 生成 Thread 对象
            Thread t2 = new JoinExample("T2"); // 生成 Thread 对象
            t1.start(); // 开始执行t 1.run()
            t2.start();
            
            try {
                t1.join(); // 等待t1结束
                t2.join(); // 等待t2结束
            } catch (InterruptedException e) {
            }
            
            for (int i = 0; i < 5; i++) 
                System.out.println("主线程");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    观看sleep()的效果。

    public class SleepExample extends Thread {
        String myId;
    
        public SleepExample(String id) {
            myId = id;
        }
    
        @Override
        public void run() { // overwrite Thread's run()
            for (int i = 0; i < 500; i++) {
                System.out.println(myId + " Thread");
                
                try {
                    sleep(100);
                } catch (InterruptedException e) {
                }
            }
        }
    
        public static void main(String[] argv) {
            Thread t1 = new SleepExample("T1"); // 生成 Thread 对象
            Thread t2 = new SleepExample("T2"); // 生成 Thread 对象
            t1.start(); // 开始执行 t1.run()
            t2.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    关键时刻(Critical Section)的保护措施

    如果设计者没有提供相应的保护机制的话,那么将会由操作系统来决定 Thread 对 CPU 的控制权,也就是说 Thread 可能在其执行任何一条机器指令的时候,被操作系统取走 CPU 的控制权,并交给另一个 Thread。然而,真实世界中的某些动作是不可分割的,例如银行转帐 X 元由 A 帐户到 B 帐户,转帐前后这两个帐户的总金额必须相同,但以程序来实作时却无法用一条指令就完成,如转帐可能要写成下面的这一段程序代码

    if (A >= X) {
    	A = A - X; // 翻译成3个机器指令 LOAD A, SUB X, STORE A
    	B = B + X;
    }
    
    • 1
    • 2
    • 3
    • 4

    如果两个线程同时要存取 A、B 两账户进行转账,假设当 Thread 1 执行到SUBX后被中断,Thread 2 接手执行完成另一个转账要求,然后 Thread 1 继续执行未完成的动作,请问这两个转账动作正确吗?我们以A=1000, B=0分别转账 100、200圆来说明此结果。

    LOAD A // Thread 1,现在 A 还是 1000
    SUB 100 // Thread 1
    LOAD A // 假设此时 Thread 1 被中断,Thread 2 接管,因为 Thread 1 还没有执行 STORE A,所以变量 A 还是 1000
    SUB 200 // Thread 2
    STORE A // Thread 2,A=800
    LOAD B // Thread 2,B 现在是 0
    ADD 200 // Thread 2
    STORE B // B=200
    STORE A // Thread 1 拿回控制权,A=900
    LOAD B // Thread 1,B=200
    ADD 100 // Thread 1
    STORE B // B=300
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    你会发现执行完成后A=900B=300,也就是说银行平白损失了 200 圆。当然另外的执行顺序可能造成其他不正确的结果。我们把这问题再整理一下:

    1. 写程序时假设指令会循序执行
    2. 某些不可分割的动作,需要以多个机器指令来完成
    3. Thread 执行时可能在某个机器指令被中断
    4. 两个 Thread 可能执行同一段程序,存取同一个资料结构
    5. 这样就破坏了第1点的假设

    因此在编写多线程的程序时候必须考虑这种特别的情况(又称为 Race Condition)。Java 解决的办法是,JVM 会在每个对象上放一把锁 Lock,然后程序设计者可以声明执行某一段程序(通常是用来访问共享数据结构的代码,又称为 Critical Section)时,必须拿到某对象的锁才行,这把锁同时最多只有一个线程可以拥有它。

    其实觉得"锁",获取"锁"的说法似乎不太符合中国人的思维,这个"锁"其实就是一个"令牌",有令牌就能进入,无令牌就不能进入。这么说来这个"锁"其实应该叫"钥匙"。当然这里应该还有一个概念,叫房间,这个钥匙是对应什么房间的(锁住什么东西,即锁住什么范围)。所以光叫"钥匙",似乎也不太准确。所以西方人的这个"锁",应该有两个概念: “锁对象”,就是我们说的"钥匙";“锁了什么范围”,就是这个锁锁了什么房间。

    public class Transfer extends Thread {
        public static Object lock = new Object();
        public static int A = 1000;
        public static int B = 0;
        private final int amount;
    
        public Transfer(int x) {
            amount = x;
        }
    
        @Override
        public void run() {
            synchronized (lock) { // 获取 lock,如果别的 thread A已获取,则当前这个 thread 会等到 thread A 释放该 lock
                if (A >= amount) {
                    A = A - amount;
                    B = B + amount;
                }
            } // 离开 synchronized 区块后,此 thread会 自动释放 lock
        }
    
        public static void main(String[] argv) {
            Thread t1 = new Transfer(100);
            Thread t2 = new Transfer(200);
            t1.start();
            t2.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    除了synchronized(ref)的语法可以锁定ref指向的对象外,synchronized也可以用在方法前面,表示要锁定this对象才能执行该方法。以下是Queue结构的范例。

    public class Queue {
        private final Object[] data;
        private int size;
        private int head;
        private int tail;
    
        public Queue(int maxLen) {
            data = new Object[maxLen];
        }
    
        public synchronized Object deQueue() {
            Object tmp = data[head];
            data[head] = null;
            head = (head + 1) % data.length;
            size--;
            
            return tmp;
        }
    
        public synchronized void enQueue(Object c) {
            data[tail++] = c;
            tail %= data.length;
            size++;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    虽然上面的程序正确无误,但并未考虑资源不足时该如何处理。例如Queue已经没有数据了,却还想拿出来;或是Queue里已经塞满了数据,使用者却还要放进去?我们当然可以使用Exception Handling的机制:

    public class Queue {
        private final Object[] data;
        private int size;
        private int head;
        private int tail;
    
        public Queue(int maxLen) {
            data = new Object[maxLen];
        }
    
        public synchronized Object deQueue() throws Exception {
            if (size == 0) 
                throw new Exception();
            
            Object tmp = data[head];
            data[head] = null;
            head = (head + 1) % data.length;
            size--;
            
            return tmp;
        }
    
        public synchronized void enQueue(Object c) throws Exception {
            if (size >= maxLen) 
                throw new Exception();
            
            data[tail++] = c;
            tail %= data.length;
            size++;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    但假设我们的执行环境是,某些 Thread 专门负责读取用户的需求,并把工作放到 Queue 里面,某些 Thread 则专门由 Queue 里抓取工作需求做进一步处理。这种架构的好处是,可以把慢速或不定速的输入(如通过网络读数据,连接速度可能差很多),和快速的处理分开,可使系统的反应速度更快,更节省资源。那么以 Exceptoin 来处理 Queue 空掉或爆掉的情况并不合适,因为使用 Queue 的人必须处理异常状况,并不断的消耗 CPU 资源:

    public class Getter extends Thread {
        Queue q;
    
        public Getter(Queue q) {
            this.q = q;
        }
    
        public void run() {
            for (; ; ) {
                try {
                    Object data = q.deQueue();
                    // processing
                } catch (Exception e) {
                    // 如果在这里进行 sleep,就会卡住了;但如果不 sleep,却会浪费 CPU 资源
                }
            }
        }
    }
    
    public class Putter extends Thread {
        Queue q;
    
        public Putter(Queue q) {
            this.q = q;
        }
    
        public void run() {
            for (; ; ) {
                try {
                    Object data = null;// get user request
                    q.enQueue(data);
                } catch (Exception e) {
                    // 如果在这里进行 sleep,就会卡住了;但如果不 sleep,却会浪费 CPU 资源
                }
            }
        }
    }
    
    public class Main {
        public static void main(String[] argv) {
            Queue q = new Queue(10);
            Getter r1 = new Getter(q);
            Getter r2 = new Getter(q);
            Putter w1 = new Putter(q);
            Putter w2 = new Putter(q);
    
            r1.start();
            r2.start();
            w1.start();
            w2.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    为了解决这类资源分配的问题,Java 对象提供了以下三个方法:

    • wait():使调用此方法的 Thread 进入 Blocking Mode,并设为等待该对象,调用wait()时,该 Thread 必须拥有该对象的 lock。Blocking Mode 下的 Thread 必须释放所有手中的 lock,并且无法使用 CPU。
    • notifyAll():让等待该对象的所有 Thread 进入Runnable Mode。
    • notify():让等待该对象的某一个 Thread 进入Runnable Mode。

    所谓 Runnable Mode 是指该 Thread 随时可由操作系统分配 CPU 资源。Blocking Mode 表示该 Thread 正在等待某个事件发生,操作系统不会让这种 Thread 取得 CPU 资源。前一个 Queue 的范例就可以写成:

    public class Queue {
        private final Object[] data;
        private int size;
        private int head;
        private int tail;
    
        public Queue(int maxLen) {
            data = new Object[maxLen];
        }
    
        public synchronized Object deQueue() {
            while (size == 0) { // 当执行到这里时,线程必须已经获取到锁并处于运行状态
                // 让当前线程等待该对象(进入睡眠模式)
                try {
                    wait(); // 进入睡眠模式,并释放所有锁
                } catch (Exception ex) {
                }
            }
    
            Object tmp = data[head];
            data[head] = null;
            head = (head + 1) % data.length;
    
            if (size == data.length) {
                // 唤醒所有等待该对象的线程
                notifyAll();
            }
    
            size--;
    
            return tmp;
        } // 释放锁
    
        public synchronized void enQueue(Object c) {
            while (size == data.length) { // 当执行到这里时,线程必须已经获取到锁并处于运行状态
                // 让当前线程等待该对象(进入睡眠模式)
                try {
                    wait(); // 进入睡眠模式,并释放所有锁
                } catch (Exception ex) {
                }
            }
    
            data[tail++] = c;
            tail %= data.length;
            size++;
    
            if (size == 1)
                // 唤醒所有等待该对象的线程
                notifyAll();
        }
    
    }
    
    public class ReaderWriter extends Thread {
        public static final int READER = 1;
        public static final int WRITER = 2;
        private Queue q;
        private int mode;
    
        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
    
                if (mode == READER) q.deQueue();
                else if (mode == WRITER) q.enQueue(i);
            }
        }
    
        public ReaderWriter(Queue q, int mode) {
            this.q = q;
            this.mode = mode;
        }
    
        public static void main(String[] args) {
            Queue q = new Queue(5);
            ReaderWriter r1, r2, w1, w2;
            (w1 = new ReaderWriter(q, WRITER)).start();
            (w2 = new ReaderWriter(q, WRITER)).start();
            (r1 = new ReaderWriter(q, READER)).start();
            (r2 = new ReaderWriter(q, READER)).start();
    
            try {
                w1.join(); // 等待w1线程完成
                w2.join(); // 等待w2线程完成
                r1.join(); // 等待r1线程完成
                r2.join(); // 等待r2线程完成
            } catch (InterruptedException epp) {
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    多读者-写者监视器

    上一节的队列数据结构,无论是enQueue()还是deQueue()都会修改队列的内容。而在许多应用里,数据结构可以允许多个读取者和一个写入者同时操作。本节举出几个不同的例子,说明多个读者-写者(Reader-Writer)时的可能调度方法。

    单个读者-写者(Single Reader-Writer,)只允许一个线程同时访问。

    public class SingleReaderWriter {
        int n; // number of reader and write, 0 or 1
        public synchronized void startReading() throws InterruptedException {
            while (n != 0) 
                wait();
            
            n = 1;
        }
        
        public synchronized void stopReading() {
            n = 0;
            notify();
        }
        
        public synchronized void startWriting() throws InterruptedException {
            while (n != 0) 
                wait();
            
            n = 1;
        }
        
        public synchronized void stopWriting() {
            n = 0;
            notify();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    这是一个使用示例,程序能否正确执行取决于调用正确的startstop

    public class WriterThread extends Thread {
        SingleReaderWriter srw;
        
        public WriterThread(SingleReaderWriter srw) {
            this.srw = srw;
        }
        
        public void run() {
            startWring();
            // 实际的逻辑……
            stopWriting();
        }
    }
    
    public class ReaderThread extends Thread {
        SingleReaderWriter srw;
        
        public ReaderThread(SingleReaderWriter srw) {
            this.srw = srw;
        }
        
        public void run() {
            startReading();
            // 实际的逻辑……
            stopReading();
        }
    }
    
    public class Test {
        public static void main(String[] argv) {
            SingleReaderWriter srw = new SingleReaderWriter;
            // 创建四个线程
            (new WriterThread(srw)).start();
            (new WriterThread(srw)).start();
            (new ReaderThread(srw)).start();
            (new ReaderThread(srw)).start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    其他可能的策略实现如下。

    Reader优先:

    public class ReadersPreferredMonitor {
        int nr; // 当前正在读取的线程数量, nr >= 0
        int nw; // 当前正在写入的线程数量, 只能为0或1
        int nrtotal; // 正在读取或等待读取的线程数量, nrtotal >= nr
        int nwtotal; // 正在写入或等待写入的线程数量
        public synchronized void startReading() throws InterruptedException {
            nrtotal++; // 有一个想要读取的线程加入
            
            while (nw != 0)  // 有线程正在写入
                wait();
            
            nr++; // 正在读取的线程数量加一
        }
        
        public synchronized void startWriting() throws InterruptedException {
            nwtotal++; // 有一个想要写入的线程加入
            
            while (nrtotal+nw != 0)  // 只要有线程想要读取或有线程正在写入,就等待
                wait();
            
            nw = 1;
        }
        
        public synchronized void stopReading() {
            nr--; // 正在读取的线程数量减一
            nrtotal--; // 有想要读取的线程数量减一
            
            if (nrtotal == 0)  // 如果没有线程需要读取,则唤醒想要写入的线程
                notify();
        }
        
        public synchronized void stopWriting() {
            nw = 0; // 没有线程正在写入
            nwtotal--; // 有想要写入的线程数量减一
            notifyAll(); // 唤醒所有想要读取或写入的线程
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    Writer优先:

    public class WritersPreferredMonitor {
        int nr; // 当前正在读取的线程数量, nr >= 0
        int nw; // 当前正在写入的线程数量, 只能为0或1
        int nrtotal; // 正在读取或等待读取的线程数量, nrtotal >= nr
        int nwtotal; // 正在写入或等待写入的线程数量
        
        public synchronized void startReading() throws InterruptedException {
            nrtotal++; // 有一个想要读取的线程加入
            
            while (nwtotal != 0)  // 还有线程想要写入
                wait();
            
            nr++; // 正在读取的线程数量加一
        }
        
        public synchronized void startWriting() throws InterruptedException {
            nwtotal++; // 有一个想要写入的线程加入
            
            while (nr+nw != 0)  // 有线程正在读取或有线程正在写入
                wait();
            
            nw = 1;
        }
        
        public synchronized void stopReading() {
            nr--; // 正在读取的线程数量减一
            nrtotal--; // 有想要读取的线程数量减一
            if (nr == 0)  // 如果没有正在读取的线程,则唤醒所有线程(包括想要写入的)
                notifyAll();
        }
        
        public synchronized void stopWriting() {
            nw = 0; // 没有线程正在写入
            nwtotal--; // 有想要写入的线程数量减一
            notifyAll(); // 唤醒所有想要读取或写入的线程
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    ReaderWriter交互执行:

    public class AlternatingReadersWritersMonitor {
        int[] nr = new int[2]; // 当前正在读取的线程数量
        int thisBatch; // 当前正在读取的批次号(0或1)
        int nextBatch = 1; // 等待读取的批次号(始终为1-thisBatch)
        int nw; // 当前正在写入的线程数量(0或1)
        int nwtotal; // 正在写入或等待写入的线程数量
        
        public synchronized void startReading() throws InterruptedException {
            if (nwtotal == 0)  // 没有线程要写入,将所有的 reader 放到当前处理的批次
                nr[thisBatch]++;
             else {
                nr[nextBatch]++;
                int myBatch = nextBatch;
                
                while (thisBatch != myBatch) 
                    wait();
            }
        }
        
        public synchronized void stopReading() {
            nr[thisBatch]--;
            if (nr[thisBatch] == 0)  // 当前批次的 reader 都读完了,找下一个 writer
                notifyAll();
        }
        
        public synchronized void startWriting() throws InterruptedException {
            nwtotal++;
            
            while (nr[thisBatch]+nw != 0)  // 当前批次还没读完,或有线程正在写入
                wait();
            
            nw = 1;
        }
        
        public synchronized void stopWriting() {
            nw = 0;
            nwtotal--;
            int tmp = thisBatch; // 交换下一个要读取的批次
            thisBatch = nextBatch;
            nextBatch = tmp;
            notifyAll();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
  • 相关阅读:
    大语言模型LLM分布式框架:PyTorch Lightning框架(LLM系列14)
    【网络奇幻之旅】那年我与互联网的邂逅
    基于MATLAB的图片文字提取和识别——算法复现
    Vue前端开发中的输入限制与输入规则探究
    Eureka Series : MSVC puTTY + psFTP Dev
    git操作说明
    云原生API网关全生命周期管理Apache APISIX探究实操
    工作记录:vue-grid-layout 修改 margin 导致 item 高度剧烈变化
    Elasticsearch搜索辅助功能解析(十)
    Android开发笔记——快速入门(全局大喇叭)
  • 原文地址:https://blog.csdn.net/zhangxin09/article/details/132724982