• Condition底层源码


    在未查看过lock锁源码可以先观看lock锁源码先lock底层源码

    Synchronized方法内调用wait、notify、notifyAll思路

    public static void main(String[] args) throws InterruptedException {
        Object a = new Object();
        new Thread(() -> {
            synchronized (a) {
                System.out.println("线程C开始");
                try {
                    a.wait();
                System.out.println("线程C结束");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        Thread.sleep(0);
        new Thread(() -> {
            synchronized (a) {
                System.out.println("线程A开始");
                a.notify();
                System.out.println("线程A结束");
            }
        }).start();
        Thread.sleep(0);
        new Thread(() -> {
            synchronized (a) {
                System.out.println("线程b开始");
            }
        }).start();
    // 线程C开始
    // 线程A开始
    // 线程A结束
    // 线程C结束
    // 线程b开始
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    在这里插入图片描述

    Condition的原理
    通用的wait方法和notify方法在lock锁里面调用后是不会释放锁资源的,它只在Synchronized方法内会释放锁。

    public class ConditionDemeNotify implements Runnable {
        private Lock lock;
        private Condition condition;
    
        public ConditionDemeNotify(Lock lock, Condition condition) {
            this.lock = lock;
            this.condition = condition;
        }
    
        @Override
        public void run() {
            System.out.println("begin - ConditionDemeNotify");
            lock.lock(); //synchronized(lock)
            try {
                condition.signal(); //让当前线程唤醒 Object.notify(); //因为任何对象都会 有monitor
                System.out.println("end - ConditionDemeNotify");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    public class ConditionDemoWait implements Runnable {
    
        private Lock lock;
        private Condition condition;
    
        public ConditionDemoWait(Lock lock, Condition condition) {
            this.lock = lock;
            this.condition = condition;
            lock.notify();
        }
    
        @Override
        public void run() {
            System.out.println("begin - ConditionDemoWait");
            lock.lock();
            try {
                condition.await(); //让当前线程阻塞,Object.wait();
                System.out.println("end - ConditionDemoWait");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        ConditionDemeNotify conditionDemeNotify = new ConditionDemeNotify(lock,condition);
        ConditionDemoWait conditionDemoWait = new ConditionDemoWait(lock,condition);
        new Thread(conditionDemoWait).start();
        new Thread(conditionDemeNotify).start();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    Condition.wait思路
    1、添加进AQS等待队列当中
    2、释放锁
    3、阻塞
    4、唤醒后,重新抢锁
    5、处理interupt()的中断响应

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 当前方法是因为抢锁状态所以不需要CAS操作进行加锁操作
        Node node = addConditionWaiter();    // 添加进AQS等待队列当中
        int savedState = fullyRelease(node);    // 完整的释放锁(考虑重入问题)
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {    // 校验是否需要等待
            LockSupport.park(this);    // 休眠等待唤醒
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)    // 抢锁
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    
    // 添加进AQS等待队列当中
    private Node addConditionWaiter() {
        Node t = lastWaiter; // 条件队列的最后一个节点
        if (t != null && t.waitStatus != Node.CONDITION) {    // 如果最后一个Node节点不是等待队列状态-2
            unlinkCancelledWaiters();    // 
            t = lastWaiter;
        }
        // 将当前线程添加进AQS等待队列当中
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    /**
     * 从条件队列中取消已取消服务节点的链接。仅在持有锁时调用。当条件等待期间发生取消时,以及当lastWaiter被取消时插入新的waiter时,
     * 将调用此函数。在没有信号的情况下,需要使用这种方法来避免垃圾保留。因此,即使它可能需要完整的遍历,它也只在没有信号的情况下发生超时或取消时才会发挥作用。
     * 它遍历所有节点,而不是停在一个特定的目标,以解除所有指向垃圾节点的指针的链接,而不需要在取消风暴期间多次重新遍历
     */
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
    
    // 完整的释放锁(考虑重入问题)
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();    // 获取当前线程重入次数
            if (release(savedState)) {    // 释放锁资源
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    
    // 校验是否需要休眠
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)    //    当前线程node节点如果已经移除了AQS等待队列当中就不需要等待休眠等待
            return false;
        if (node.next != null) // 如果有后继,它必须在队列中
            return true;
        return findNodeFromTail(node);
    }
    
    // 如果当前node节点在AQS等待队列当中就返回true 休眠等待
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98

    Condition.signal思路
    1、要把被阻塞的线程,先唤醒(signal、signalAll)
    2、把等待队列中被唤醒的线程转移到AQS队列中

    // 唤醒
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)    // 如果AQS队列当中第一个node
            doSignal(first);
    }
    // 删除和传输节点,直到到达非取消的1或null。从signal中分离出来 唤醒等待队列中的一个线程
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)    // 
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&    // 将等待AQS队列当中node节点迁移到同步队列当中
                 (first = firstWaiter) != null);
    }
    // 将节点从条件队列传输到同步队列。如果成功返回true ,如果成功传输,则为True(否则在信号之前取消节点)
    final boolean transferForSignal(Node node) {
          // 修改node状态为同步等待状态0,如果失败着返回错误
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        Node p = enq(node);    // 迁移同步队列当中
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);    // 唤醒
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    Condition的实际应用
    1、实现阻塞队列(业务组件)
    2、在线程池中会用到阻塞队列
    3、生产者消费者
    4、流量缓冲

    J.U.C 中的阻塞队列
    1、ArrayBlockingQueue 基于数组结构
    2、LinkedBlockingQueue 基于链表结构
    3、PriorityBlcokingQueue 基于优先级队列
    4、DelayQueue 允许延时执行的队列
    5、SynchronousQueue 没有任何存储结构的的队列

  • 相关阅读:
    保边滤波之引导滤波与领域转换滤波
    一篇短小精悍的文章让你彻底明白KMP算法中next数组的原理
    算法系列--递归
    portswigger JWT attacks
    单片机C语言实例:18、LCD1602液晶显示
    【自动化测试】selenium工具
    14.FreeRTOS 消息缓存 Message Buffer
    html5学习笔记18-web存储、web sql、web worker
    【C++】红黑树的模拟实现
    阿里云服务器安全组开放指定端口无法访问问题记录
  • 原文地址:https://blog.csdn.net/weixin_46919552/article/details/125543786