• java并发编程学习三——wait/notify与park/unpark


    一、wait/notify

    wait、notify、notifyAll都是Object中的方法,工作原理如图
    在这里插入图片描述

    • 某个线程需要的资源没达到条件,调用wait方法就进入WaitSet等待,不占用CPU时间
    • Owner线程执行临界区代码调用notify/notifyAll,就会唤醒WaitSet中的线程
    • 被唤醒的线程进入EntryList阻塞,竞争获得锁才能再进入临界区
    • 如果条件还是没到到就再次调用wait方法就进入WaitSet等待,否则就执行完释放锁。

    1.1 api的使用

    • obj.wait(),让进入Object监视器的线程进入watiSet等待
    • obj.notify(),在watiSet中挑一个正在等待的线程唤醒
    • obj.notifyAll(),唤醒waitSet中所有等待的线程

    注意:它们都是Object中用于线程协作的方法,必须要获得此对象锁之后,才能调用此对象的这些方法。
    示例代码:

    public class WaitNotify {
        static Object obj = new Object();
    
        public static void main(String[] args) {
            new Thread(() -> {
                synchronized (obj) {
                    try {
                        obj.wait();
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t1").start();
    
            new Thread(() -> {
                synchronized (obj) {
                    System.out.println(Thread.currentThread().getName());
                    obj.notify();
                    // obj.notifyAll();
                }
            }, "t2").start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    1.2 wait(long n)与sleep(long n)

    相同点:

    • 都会使线程进入TIMED_WAITING状态

    不同点:

    • sleep是Thread的静态方法;wait定义在Object,那么所有对象都有该方法
    • sleep不需要和synchronized配合使用,wait必须要在synchronized代码块里面调用
    • sleep不会释放对象锁,wait会释放对象锁

    1.3 正确使用方式

    1.3.1 基本方式

    synchronized(obj){
    	while(条件不成立){
    		obj.wait();
    	}
    	//条件成立之后的逻辑
    }
    
    synchronized(obj){
    	obj.notifyAll();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这样写的原因是使用notify不确定唤醒哪个线程,notifyAll唤醒所有线程,while的写法如果条件不满足可以再次进入等待。
    示例代码:

    public class WaitNotify1 {
        static Object obj = new Object();
        static boolean wakeUp = false;
        static boolean beFull = false;
        public static void main(String[] args) throws InterruptedException {
    
            new Thread(() -> {
                synchronized (obj) {
                    try {
                        //条件不成立,进入等待
                        while (!wakeUp) {
                            obj.wait();
                            System.out.println(Thread.currentThread().getName() + "继续等待");
                        }
                        //结束等待
                        System.out.println(Thread.currentThread().getName() + "结束等待");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t1").start();
    
            new Thread(() -> {
                synchronized (obj) {
                    try {
                        while (!beFull) {
                            obj.wait();
                            System.out.println(Thread.currentThread().getName() + "继续等待");
                        }
                        //结束等待
                        System.out.println(Thread.currentThread().getName() + "结束等待");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            }, "t2").start();
    
            Thread.sleep(50);
    
            synchronized (obj) {
                beFull=true;
                obj.notifyAll();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    1.3.2 保护性暂停模式

    保护性暂定是一个线程等待另一个线程结果时使用的一种同步模式(Guarded Suspension),如图

    在这里插入图片描述
    t1线程要等待t2线程的结果,增加一个GuardedObject来协调它们的运行顺序,这种模式适用于将一个结果从一个线程传递到另一个线程。如果有多个结果需要传递可以使用消息队列类实现。
    示例代码:

    public class WaitNotify2 {
        public static void main(String[] args) throws InterruptedException {
            GuardedObject guardedObject = new GuardedObject();
            new Thread(() -> {
                synchronized (guardedObject) {
                    Object response = guardedObject.get(2000);
                    System.out.println(response);
                }
            }, "t1").start();
            Thread.sleep(50);
            new Thread(() -> {
                synchronized (guardedObject) {
                    try {
                        System.out.println(Thread.currentThread().getName() + "执行业务");
                        guardedObject.wait(3000);
                        guardedObject.set(new Object());
                        //虚假唤醒
    //                    guardedObject.wait(1000);
    //                    guardedObject.set(null);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t2").start();
        }
    
    }
    
    class GuardedObject {
        private Object response;
    
        //获取结果,超时退出
        public Object get(int timeout) {
            synchronized (this) {
                //开始时间
                long begin = System.currentTimeMillis();
                //已经等待的时间
                long passTime = 0;
    
                while (response == null) {
                    //剩余等待时间
                    long waitTime = timeout - passTime;
                    if (waitTime <= 0) {
                        break;
                    }
                    try {
                        System.out.println(Thread.currentThread().getName() + "等待...");
                        this.wait(waitTime);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    passTime = System.currentTimeMillis() - begin;
                }
                return response;
            }
    
        }
    
        //产生结果
        public synchronized void set(Object response) {
            this.response = response;
            this.notifyAll();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    1.3.3 消息队列模式

    消息队列模式中,消息队列用来平衡生产线程与消费线程的资源,生产线程专注产生结果数据,消费线程专注处理结果数据,达到解耦的目的。JDK中各种阻塞队列采用的就是这种模式,如下图
    在这里插入图片描述
    实现代码:

    public class WaitNotify3 {
        public static void main(String[] args) throws InterruptedException {
            MessageQueue messageQueue = new MessageQueue(2);
            for (int i = 0; i < 3; i++) {
                final int id = i;
                new Thread(() -> messageQueue.put(new Message(id, "值" + id)), "生产线程" + i).start();
            }
    
            Thread.sleep(1000);
    
            new Thread(() -> {
                while (true){
                    Message take = messageQueue.take();
                }
            }, "消费线程").start();
        }
    }
    
    class MessageQueue {
        LinkedList<Message> queue = new LinkedList<>();
        private int capacity;
    
        public MessageQueue(int capacity) {
            this.capacity = capacity;
        }
    
        //消费消息
        public Message take() {
            synchronized (queue) {
                //队列为空,进入等待
                while (queue.isEmpty()) {
                    try {
                        System.out.println("队列为空,消费者线程进入等待!");
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Message message = queue.removeFirst();
                System.out.println("消费消息" + message.toString());
                queue.notifyAll();
                return message;
            }
        }
    
        //生产消息
        public void put(Message message) {
            synchronized (queue) {
            	//队列已满,进入等待	
                while (queue.size() == capacity) {
                    try {
                        System.out.println("队列已满,生产者线程进入等待!");
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                queue.addLast(message);
                System.out.println("生产消息" + message.toString());
                queue.notifyAll();
            }
    
        }
    }
    
    final class Message {
        final private int id;
        final private Object message;
    
        public int getId() {
            return id;
        }
    
        public Object getMessage() {
            return message;
        }
    
        public Message(int id, Object message) {
            this.id = id;
            this.message = message;
        }
    
        @Override
        public String toString() {
            return "Message{" +
                    "id=" + id +
                    ", message=" + message +
                    '}';
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    1.4 join源码

    join最终调用如下代码,该代码来自java源码

    //主线程中调用t1.join(),t1为子线程
    //主线程得到t1对象的锁之后进入(线程对象同时作为锁对象)
     public final synchronized void join(long millis)
        throws InterruptedException {
            long base = System.currentTimeMillis();
            long now = 0;
    
            if (millis < 0) {
                throw new IllegalArgumentException("timeout value is negative");
            }
    
            if (millis == 0) {
            	//线程存活,主线程继续等待(虚假唤醒,其他线程调用t1.notifyAll())
                while (isAlive()) {
                	//等价于this.wait(0),阻塞的是主线程,表示无限等待
                    wait(0);
                }
            } else {
            	//线程存活,主线程继续等待(虚假唤醒)
                while (isAlive()) {
                	//使用的正是保护性暂停,每次等待剩余时间
                    long delay = millis - now;
                    if (delay <= 0) {
                        break;
                    }
                    wait(delay);
                    now = System.currentTimeMillis() - base;
                }
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    观察源码,可以看出,join的实现依赖的是wait。

    二、park/unpark

    2.1 与wait/notify的区别

    park/unpark也可以用来暂停和恢复线程的运行,与wait/notify的区别:

    • park/unpark不需要Object的Monitor锁支持,不用和synchronized配合使用
    • park/unpark以线程为单位来控制暂停和恢复,可以更精确。而wait/notify是以锁为单位,只能随机唤醒一个正在等待的线程
    • unpark可以在park之前执行依然有效(park之后不暂停,继续执行),notify在wait之前执行则不起作用

    2.2 原理

    LockSupport.park();调用的是 UNSAFE.park,JVM底层为每个线程维护一个Parker对象。
    在这里插入图片描述
    1.UNSAFE.park调用
    2.检查_counter,_counter为0则获取_mutex锁
    3.线程进入_cond等待
    4.线程将_counter置为0

    LockSupport.unpark(thread);调用的是UNSAFE.unpark(thread);
    在这里插入图片描述
    1.将_counter置为1
    2.拿到_mutex锁,唤醒线程
    3.继续运行线程
    4.将_counter置为0

    假如unpark在park之前执行
    在这里插入图片描述
    1.unpark将_counter置为1,线程正在运行,不需要唤醒
    2.UNSAFE.park调用
    3.检查_counter,_counter为1,现在不暂停,继续运行
    4.将_counter置为0

    注意:unpark需在线程启动之后(调用start方法)生效,不管unpark执行多少次,只能影响一次park。
    代码示例

    public class ParkUnPark {
        public static void main(String[] args) {
            Thread t1 = new Thread(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "开始执行");
                LockSupport.park();
                System.out.println(Thread.currentThread().getName() + "第一次被唤醒");
                LockSupport.park();
                //不会被执行
                System.out.println(Thread.currentThread().getName() + "第二次被唤醒");
            }, "t1");
            t1.start();
            //unpark两次,只有第一次被唤醒
            LockSupport.unpark(t1);
            LockSupport.unpark(t1);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    三、线程状态切换

    经过前面线程相关的方法讲解,再来看看JVM线程状态转换过程。下面是状态图,前面讲到过,再次分析一下
    在这里插入图片描述
    首先,new一个线程对象,进入NEW状态,接着
    1.调用线程对象的start方法,进入RUANABLE
    2.调用锁对象的wait进入WAITING;调用notify、notifyAll或者线程被打断调用interrupt,分两种情况

    • 有锁,进入BLOCKED竞争锁
    • 无锁,进入RUNNABLE等待CPU调度

    3.主线程调用t.join,主线程进入WAITING,主线程在t线程关联的监视器的WaitSet中等待。t线程运行结束,或者调用了主线程的interrupt方法,从WAITING进入RUNNABLE
    4.调用LockSupport.park方法,线程进入WAITING;调用LockSupport.unpark(指定线程),指定线程进入RUNNABLE;指定线程调用interrupt方法也可以进入RUNNABLE,线程被标记打断之后park方法失效,无法再次暂定线程。
    5.调用锁对象的wait(n),进入TIMED_WAITING;与wait方法类似,只是多了超时自动唤醒
    6.主线程调用t.join(n),进入TIMED_WAITING;与join类似
    7.调用线程静态方法sleep(n),进入TIMED_WAITING;睡眠时间到之后变为RUNNABLE,sleep不释放锁,不用区分有锁和无锁
    8.调用LockSupport.parkNanos(n)或者LockSupport.parkUntil(n)方法进入TIMED_WAITING,与park类似
    9.线程在进入synchronized代码块时竞争锁失败状态变为BLOCKED,进入监视器EntryList等待;被唤醒重新获取锁成功,状态变为RUNNABLE
    10.线程执行完毕,状态变为TERMINATED

  • 相关阅读:
    关于el-date-picker点击清空参数变为null的问题
    理解线程池源码 【C++】面试高频考点
    我的创作纪念日-成为CSDN创作者的 第4096天
    正向传播和反向传播
    MATLAB算法实战应用案例精讲-【智能优化算法】蛇优化算法-SO(附MATLAB代码)
    让图像恢复的baselines简单点 [2022ECCV】
    521. 最长特殊序列 Ⅰ
    如何利用 AI 写一本书并实现被动收入
    Spring Boot学习笔记(1)
    题目 1060: 二级C语言-同因查找
  • 原文地址:https://blog.csdn.net/yx444535180/article/details/126408867