JUC系列整体栏目
内容 | 链接地址 |
---|---|
【一】深入理解JMM内存模型的底层实现原理 | https://zhenghuisheng.blog.csdn.net/article/details/132400429 |
【二】深入理解CAS底层原理和基本使用 | https://blog.csdn.net/zhenghuishengq/article/details/132478786 |
【三】熟练掌握Atomic原子系列基本使用 | https://blog.csdn.net/zhenghuishengq/article/details/132543379 |
【四】精通Synchronized底层的实现原理 | https://blog.csdn.net/zhenghuishengq/article/details/132740980 |
【五】通过源码分析AQS和ReentrantLock的底层原理 | https://blog.csdn.net/zhenghuishengq/article/details/132857564 |
【六】深入理解Semaphore底层原理和基本使用 | https://blog.csdn.net/zhenghuishengq/article/details/132908068 |
【七】深入理解CountDownLatch底层原理和基本使用 | https://blog.csdn.net/zhenghuishengq/article/details/133343440 |
【八】深入理解CyclicBarrier底层原理和基本使用 | https://blog.csdn.net/zhenghuishengq/article/details/133378623 |
在学习这个AQS之前,需要先了解synchronized设计思想和底层实现,因为AQS的底层是参考于synchronized,可以看本人的上一篇文章
在上一篇中了解了synchronized的底层原理,synchronized是jvm层面的一把隐式锁,在java层面很难的去控制这把锁,如某些场景需要手动加解锁等,并且在jdk6之前,synchronized还是一把重锁,并没有后来的锁升级过程的一大堆优化,其性能也比较慢,需要来回的从用户态到内核态之间切换,名副其实的一把重量级锁。
为了实现一把java层面的锁,并且提高原来的性能的情况下,在编程大师 Doug Lea 的努力下,通过java代码实现锁的AQS终于问世。AQS主要是参考了synchronized的底层实现,如AQS内部也使用了同步等待队列和条件等待队列等,也是通过cas的方式抢锁,但是在性能上,是超过这个synchronized的,并且支持手动的加锁和解锁,对java开发者相对而言更加友好。即使后面synchronized经过了一系列锁的升级和优化,其性能也不如AQS
AQS:AbstractQueuedSynchronizer ,顾名思义,可以叫做抽象队列同步器,是一个java语言层面的一个抽象类,java层面主要是通过这个AQS的这种方式实现管程的,主要是支持jdk5及以上版本
由于这个AQS是通过java语言实现的,所以直接来分析他的底层源码即可
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {...}
AQS是参考Synchronized的底层来实现的,因此可以直接根据Synchronized的底层思想来看这个AQS。首先Synchronized有两个重要的队列组成,一个是同步等待队列,一个是条件等待队列,那根据这两个队列,看看AQS底层是如何实现的
同步等待队列在synchronized是通过这个cxq队列实现的,其内部采用的是栈结构,先进后出,所以并不能保证锁的公平性。当线程一进来抢锁时,抢到锁的线程继续往下执行,没抢到锁的线程进入这个同步等待队列
在这个AbstractQueuedSynchronizer
抽象类中,该类继承了一个父类AbstractOwnableSynchronizer
,在这个类中,有一个重要的属性exclusiveOwnerThread ,表示当前的独占锁的线程,因为这个队列是一个同步的队列,因此需要通过这个参数来记录是哪个线程抢到了锁。
如在ReentrantLock中,会进行一个尝试获取锁的状态,就会判断当前线程是否拿到了这把锁
if (Thread.currentThread() != getExclusiveOwnerThread())
在这个AbstractQueuedSynchronizer 抽象类中,里面有一个静态内部类Node,里面有如下参数
static final class Node {
...
}
根据注释,很明显可以看出,AQS的同步等待队列的名称叫做CLH同步等待队列,其底层是通过一个双向链表的方式形成的,链表是通过各个Node结点组成,每个结点都有自己的生命周期,前驱指针和后继指针,以及一个结点所处的状态,如能否被唤醒状态等。和synchronized中的cxq队列不同的是,CLH采用的是先进先出的队列结构,cxq采用的是先进后出的栈结构
在该类中,也能看到内部的抢锁逻辑,是通过这个cas比较与交换来实现的,主要是判断当前结点的线程是否为同步监视器中 exclusiveOwnerThread 线程
CLH同步等待队列的大概图像如下,当然底层有着更加复杂的逻辑,下文会继续讲解。大概原理就是会有一个同步监视块(蓝色部分),里面会记录当前获取到锁的线程exclusive和一个state状态,state为0表示队列中的线程可以抢锁,队列是由各个node结点组成的双向链表,每个结点里面会有一个前驱指针和后继指针,以及一个waitStatus的状态,为-1时表示可唤醒状态。
在synchronized中,当拿到锁的线程在调用wait方法之后,就会进入这个waitSet的条件等待队列中,在这个AQS中,也实现了这个条件等待队列的功能。在这个AbstractQueuedSynchronizer类中,有一个内部类ConditionObject类,如下,很明显这个条件等待队列是一个单向链表组成,并且里面也是由各个node结点组成,此时结点的waitState的值为-2.
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
...
}
synchronized主要是通过这个wait,notify和notyfyAll的方式来实现阻塞和唤醒功能,而在这个AQS中,是通过这个await,signal,signalAll 这三个方法实现
public final void await() throws InterruptedException {}
public final void signal() {}
public final void signalAll() {}
await方法的实现主要如下,内部会调用这个LockSupport.park进行阻塞,并且通过调用这个 addConditionWaiter 方法获取结点,可以发现这个结点时存放在链表的尾部的。在调用这个await方法之后,线程会释放资源,线程被sifnalAll唤醒的时候,会调用LockSupport.unpark的方式唤醒
public final void await() throws InterruptedException {
...
Node node = addConditionWaiter();
while (!isOnSyncQueue(node)) {
LockSupport.park(this); //阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
}
CLH同步等待队列和条件等待队列的关系如下,当拿到锁的线程调用await方法之后,此时结点会加入到条件等待队列的队尾里面,当某个结点被唤醒或者被全部唤醒的时候,会将条件等待队列的结点按顺序加入到同步等待队列的队尾,这样就实现了同步等待队列和条件等待队列之间的转换。(橙色部分表示CLH同步等待队列,绿色部分表示条件等待队列)
上面了解了不管是条件等待队列还是同步等待队列,内部都是由node结点组成,接下来重点聊一下这个Node结点
public final class Node{
static final Node SHARED = new Node(); //共享结点
static final Node EXCLUSIVE = null; //独占结点
static final int CANCELLED = 1; //异常状态
static final int SIGNAL = -1; //可以被唤醒的状态
static final int CONDITION = -2; //条件等待状态
static final int PROPAGATE = -3; //传播
volatile int waitStatus; //当前结点的生命状态,对应上面的1,-1,-2,-3
volatile Node prev; //前驱指针
volatile Node next; //后继指针
volatile Thread thread; //当前线程
Node nextWaiter; //下一个waiter
}
根据上面Node中的各个变量可知,Node可以分为共享结点和独占结点,每个结点有一个waitStatus的状态表示,主要有异常状态、可被唤醒状态、条件等待状态和可传播状态,每个结点可以有当前结点的生命状态,同时有一个前驱指针和后继指针,并有一个记录线程id的指针。
在阻塞队列中Node结点的waitStatus的值为-2,在信号量中Node结点的waitStatus值为-3
上面介绍了AQS的底层,接下来就可以分析一下这个耳熟能详的ReentrantLock了,依旧先分析一下他的底层源。这个类实现了Lock接口,在该接口中规范了如何加锁,解锁,尝试获取锁等
public class ReentrantLock implements Lock, java.io.Serializable {}
在这个ReentrantLock内部类中,有一个 Sync 的抽象的静态内部类,并且继承了这个AbstractQueuedSynchronizer 抽象队列同步器类,也就是说Sync已经具有该类的所有特性了。
abstract static class Sync extends AbstractQueuedSynchronizer{}
在这个Sync中类中,有两个静态的子类,分别是FairSync和NonfairSync类,分别代表着公平锁和非公平锁
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
在ReentrantLock类的构造方法中,默认的是一个非公平锁,减少阻塞,其效率相对较高
public ReentrantLock(boolean fair) sync = fair ? new FairSync() : new NonfairSync();
public ReentrantLock() sync = new NonfairSync();
因为ReentrantLock默认采用的是非公平锁的,因此先研究这个 NonfairSync ,如下
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock()
接下来研究这个lock方法,其内部实现如下,会先结果一个cas的比较与交换操作,随后将同步器中的Exclusive的值设置为当前线程的值。同步状态器为0时可以抢锁,为1时不能抢锁,在释放锁时又会设置成0。
final void lock() {
//cas操作,比较与交换
if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread());
else acquire(1);
}
上面方法中的state状态和exclusive独占线程对应的就是下图中蓝色部分的同步监视器。
如果cas抢锁失败,那么就会调用这个acquire()方法,并传入参数1
public final void acquire(int arg) {
if (!tryAcquire(arg) && //尝试加锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //加入队列中
selfInterrupt(); //阻塞
}
尝试获取锁的代码如下,会判断同步状态器的state值以及判断是否为重入锁
//判断是否获取锁
final boolean nonfairTryAcquire(int acquires) {
//获取当前获取锁的线程
final Thread current = Thread.currentThread();
//获取当前同步器状态,被volatile修饰的整型值,默认为0
int c = getState();
//如果同步状态器的值为0,说明外面的线程可以进行加锁操作
if (c == 0) {
//compareAndSetState:原子操作,比较与交换,进行加锁的操作,将state变量将0变为1
//setExclusiveOwnerThread:设置获取锁的的线程拥有者
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果状态同步器不为0,可能由自己持有,也可能由别的线程持有锁
//重复加锁,如定义一个全局锁,出现了这个可重入锁的问题
else if (current == getExclusiveOwnerThread()) {
//重入一次就+1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//别的线程获有锁,直接返回
return false;
}
如果尝试加锁失败,则会进入下一步操作,就是将这个Node加入队列中,即进入这个addWaiter方法中,其底层实现主要如下,首先会判断当前的队列是否为空,如果不为空,则将当前结点的前驱指针指向队列中的尾结点,随后通过cas的方式将当前结点作为尾结点,反之出现并发问题。如果队列中尾结点不存在,那么需要一个初始化队列和一个入队的操作,调用的是enq方法
//线程入队操作
private Node addWaiter(Node mode) {
//获取当前线程的Node结点,此时的mode是一个独占的结点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
//将当前结点的前驱指针指向队列中的尾结点
node.prev = pred;
//将当前结点做为队列的尾结点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//结点入队
enq(node);
return node;
}
enq方法的底层实现如下,如果尾结点为空,就会通过自旋+cas的方式进行一个队列初始化的工作,然后队列不为空,此时大量线程依旧在自旋,则执行一个结点插入到队列的尾结点的操作。通过这个for循环,可以保证当前线程是一定可以入队成功的
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果尾结点为空
if (t == null) { // Must initialize
//给头结点定义一个新的结点,自旋+cas实现,实现队列的初始化
if (compareAndSetHead(new Node()))
//此时头结点和尾结点是同一个结点
tail = head;
} else {
//当前结点的前驱指针指向尾结点
node.prev = t;
//通过比较与交换
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在线程入队之后,就需要堆线程进行阻塞操作,主要是通过acquireQueued 方法实现,其实现的底层逻辑如下,其逻辑和入队的逻辑一样,也是用了for自旋的方式来保证该结点一定会拿到锁。在拿到锁之前,就需要在队列中阻塞以及将当前结点修改成可被唤醒的状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//中断机制
boolean interrupted = false;
for (;;) { //for循环,保证可以一直拿到锁
final Node p = node.predecessor(); //获取前驱结点,就是一直往前取,获取头结点
if (p == head && tryAcquire(arg)) { //判断是不是头结点,尝试获取锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //如果获取锁失败,修改Node标志
parkAndCheckInterrupt()) //阻塞
interrupted = true;
}
} finally {
if (failed) cancelAcquire(node);
}
}
可以再研究一下以下这段代码,在这个shouldParkAfterFailedAcquire 方法中,将这个Node结点的waitStatus的状态修改成-1,即可被唤醒的状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)return true; //可被唤醒状态
if (ws > 0) { //取消状态
do {
node.prev = pred = pred.prev; //将链表移除
} while (pred.waitStatus > 0);
pred.next = node;
}
else compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //设置成可被唤醒状态
return false;
}
在修改完状态之后,又会调用这个 parkAndCheckInterrupt 方法,接下来查看这个方法可以得知,在该方法中会调用 LockSupport.park()
这个方法实现阻塞,并且调用这个interupted实现线程中断
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
在线程被阻塞之后,需要通过调用unlock方法对线程进行解锁操作,接下来看看底层是如何实现的。
public void unlock() {
sync.release(1);
}
接下来继续往下看这个release 方法,首先会进行一个tryRelease尝试释放锁的方法
public final boolean release(int arg) {
if (tryRelease(arg)) { //尝试释放锁
Node h = head;
if (h != null && h.waitStatus != 0) //判断当前链表的头结点是否存在
unparkSuccessor(h);
return true;
}
return false;
}
接下来查看这个tryRelease 的底层实现,如下由于此时已经有线程拿到了锁,因此此时同步监视器中的state状态为1(没拿到为0),因此这个c的值为0,此时会将将同步监视器中的Exclusive的线程设置为null,如下图看图理解好一点,这样的话就是一步释放锁的操作,此时state状态为0,其他线程也可以来抢锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 0 = 1-1
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null); //将exclusive的的线程清空
}
setState(c); //修改同步监视器状态,将状态0返回
return free;
}
接下来进入release方法中的这个unparkSuccessor 方法中,查看其底层逻辑如下:首先会将当前结点的状态从-1可被唤醒状态改成0默认状态,由于队列中的线程都被阻塞,因此需要唤醒线程来抢锁,所以将当前结点的下一个结点给唤醒,通过调用:LockSupport.unpark 方法实现线程的唤醒
private void unparkSuccessor(Node node) {
int ws = node.waitStatus; //获取当前释放锁的node结点状态
if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //将状态从可被唤醒状态修改成默认状态
Node s = node.next; //获取下一个要加锁的结点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) LockSupport.unpark(s.thread); //唤醒下一个结点
}
在3.1.4的acquireQueued方法中,有一个 setHead() 方法,就是会将当前当前结点作为头结点,当前结点的前驱指针指向null,这样就可以实现上一个结点出队的逻辑
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
fairSync表示的是公平锁,其底层逻辑实现和非公平锁的步骤几乎一模一样,就是在入队之前,非公平锁可以直接进行一次cas抢锁的机会,但是公平锁需要判断队列中是否有数据,有就需要排队的操作。因此在这不做过多的解释,理解了非公平锁自然就能理解公平锁的底层实现了。
ReentrantLock是Lock的一个实现类,在该类内部定义了一个继承AQS的Sync内部类,该类有两个子类,分别是实现非公平锁的NonfairSync和实现公平锁的FairSync,默认使用的是非公平锁。以非公平锁为例,如下:
调用lock方法进行加锁: 首先会进行一个cas的抢锁逻辑,如果抢锁失败,则会加入到CLH同步等待队列,加入队列之前,会判断该队列是否已经创建,如果没有创建则创建,该队列是由Node结点组成的双向链表,如果队列已经创建,则将该线程当做一个Node结点对象加入到队列的尾部,此时通过调用LockSupport.park的方法将Node结点阻塞,并修改状态为-1,即可被唤醒状态。
调用unlock方法进行解锁: 内部首先会先释放同步监视器资源,并修改state的状态为0,这样可以保证锁没有被抢占,其次是将当前结点的下一个结点通过调用LockSupport.unpark的方式唤醒,这样就会有线程去获取锁,最后就是将被释放锁的结点移除,下一个获取锁的结点作为头结点。
通过上面的源码可知,ReentrantLock具有一下特性
如有转载,请标明出处:https://zhenghuisheng.blog.csdn.net/article/details/132857564