面试思考:
重入的概念是什么,实现原理是什么?
公平锁和非公平锁区别是什么,体现在哪些地方?
没有竞争到锁的线程怎么实现等待,释放锁时又是怎么被唤醒的?
我们先用reentrantLock改造一下synchronized那个程序来达到相同效果体验一下lock:
- package com.example.demo.service;
- import lombok.SneakyThrows;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
- public class ReenterLockDemo {
- static int flag = 0;
- public static void main(String[] args) {
- ReentrantLock lock = new ReentrantLock();
- Condition condition = lock.newCondition();
- new Thread("线程1") {
- @SneakyThrows
- public void run() {
- while(flag<10) {
- lock.lock();
- try {
- System.out.println(Thread.currentThread().getName()+"打印:"+(++flag));
- condition.signal();
- condition.await();
- }finally {
- lock.unlock();
- }
- }
- };
- }.start();
- new Thread("线程2") {
- @SneakyThrows
- public void run() {
- while(flag<10) {
- lock.lock();
- try {
- System.out.println(Thread.currentThread().getName()+"打印:"+(++flag));
- condition.signal();
- condition.await();
- }finally {
- lock.unlock();
- }
- }
- };
- }.start();
- }
- }
-
发现没有是不是跟synchronized代码结构差不多,只是多了一个lock和unlock的过程。我们先画个图来对比一下synchronized和lock的数据结构:
这就是学技术特别有趣的地方,你看多了感觉很多高大上的技术底层设计实际都差不多。下面我们具体看下reentrantLock加锁过程:
- publicReentrantLock(){
- sync =newNonfairSync();
- }
ReentrantLock默认是非公平锁。
- final voidlock(){
- //先通过cas把state从0设置成1
- if(compareAndSetState(0,1))
- //如果设置成功说明当前线程获取了锁,把锁拥有者设置成当前线程
- setExclusiveOwnerThread(Thread.currentThread());
- else
- 设置失败则走这个方法
- acquire(1);
- }
-
- public final voidacquire(int arg){
- //这里if使用&&执行了两个方法,当两个条件都true时会执行selfInterrupt();,也就是中断当前线程,这里可以猜测一下:这两个方法应该就是获取锁或者进入等待队列,当都失败时说明加锁过程就失败了,直接中断线程结束流程。
- if(!tryAcquire(arg)&&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
-
- final booleannonfairTryAcquire(int acquires){
- //获取当前线程
- final Thread current = Thread.currentThread();
- //获取state的值,state是被volatile修饰,对所有线程都可见
- int c =getState();
- //当state值为0时,说明此时持有锁的线程已经释放锁,此时就可以通过cas重新竞争锁
- if(c ==0){
- if(compareAndSetState(0, acquires)){
- setExclusiveOwnerThread(current);
- returntrue;
- }
- }
- //当state值不为0时,判断当前线程是否是锁持有线程,是则把state设置成原值+acquires,获取锁成功
- elseif(current ==getExclusiveOwnerThread()){
- int nextc = c + acquires;
- if(nextc <0)// overflow
- thrownewError("Maximum lock count exceeded");
- setState(nextc);
- returntrue;
- }
- //否则获取锁失败
- returnfalse;
- }
nonfairTryAcquire方法主要是想看看已经获得锁的线程是不是当前线程,是的话可以再次获取到锁,体现了可重入的特性。不是的话就需要执行acquireQueued方法
- private Node addWaiter(Node mode) {
- //把当前线程封装成node,nextWaiter=EXCLUSIVE= null
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- //判断当前AQS的tail是否是null,刚开始肯定是null,但当已经有一个node在AQS中时就不为null,不为null时把node设置为tail,pred的next指针指向node,node的prev指针指向pred
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- //tail为null时执行
- enq(node);
- return node;
- }
-
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- 当tail为null时使head=tail=new Node()
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- 不为null时把node设置为tail,pred的next指针指向node,node的prev指针指向pred,并跳出死循环
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
addWaiter方法实际就是初始化了AQS中的双向链表,head和tail开始指向的是一个没有线程的空node,之后把封装了线程的EXCLUSIVE node放入双向链表中,tail指向EXCLUSIVE node,head指向没有线程的空node,类似于下图:
当封装好node并进入AQS的双向链表之后进入acquireQueued方法:
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- 获取当前线程的前一个节点
- final Node p = node.predecessor();
- //如果前一个是head则会再执行nonfairTryAcquire方法进行获取锁,如果获取成功
- if (p == head && tryAcquire(arg)) {
- 把当前线程的node设置成head,thead=null,prev=null
- setHead(node);
- 把之前的head从双向链表中断开引用,让gc回收
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- //如果前一个节点不是head或者获取锁失败则执行if中的方法
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
-
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- //获取当前线程前一个node的waitStatus值,waitStatus是volatile修饰的int,所以初始化值是0
- int ws = pred.waitStatus;
- 当waitStatus是-1时,说明当前node已经给pred node设置过waitStatus了,直接返回true
- if (ws == Node.SIGNAL)
- return true;
- 当waitStatus大于0时,实际上就是1,1代表CANCELLED,就是已经终止了的node,所以用while直接跳过所有终止的node,并返回false
- if (ws > 0) {
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- 如果即不等于-1也不是1,就把pred设置成-1,-1实际上就是代表当前node需要park,然后返回false
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
-
shouldParkAfterFailedAcquire方法总结一下就是把当前node的上一个node的waitStatus设置成-1,但waitStatus前值不能是大于0的,因为大于0代表node已经终止了。在设置成功后并不会执行parkAndCheckInterrupt方法,而是再次进入了一次循环尝试获取锁,再没有成功的话就执行parkAndCheckInterrupt进行park,然后for循环就卡住,整个流程如下:
可以看出java的lock实际就是借助LockSupport.park(this);方法来实现,没有获取到锁的线程在for循环中park住,等待持有锁的线程来unpark唤醒他们。那么接下来看看unlock方法是怎么唤醒park线程的。
- public final boolean release(int arg) {
- 先执行tryRelease方法,tryRelease方法返回true继续执行if逻辑
- if (tryRelease(arg)) {
- Node h = head;
- head不为null并且waitStatus不为0时会执行unparkSuccessor方法,我们知道waitStatus初始值是0,并且在shouldParkAfterFailedAcquire方法时把waitStatus设置为-1
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
-
- protected final boolean tryRelease(int releases) {
- 当前state减去releases值获取c
- int c = getState() - releases;
- 当前线程不是拥有锁的线程则报错
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- boolean free = false;
- 如果为0则说明没有线程持有锁,则把拥有锁的线程设置成null
- if (c == 0) {
- free = true;
- setExclusiveOwnerThread(null);
- }
- 把status设置成c
- setState(c);
- 返回是否没有持有锁的线程
- return free;
- }
-
tryRelease方法就是在处理state变量的值,我们每次加锁时都会增加state的值,所以在释放锁时就减小state的值,直到回到原始值0,说明已经没有线程持有锁了。当没有线程持有锁时就会尝试唤醒等待线程
- private void unparkSuccessor(Node node) {
- int ws = node.waitStatus;
- 先拿到waitStatus,如果小于0,则把他设置成初始值0
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- Node s = node.next;
- 获取node的下一个node,如果s是null或则waitStatus大于0.则用for循环从tail往前找waitStatus小于等于0的node,直到找到最前面一个waitStatus小于等于0的node
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- 如果找到了则唤醒node中的线程,此时就会进入acquireQueued方法中的for循环再次尝试获取锁
- if (s != null)
- LockSupport.unpark(s.thread);
- }
-
现在这个非公平锁上锁和释放锁的过程都比较清晰了,我们来总结一下:lock时会先尝试把state从0设置成1,成功则获取锁成功,失败则会判断持有锁的线程是不是当前线程,是则获取锁成功,不是则会被封装成EXCLUSIVE的node节点放入AQS的双向链表等待队列中,然后再一次判断自己是不是第一个等待线程,是的话会再次尝试获取锁,如果还是失败则park,等待持有锁的线程释放锁之后unpark唤醒。
如果head不是空节点,在非公平锁时p == head && tryAcquire(arg)这个判断就有问题,因为即使当前节点的前一个节点是head,那也不能去获取锁,因为此时head是有线程的,head线程可能都没执行完或自己都是park状态,这时候获取锁一点意义没有。
如果head不是空节点,那么在unlock时就不能用head去唤醒下一个node,此时要有更复杂的逻辑去维护各个node 状态,而且在线程被唤醒后还要维护waitStatus值,这就加大了代码复杂性
在lock时非公平锁会直接尝试cas改变state来获取锁,而不是执行tryAcquire方法
在执行tryAcquire方法时非公平锁在state是0时会直接尝试cas改变state来获取锁,不用判断AQS中有没有线程在等待
在执行acquireQueued方法执行tryAcquire方法也会进行一次2中的过程