• 【一知半解】AQS


    什么是AbstractQueuedSynchronizer(AQS)

    字面意思是抽象队列同步器,使用一个voliate修饰的int类型同步状态,通过一个FIFO队列完成资源获取的排队工作,把每个参与资源竞争的线程封装成一个Node节点来实现锁的分配。

    AbstractQueuedSynchronizer源码

    public abstract class AbstractQueuedSynchronizer 
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        private transient volatile Node head;//链表头
        private transient volatile Node tail;//链表尾
        private transient Thread exclusiveOwnerThread;//持有锁的线程
        private volatile int state;//同步状态,0表示当前没有线程获取到锁
        static final class Node {//链表的Node节点类
          volatile int waitStatus;//当前节点在队列中的状态
          volatile Node prev;//前置节点
          volatile Node next;//后置节点
          volatile Thread thread;//当前线程
        }
    }
    

    AQS同步队列的基本结构

    Node.waitStatus的说明

    状态值 描述
    0 节点默认的初始值
    SIGNAL=-1 线程已经准备好,等待释放资源
    CANCELLED=1 获取锁的请求线程被取消
    CONDITION=-2 节点在队列中,等待唤醒

    state为什么要用volatile修饰?

    1. 可见性,一个线程对变量的修改可以立即被别的线程感知到
    2. 有序性,禁止指令重排

    AQS获取锁步骤

      public final void acquire(int arg) {
          if (!tryAcquire(arg) &&
              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();
      }
    
    1. 当一个线程获取锁时,首先判断state状态值是否为0
    2. 如果state==0,则通过CAS的方式修改为非0状态
    3. 修改成功,则表明获取锁成功,执行业务代码
    4. 修改失败,则把当前线程封装为一个Node节点,加入到队列中并挂起当前线程
    5. 如果state!=0,则把当前线程封装为一个Node节点,加入到队列中并挂起当前线程

    AQS获取锁过程

    首先调用tryAcquire去修state的状态值,成功就获取当前锁;失败则加入当前等待队列中,然后挂起线程。

    tryAcquire

    AQS的源码中tryAcquire是一空实现,需要它的子类去实现这个空方法。因为在AQS中虽然公平锁非公平锁的都是基于一个CLH去实现,但是在获取锁的过程中略有不同。

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    

    公平锁FairSync#tryAcquire

    protected final boolean tryAcquire(int acquires) {
                //获取当前线程
                final Thread current = Thread.currentThread();
                int c = getState();//获取同步器的状态
                if (c == 0) {//当前没有线程获取到锁
                    //首先判断祖宗节点的线程是否当前线程一样
                    if (!hasQueuedPredecessors() &&
                        //更改state的状态值为非0
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //如果锁持有者的线程是当前线程,则可放行,锁的重入
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    /**
    * 判断祖宗节点的线程是否当前线程一样
    * 傀儡节点的下个节点
    */
    public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        //头节点的下个节点所持有的线程是否与当前线程相同
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    
    折叠

    非公平锁NonfairSync#tryAcquire

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //通过CAS更改state的状态值
            if (compareAndSetState(0, acquires)) {
                //把当前线程设置为锁的持有者
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //如果锁持有者的线程是当前线程,则可放行,锁的重入
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    } 
    


    对比后发现,公平锁先判断是否有老祖宗节点,如果有则返回false;如果当前线程对应的node就是老祖宗节点,则直接去修改state状态,把state改为非0。

    addWaiter

    获取锁成功的线程去执行业务逻辑了,获取锁失败的线程则会在队列中排队等候,每个等候的线程也都不安分的。

    private Node addWaiter(Node mode) {
        //把当前线程封装为一个Node节点
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //加入到队列的尾部
        enq(node);
        return node;
    }
    
    1. 把当前线程封装为一个Node节点
    2. 当第一次执行这个方法时,由于head和tail都还没有赋值,则pred指向的tail也是空,所以直接直到enq(node)
    3. 当pred指向的tail不为空时,则通过CAS的方式加入到尾部,如果成功直接返回;如果失败,则进入enq(node)通过自旋的方式加入。
    //通过自旋的方式将节点加入到节点的尾部
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    为了操作链表的方便,一般都要在链表的头前加入一个傀儡节点,AQS的链表也不例外。
    先创建一个傀儡节点,并把head、tail均指向它,然后再把node节点加入到尾部后面,移动tail的指向。

    acquireQueued

    当节点成功加入到链表的尾部后,等待被唤醒,然后通过自旋的方式去获取锁

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //当前节点的前置节点
                final Node p = node.predecessor();
                //如果前置节点是傀儡节点(head指向傀儡节点),则再次尝试去获取锁
                if (p == head && tryAcquire(arg)) {
                    //获取成功后,则移除之前的傀儡节点,head指向当前node,
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //获取锁失败后,设置node节点的状态,并挂起当前节点
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    1. 获取node节点的前置节点,如果前置节点是head,则再次尝试去获取锁
    2. 设置当前node节点的前置节点状态为-1(表示后续节点正在等待状态,默认是0),然后通过自旋的后会进行到parkAndCheckInterrupt挂起当前节点
    3. LockSupport.park(this)执行完事,当前线程会一直阻塞到这个地方
    4. 当前唤醒时再次从1开始执行

    AQS释放锁过程

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    主要是恢复state的值、重置锁持有都线程,然后唤醒挂起的线程。

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        //当前线程与锁持有者线程不一样会报错
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {//重入的次数为0时,则当前线程已经没有重入了,可以清空锁的持有者
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    

    恢复state状态的值,如果重入次数为0时,则清空锁的持有都为null

    private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)//唤醒下个node对应的线程
                LockSupport.unpark(s.thread);
        }
    

    设置head指向的node节点的watiStatus的状态值,然后找到下个节点对应的线程并唤醒。

  • 相关阅读:
    SpringBoot @PropertySource注解使用
    使用Java拓展本地开源大模型的网络搜索问答能力
    Unity WebGL ios 跳转URL
    Linux操作系统粘滞位(解决上篇文章提出的问题)
    总结篇:二叉树遍历
    【3D 图像分割】基于 Pytorch 的 VNet 3D 图像分割6(数据预处理)
    Debian11安装MySQL8.0,链接Navicat
    动态规划之数组压缩空间、星号技巧【3】
    多卫星定位算法
    【C语言从入门到放弃 6】递归,强制类型转换,可变参数和错误处理详解
  • 原文地址:https://www.cnblogs.com/hitechr/p/16474402.html