如果觉得我的文章还不错的话就点个赞吧,另外可以微信搜索【佘凡架构师】阅读更多的好文章,获取我为大家准备的资料。
AQS(Abstract Queued Synchronizer)翻译过来就是抽象队列同步器,是juc并发包下locks中的一个抽象class。包括ReentrantLock,ReentrantReadWriteLock,Semaphore,CountDownLatch,CyclicBarrier等都用到了AQS。
它其实就是多个线程在同时执行时,通过CAS的方式去更新AQS的state值。CAS在硬件层面保证了同时只有一个线程可操作,一个线程加锁成功,其他的失败。加锁成功的线程会设置exclusiveOwnerThread = 当前线程,并state=1。重入次数的话会在state值上做累加。
加锁失败的线程则会放入一个锁池队列,这样当上一个线程释放锁时,下一个线程就能够直接上去把锁拿到。
AQS默认是非公平锁,当线程A锁释放state=0,exclusiveOwnerThread = null,然后唤醒线程B去获取锁,但是这个时候突然来了一个线程D,二话不说直接执行CAS把锁抢占了,更新state=1,加锁成功。所以线程B只能再次回到等待队列中。
AQS的公平锁指的是公平竞争,会提前判断等待队列中是否有线程正在等待,若有等待,直接进入等待队列的队尾。哪怕state=0,exclusiveOwnerThread = null也是如此。
我们简单创建一个重入锁来跟踪一下代码原理
public void method(){
ReentrantLock lock = new ReentrantLock();
lock.lock();
//执行逻辑代码
lock.unlock();
}
ReentrantLock构造器
public ReentrantLock() {
//默认是非公平锁
sync = new NonfairSync();
}
ReentrantLock#lock
public void lock() {
//调用非公平锁的lock方法
sync.lock();
}
final void lock() {
//执行CAS
if (compareAndSetState(0, 1))
//成功获取锁,设置exclusiveOwnerThread = 当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
//否则再尝试一次
acquire(1);
}
AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
//尝试再次加锁,若不成功则把当前线程放入等待队列中
//最后阻塞该队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Reentrant.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//再判断一次,state=0
if (c == 0) {
//进行加锁
if (compareAndSetState(0, acquires)) {
//加锁成功
setExclusiveOwnerThread(current);
return true;
}
}
//判断当前线程是否重入
else if (current == getExclusiveOwnerThread()) {
//重入次数++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//设置重入次数
setState(nextc);
return true;
}
return false;
}
AbstractQueuedSynchronizer#addWaiter
private Node addWaiter(Node mode) {
//创建node元素
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//若最后一个node不为空
if (pred != null) {
//该node的上一个就是最后一个(当前node加在最后)
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//初始化设置,
enq(node);
return node;
}
AbstractQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//for无限循环
for (;;) {
//获取最后一个节点(当前节点)的上一个
final Node p = node.predecessor();
//若上一个是头部节点,并且加锁成功了
if (p == head && tryAcquire(arg)) {
//就把当前节点设置头节点
setHead(node);
p.next = null; // help GC
failed = false;
//这样不阻塞,一会再试一次看看能不能加锁成功
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}