• JUC——AQS


    AbstractQueuedSynchronizer是Java的并发基础,想学好Java的并发,还需要先从AQS开始学起,本节参考网络知识,学习AQS的实现原理;

    1 CLH队列

    有关CLH知识可以参考:《Building FIFO and Priority-Queuing Spin Locks from Atomic Swap》

    AQS是CLH锁的一个变种,了解CLH原理对学习AQS有一定理解上的帮助,CLH队列属于自旋锁的一种,先来看一下CLH队列锁的特点:

    1. CLH是一个自旋锁公平锁,属于FCFS机制,内部维护一个队列串行处理;
    2. 竞争线程在前驱节点变量上自旋,如果不满足条件则一直自旋;
    3. CLH队列按请求顺序处理,不支持锁重入;

    参考网络中对CLH队列锁的一种实现:

    public class ClhSpinLock implements Lock{
        private final ThreadLocal<Node> prev;
        private final ThreadLocal<Node> node;
        private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());
    
        public ClhSpinLock() {
            this.node = new ThreadLocal<Node>() {
                protected Node initialValue() {
                    return new Node();
                }
            };
    
            this.prev = new ThreadLocal<Node>() {
                protected Node initialValue() {
                    return null;
                }
            };
        }
    
        public void lock() {
            final Node node = this.node.get();
            node.locked = true;
            Node pred = this.tail.getAndSet(node);
            this.prev.set(pred);
            // 自旋
            while (pred.locked);
        }
    
        public void unlock() {
            final Node node = this.node.get();
            node.locked = false;
            this.node.set(this.prev.get());
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            // TODO Auto-generated method stub
            
        }
    
        @Override
        public boolean tryLock() {
            // TODO Auto-generated method stub
            return false;
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            // TODO Auto-generated method stub
            return false;
        }
    
        @Override
        public Condition newCondition() {
            // TODO Auto-generated method stub
            return null;
        }
        
        private static class Node {
            private volatile boolean locked;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    上述代码中使用到了ThreadLocal,可以方便的在加锁时将当前Node存放ThreadLocal,在释放锁时,只需要从ThreadLocal获取当前线程绑定的节点;

    1.1 加锁分析

    通过图解的方式,分析加锁的过程:
    在这里插入图片描述
    通过CAS设置tail节点,如果自旋的前驱节点的locked如果是false,则认为加锁成功,上面的实现方式是不支持锁重入功能的,但是在AQS实现中支持公平/非公平和可重入;

    1.2 锁释放

    在这里插入图片描述
    当前线程释放锁时,首先将locked = false表示下个节点可以获取锁并且让当前节点指向前驱节点;

    2 AQS使用

    在开始AQS源码之前,先来通过AQS实现一个互斥公平锁:

    public class CustomLock implements Lock {
    
        private final Sync sync;
        public static class Sync extends AbstractQueuedSynchronizer{
    
            @Override
            protected boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                // 这里的c为什么先获取出来,为什么不是每次使用的时候都是getState
                // 个人理解:它是在获取锁之前的状态,如果在做CAS的时候,拿c去做,结果失败说明被其他线程做了修改
                // 如果是compareAndSetState(getState(), acquires),那state即使其他线程做了更新,这里也会CAS成功;
                int c = getState();
                if (c == 0) {
                    // 如果cas失败,会返回false,加锁失败;
                    if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 锁重入
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    // 这里setState为什么不是CAS去修改;
                    // 如果是锁重入的话,说明当前线程已经加到了锁,不会有其他的线程对state做修改,所以这里不会有并发问题;
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
            @Override
            protected boolean tryRelease(int releases) {
                // 释放锁时,当前线程已经持有锁,所以不需要防止并发;
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                // 如果c!=0说明还有锁重入
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
            public Condition newCondition(){
                return new ConditionObject();
            }
            protected final boolean isHeldExclusively() {
        		return getExclusiveOwnerThread() == Thread.currentThread();
    		}
        }
    
        public CustomLock() {
            sync = new Sync();
        }
    
        // 加锁直到获取锁或者当前锁有权利获取锁时被中断;
        @Override
        public void lock() {
            sync.acquire(1);
        }
    
        // 可被中断,如果在获取锁的过程中,没有获取需要等待,则可以通过thread.interrupt中断,响应InterruptedException异常
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        // 尝试加锁,如果获取不到,则返回false
        @Override
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        @Override
        public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1,unit.toNanos(timeout));
        }
    
        @Override
        public void unlock() {
            sync.release(1);
        }
    
        @Override
        public Condition newCondition() {
            return sync.newCondition();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93

    可见通过AQS实现一个锁是非常的简单方便,AQS还可以实现共享锁,下面的源码分析中会介绍到;
    实现AQS抽象类时,如果是独占模式,只需要实现:

    tryAcquire
    tryRelease
    isHeldExclusively

    如果是共享模式,只需要实现:

    tryAcquireShared
    tryReleaseShared
    isHeldExclusively

    3 AQS源码分析

    3.1 Node

    Node节点主要包含节点的状态,前驱和后继节点,如果有使用Condition还会使用到nextWaiter,在下面会对这些操作进行详细的分析:

    static final class Node {
        // 共享模式
        static final Node SHARED = new Node();
        // 独占模式
        static final Node EXCLUSIVE = null;
    
        // 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
        static final int CANCELLED =  1;
        // 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
        static final int SIGNAL    = -1;
        // 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
        static final int CONDITION = -2;
        // 当前节点获取到锁的信息需要传递给后继节点,共享锁模式使用该值
        static final int PROPAGATE = -3;
    
        volatile int waitStatus;
    
        // 前驱节点
        volatile Node prev;
    
        // 后继节点
        volatile Node next;
    
        // 节点绑定的线程
        volatile Thread thread;
    
        // Condition条件队列,当使用Condition时它表示后继节点
        Node nextWaiter;
    
        // 判断是否是共享锁
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    
        // 获取同步队列的前驱节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    
        Node() {    // Used to establish initial head or SHARED marker
        }
    
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
    
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    3.2 AQS的CLH队列

    AQS中的CLH队列采用双向链表,AQS中维护着head和tail;在AQS中还需要配合着state成员字段来使用;
    当只有一个线程加锁时,其实是不需要构建CLH队列;
    在这里插入图片描述

    3.3 AQS独占锁

    独占锁主要涉及的方法有:
    加锁:

    • acquire()
    • tryAcquire()
    • tryAcquireNanos()
    • acquireInterruptibly()

    释放锁:

    • release()
    • tryRelease()

    3.3.1 tryAcquire

    tryAcquire方法在AQS内部是空实现,需要实现方自定义实现逻辑,以上面的自定义实现来说,我们认为getState()==0时,没有线程竞争锁,可以直接加锁成功;

    tryAcquire只做一次尝试,如果获取锁失败,则直接返回false;
    以ReentrantLock为例,内部实现tryAcquire时有公平锁和非公平锁之后;

    3.3.2 acquire

    独占模式获取,忽略中断。 通过调用至少一次tryAcquire(int)实现,成功返回。 否则线程排队;这个过程中不会被线程中断而结果,只有获取到资源之后才会判断Thread.interrupted(),如果需要中断则再调用Thread.currentThread().interrupt()

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
        	// addWaiter 向CLH队列添加节点;
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  
            // 当前节点拿到锁,但是被中断了,则调用线程的  Thread.currentThread().interrupt()
            selfInterrupt();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    首先会进行一次tryAcquire,如果失败,则将当前线程封装成Node节点添加到CLH队列;

    private Node addWaiter(Node mode) {
    	// 独占模式节点
        Node node = new Node(Thread.currentThread(), mode);
        // 如果tail为空null,尝试向队尾添加node,这一步CAS尝试不需要挂起线程;
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
       	// 如果添加失败,进入到enq
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 如果tail为空,则初始化一个节点,head和tail分别指向它;然后再循环一轮进入else
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            // 如果CLH队列不为空,则向队尾添加节点
            // 添加节点需要维护前驱和后继,这是两步操作,无法保证原子性,这里先保证前驱节点的设置正确;
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    // 如果尾节点设置成功,再配置后继节点,这里在多线程操作的时候,这里如果被挂机,则CLH队列里面只能通过pre从后往前找才靠谱;
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    再次判断是否满足条件获取锁,如果不满足,则进入park阻塞,等待被unpark;
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	// 获取当前线程node的前驱节点
                final Node p = node.predecessor();
                // 如果前驱是head,则再次尝试获取锁,如果获取到,则不需要再进行park;
                if (p == head && tryAcquire(arg)) {
                	// 如果当前节点成功获取锁,则设置当前节点为head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 判断当前节点获取锁失败之后是否应该阻塞自己,只有当前驱节点waitStatus=-1时,才会返回true;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 这个状态用来标识线程是否被中断,如果被中断,则在acquireQueued返回true,调用处会执行Thread.currentThread().interrupt()
                    interrupted = true;
            }
        } finally {
        	// 如果出现一些异常,没有获取锁就结束了for循环,则将当前节点删除掉;
        	// 比如 线程被stop,会执行finally内的方法;
            if (failed)
                cancelAcquire(node);
        }
    }
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前驱状态如果是-1,则返回true,添加进来的节点需要通过前驱的状态进行控制,前驱节点状态-1,表示后继节点在等待获取锁;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
        	// 如果前驱节点都已经被取消了,则向前找到一个没有被取消的节点,返回false,会重新走调用处的for循环;
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        	// 如果不是-1 和1 则将其改为-1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    如果前驱节点状态是-1,则将当前节点阻塞;
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    前驱节点状态=-1表示后继节点在等待获取锁资源;

    这个方法单独拿出来说,是因为它包含了几种场景;
    由于不知道在acquireQueued内是什么导致的需要cancelAcquire,也可能是当前节点刚获取到锁,结果被stop,也可能是正在排队获取锁;

    private void cancelAcquire(Node node) {
        if (node == null)
            return;
        node.thread = null;
        // 找到一个waitStatus不为1的前驱
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        Node predNext = pred.next;
        node.waitStatus = Node.CANCELLED;
    
        // 如果当前节点是尾节点的情况下:
        // 将找到的前驱设置为尾节点;
        // 第一种情况:当前要取消的节点是尾节点,则尾节点前移;
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
    	    // 如果当前节点不是尾节点
            int ws;
        	// 前驱节点不是头节点,并且(前驱节点的状态为SIGNAL 或者 前驱状态为-1,0,将期改为-1) 并且 前驱线程不为空;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                // 第二种场景:当前取消节点的前驱不是头节点也不是尾节点;则将前驱节点的next指针指向node.next;
                // 这里也有个疑问点,为什么只改了node前驱的next指针,而没有改node.next的pre指针?
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
            	// 除此之外,就是需要唤醒后继节点;
                unparkSuccessor(node);
            }
    
            node.next = node; // help GC
        }
    }
    
    private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        // 当node.next不满足被唤醒条件时,则从后往前找满足唤醒条件的节点
        Node s = node.next;
        // s==null为什么也会进行从后往前找?
        // s==null,不能表示后继节点为空,因为在enq添加节点的时候,是先维护前驱节点,再维护后继节点,如果前驱维护成功,后继节点还未进行维护时,线程挂起,这里后继节点就是空,所以需要从后往前找,CLH队列里面前驱节点是靠谱的;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 找到waitStatus 为 0(默认) 或者  -1的节点;将其唤醒
            // 这里为什么是倒着找的呢?
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    上面提到了一个疑问为什么删除节点的时候,只维护node.pre指向node.next,而没有将node.next.pre=node.pre
    shouldParkAfterFailedAcquire中有这样一段代码:

    do {
        node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    这是当Node2被删除后的结构,当Node3拿到资源执行的时候,执行上面这段代码,之后:
    在这里插入图片描述
    这个时候,Node1和Node2的关系已经被得到维护;

    3.3.3 tryAcquireNanos

    尝试以独占方式获取,如果线程中断或时间超时会立即中止;

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 尝试获取一次,获取不到则进入doAcquireNanos
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
    
    private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // 计算出未来时间,超过这个时间未获取到,则响应失败
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    // 这里的park传入了时间,阻塞nanosTimeout,超时则自动唤醒;
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    对acquire方法有了深入理解之后,tryAcquireNanos思想类似,就不细写了;

    3.3.4 acquireInterruptibly

    以独占方式获得,如果线程被中断,则立即中止抛出:InterruptedException异常;

    public final void acquireInterruptibly(int arg)
                throws InterruptedException {
        // 如果线程被中断,则直接抛出异常;
        if (Thread.interrupted())
            throw new InterruptedException();
        // 尝试获取锁
        if (!tryAcquire(arg))
        	// 获取不到,则阻塞加入队列
            doAcquireInterruptibly(arg);
    }
    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 线程如果被调用interrupt会被唤醒,如果true,则表示线程被中断,这里直接抛出中断异常;
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    3.3.5 tryRelease

    tryRelease方法是交由用户自定义的,release会调用tryRelase做循环判断;

    3.3.6 release

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        // 当node.next不满足被唤醒条件时,则从后往前找满足唤醒条件的节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 找到waitStatus 为 0(默认) 或者  -1的节点;将其唤醒
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    3.3.7 疑问

    为什么是通过前驱节点的状态来控制后继节点阻塞情况?唤醒线程时为什么是从后往前找?

    https://www.cnblogs.com/dennyzhangdd/p/7218510.html

    3.3.8 插入图示

    在这里插入图片描述

    3.3.9 释放图示

    在这里插入图片描述
    在这里插入图片描述

    3.4 AQS共享锁

    加锁:

    • acquireShared()
    • tryAcquireShared()
    • tryAcquireSharedNanos()
    • acquireSharedInterruptibly()

    释放锁:

    • releaseShared()
    • tryReleaseShared()

    3.4.1 tryAcquireShared

    tryAcquireShared如果返回值>=0 表示获取锁成功,如果返回值是0,表示当前节点获取锁成功,但后面的线程再获取会失败;

    3.4.1 acquireShared

    在独占锁中,tryAcquire只需要返回true和false来表示占用成功或失败,而对于tryAcquireShared返回 >=0 则表示成功,否则失败;

    public final void acquireShared(int arg) {
        // 尝试获取共享资源,如果>=0表示获取成功;
        if (tryAcquireShared(arg) < 0)
            // 否则,加入阻塞队列;
            doAcquireShared(arg);
    }
    
    private void doAcquireShared(int arg) {
        // 共享模式的节点类型为:SHARED
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 如果前驱节点是head,则尝试再获取;
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    // 如果获取成功,则设置头节点 并唤醒后续节点
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    // 
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        
        // 因为是共享锁,获取到锁之后,可以理解为会传播到后继SHARED类型节点也获到锁
        if (propagate > 0 ||      // 表示有剩余资源,可以唤醒后续节点(如果有SHARED类型的)
            h == null || h.waitStatus < 0 || // 旧head为空(共享锁可能后继节点释放锁) 或waitStatus<0 (表示后继节点在等待获取锁)
            (h = head) == null || h.waitStatus < 0) // 新head为空 或waitStatus<0 (表示后继节点在等待获取锁)
         {
            Node s = node.next;
            // 如果下个节点的类型是Shared,则进行唤醒
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    setHeadAndPropagate 方法内部判断h和head是否为空null;为什么要做这些判断?因为共享锁可能会有很多的线程在同时释放锁doReleaseShared如果队列中有等待线程,则可能被很接近同时被唤醒,各自setHead,很有可能第一被唤醒的线程的h和head已经发生了变化;

    在共享锁中,可以被唤醒,如果竞争不到资源还是会被park的;

    3.4.2 releaseShared

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 如果状态是-1,表示后继节点等待,改为0表示当前节点已获取资源释放;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 从后往前找第一个满足唤醒的节点
                    unparkSuccessor(h);
                }
               // 多个线程同时在doreleaseshared时,t1将head.waitStatus=0,则t2进来判断ws==0,则将head改为-2,则t1在获取锁执行setHeadAndPropagate会再次做唤醒下个节点;
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 如果头节点没有发生变化,则退出循环,否则,继续做唤醒的动作;
            if (h == head)                   // loop if head changed
                break;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    本文内容只是对API的理解,对同步器框架的设计精髓还有得挖掘,,日后对AQS有较的理解之后,再来补原理;

  • 相关阅读:
    高并发场景QPS等专业指标揭秘大全与调优实战
    llava1.5-部署
    棱镜七彩加入UOS主动安全防护计划(UAPP),共建信创生态
    科技云报道:AI写小说、绘画、剪视频,生成式AI更火了!
    Edegex Foundry docker和源码安装
    【业务功能篇96】微服务-springcloud-springboot-认证服务-登录注册功能-Auth2.0-分布式session
    【转】大数据安全--敏感数据识别和分级打标
    北大联合智源提出训练框架LLaMA-Rider
    闲话Python编程-集合set
    【Python机器学习】零基础掌握LinearDiscriminantAnalysis判别分析
  • 原文地址:https://blog.csdn.net/shixiaoling123/article/details/125098317