• 一文带你深入理解 AQS


    AQS

    队列同步器
    AbstractQueuedSynchronizer,用来构建锁或者其他同步组件的基础框架,内部使用一个变量state来表示同步状态,同时使用一个FIFO队列来完成线程的排队工作。
    锁或者其他同步组件一般都会定义一个静态内部类,该静态内部类会继承AQS,同时重写AQS中的方法,重写AQS中的方法时需要用到下面三个方法来获取同步状态。

    • getState()
      获取state属性的内容。
    • setState(int newState)
      设置state属性的内容。
    • compareAndSetState(int expect, int update)
      使用CAS设置当前状态,保证状态设置的原子性。

    总结:如何自定义一个锁或者同步组件?
    创建静态内部类继承AQS,重写AQS中的可重写的方法,在里面使用AQS提供的如上三个方法来获取、修改同步状态。最后调用AQS中的模板方法来进行操作,模板方法中会调用重写的方法。
    即使用者调用模板方法,模板方法调用重写方法,重写方法调用如上三个方法。

     

    可重写的方法

    protected boolean tryAcquire(int arg);
    

    独占式获取同步状态,查询当前状态并根据具体条件设置同步状态。

    protected boolean tryRelease(int arg);
    

    独占式释放同步状态,等待的线程有机会获取同步状态。

    protected int tryAcquireShared(int arg);
    

    共享式获取同步状态,返回大于等于0的值表示获取成功,反之获取失败。
    4.

    protected boolean tryReleaseShared(int arg);
    

    共享式释放同步状态。
    5.

    protected boolean isHeldExclusively();
    

    表示是否被当前线程占用。

    模板方法

    1. 独占式获取同步状态

    当前线程获取成功则会返回,否则进入同步队列等待,调用重写方法中的tryAcquire
    2. 独占式获取同步状态,响应中断

    void acquireInterruptible(int arg);
    

    如果当前线程被中断,则会抛出InterruptedException。

    1. 超时获取同步状态
    boolean tryAcquireNanos(int arg, long nanos);
    

    在acquireInterruptible的基础上设置超时时间,如果超时时间还没有获取到同步状态,会返回false,否则返回true。 4. 共享获取同步状态

    void acquireShared(int arg);
    
    1. 共享获取同步状态,响应中断
    void acquireSharedInterruptible(int arg);
    
    1. 共享获取同步状态,响应中断,添加超时时间
    boolean tryAcquireSharedNanos(int arg, long nanos);
    
    1. 独占式释放同步状态
    boolean release(int arg);
    

    同步队列中的第一个节点将会被唤醒。 8. 共享式释放同步状态

    boolean releaseShared(int arg);
    
    1. 获取等待在同步队列上的线程集合
    Collection getQueuedThreads();
    

    总之:模板可以分为三类:独占式获取与释放同步状态、共享式获取与释放同步状态、查询同步队列线程等待情况。获取又有分为中断、超时。

    自定义同步组件

    1. public class UnReetrantLock implements Lock {
    2. public static class Sync extends AbstractQueuedSynchronizer {
    3. @Override
    4. protected boolean tryAcquire(int arg) {
    5. if (compareAndSetState(0, 1)) {
    6. setExclusiveOwnerThread(Thread.currentThread());
    7. return true;
    8. }
    9. return false;
    10. }
    11. @Override
    12. protected boolean tryRelease(int arg) {
    13. setExclusiveOwnerThread(null);
    14. setState(0);
    15. return false;
    16. }
    17. @Override
    18. protected boolean isHeldExclusively() {
    19. return getState() == 1;
    20. }
    21. public Condition newCondition() {
    22. return new ConditionObject();
    23. }
    24. }
    25. private Sync sync = new Sync();
    26. @Override
    27. public void lock() {
    28. sync.acquire(1);
    29. }
    30. @Override
    31. public void lockInterruptibly() throws InterruptedException {
    32. sync.acquireInterruptibly(1);
    33. }
    34. @Override
    35. public boolean tryLock() {
    36. return sync.tryAcquire(1);
    37. }
    38. @Override
    39. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    40. return sync.tryAcquireNanos(1, unit.toNanos(time));
    41. }
    42. @Override
    43. public void unlock() {
    44. sync.release(1);
    45. }
    46. @Override
    47. public Condition newCondition() {
    48. return sync.newCondition();
    49. }
    50. }

    AQS实现

    底层数据结构:同步队列
    AQS中使用一个双向链表来保存等待同步状态的线程,链表的节点用其内部自定义的Node表示,Node类源码:

    1. static final class Node {
    2. static final Node SHARED = new Node();
    3. static final Node EXCLUSIVE = null;
    4. volatile int waitStatus;
    5. volatile Node prev;
    6. volatile Node next;
    7. volatile Thread thread;
    8. Node nextWaiter;
    9. }

    waitStatus有五个状态:

    • cancelled = 1:同步队列中的线程等待超时或者中断时的状态,后续不会再改变。
    • signal = -1:节点获取同步状态,一般是队头节点,后续节点处于等待状态。
    • condition = -2:节点在等待队列中(注意不是同步队列),线程等待Condition,当Condition调用了signal()之后,该节点会从等待队列转移到同步队列
    • propagate = -3:
    • initial = 0:初始状态。

    同步队列采用尾插法的方式,同时会使用CAS保证尾插的时候是线程安全的。其结构如下:

     

    其中队头是获取同步状态成功的节点,当首节点的线程释放同步状态的时候,会唤醒后继的节点,后继节点会成为首节点。(这个过程不用CAS,没有竞争的情况。)

    acquire方法流程

    1. public final void acquire(int arg) {
    2. if (!tryAcquire(arg) &&
    3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4. selfInterrupt();
    5. }

     

    同步队列中的节点不断地在自旋判断其前驱节点是不是头节点,如果是则尝试获取同步状态,否则会阻塞节点中的线程。

    acquireShared方法流程

    1. public final void acquireShared(int arg) {
    2. if (tryAcquireShared(arg) < 0)
    3. doAcquireShared(arg);
    4. }

     

    ReetantLock

    ReentrantLock,支持重入锁和公平与非公平锁。

    ReentrantLock实现可重入

    重入锁:支持线程反复地获取锁资源而不会自己阻塞自己,有两个问题要实现:

    1. 线程再次获取锁,判断是否是当前线程获取锁。
    2. 锁的最终释放,需要计数锁被重入几次,计数器最终释放为0时才表示锁的最终释放。

     

    例如非公平锁每次再尝试获取锁的时候都会判断是不是同个线程,如果是的话增加计数器的值。释放锁时等到计数器的值为0时才将占有锁的线程设置为null。

     

    公平锁与非公平锁

    公平锁:获取锁的线程按照绝对的时间顺序,FIFO。
    非公平锁:只要CAS设置同步状态成功,就获取锁,不会按照FIFO顺序。
    ReentrantLock的构造方法中传入true时可以创建公平锁:

    1. public ReentrantLock(boolean fair) {
    2. sync = fair ? new FairSync() : new NonfairSync();
    3. }

    公平锁在tryAcquire的时候会判断当前线程是否有前驱节点,有的话则会等待前驱节点释放之后在获取尝试获取锁。 公平锁的tryAcquire:

     

    hasQueuePredecessors方法用来判断是否有前驱节点

    非公平锁的tryAcquire:

     

    问:如何实现公平锁? 构造函数的参数传入true,在重写的tryAcquire方法中判断当前线程是否有前驱线程,有的话尝试获取同步状态失败,以此来达到公平的效果。

    对比: 公平锁虽然会按照FIFO原则,但是会进行大量的线程切换,非公平锁虽然可能会造成其他线程饥饿,但是可以极大提高吞吐量。

     

  • 相关阅读:
    量化接口代码能不能用?
    《ASP.NET Core 6框架揭秘》样章发布[200页/5章]
    回归商业初心,宝尊电商“深耕广拓”缔造品牌电商有质增长
    SpringMVC - 详解RESTful
    MySQL主从复制最全教程(CentOS7 yum)
    840. 矩阵中的幻方。python三连双等 a==b==c
    Java8-Stream流详细教程
    创意电子学-小知识:如何使用面包板
    【再识C进阶2(下)】详细介绍指针的进阶——利用冒泡排序算法模拟实现qsort函数,以及一下习题和指针笔试题
    学习C语言第十五天
  • 原文地址:https://blog.csdn.net/m0_74931226/article/details/127841365