• reentrantLock


    面试思考:

    1. 重入的概念是什么,实现原理是什么?

    2. 公平锁和非公平锁区别是什么,体现在哪些地方?

    3. 没有竞争到锁的线程怎么实现等待,释放锁时又是怎么被唤醒的?

    我们先用reentrantLock改造一下synchronized那个程序来达到相同效果体验一下lock:

    1. package com.example.demo.service;
    2. import lombok.SneakyThrows;
    3. import java.util.concurrent.locks.Condition;
    4. import java.util.concurrent.locks.ReentrantLock;
    5. public class ReenterLockDemo {
    6. static int flag = 0;
    7. public static void main(String[] args) {
    8. ReentrantLock lock = new ReentrantLock();
    9. Condition condition = lock.newCondition();
    10. new Thread("线程1") {
    11. @SneakyThrows
    12. public void run() {
    13. while(flag<10) {
    14. lock.lock();
    15. try {
    16. System.out.println(Thread.currentThread().getName()+"打印:"+(++flag));
    17. condition.signal();
    18. condition.await();
    19. }finally {
    20. lock.unlock();
    21. }
    22. }
    23. };
    24. }.start();
    25. new Thread("线程2") {
    26. @SneakyThrows
    27. public void run() {
    28. while(flag<10) {
    29. lock.lock();
    30. try {
    31. System.out.println(Thread.currentThread().getName()+"打印:"+(++flag));
    32. condition.signal();
    33. condition.await();
    34. }finally {
    35. lock.unlock();
    36. }
    37. }
    38. };
    39. }.start();
    40. }
    41. }

    发现没有是不是跟synchronized代码结构差不多,只是多了一个lock和unlock的过程。我们先画个图来对比一下synchronized和lock的数据结构:

    这就是学技术特别有趣的地方,你看多了感觉很多高大上的技术底层设计实际都差不多。下面我们具体看下reentrantLock加锁过程:

    1. publicReentrantLock(){
    2. sync =newNonfairSync();
    3. }

    ReentrantLock默认是非公平锁。

    lock方法

    1. final voidlock(){
    2. //先通过cas把state从0设置成1
    3. if(compareAndSetState(0,1))
    4. //如果设置成功说明当前线程获取了锁,把锁拥有者设置成当前线程
    5. setExclusiveOwnerThread(Thread.currentThread());
    6. else
    7. 设置失败则走这个方法
    8. acquire(1);
    9. }

    acquire(1);方法

    1. public final voidacquire(int arg){
    2. //这里if使用&&执行了两个方法,当两个条件都true时会执行selfInterrupt();,也就是中断当前线程,这里可以猜测一下:这两个方法应该就是获取锁或者进入等待队列,当都失败时说明加锁过程就失败了,直接中断线程结束流程。
    3. if(!tryAcquire(arg)&&
    4. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    5. selfInterrupt();
    6. }

    nonfairTryAcquire(arg)方法

    1. final booleannonfairTryAcquire(int acquires){
    2. //获取当前线程
    3. final Thread current = Thread.currentThread();
    4. //获取state的值,state是被volatile修饰,对所有线程都可见
    5. int c =getState();
    6. //当state值为0时,说明此时持有锁的线程已经释放锁,此时就可以通过cas重新竞争锁
    7. if(c ==0){
    8. if(compareAndSetState(0, acquires)){
    9. setExclusiveOwnerThread(current);
    10. returntrue;
    11. }
    12. }
    13. //当state值不为0时,判断当前线程是否是锁持有线程,是则把state设置成原值+acquires,获取锁成功
    14. elseif(current ==getExclusiveOwnerThread()){
    15. int nextc = c + acquires;
    16. if(nextc <0)// overflow
    17. thrownewError("Maximum lock count exceeded");
    18. setState(nextc);
    19. returntrue;
    20. }
    21. //否则获取锁失败
    22. returnfalse;
    23. }

    nonfairTryAcquire方法主要是想看看已经获得锁的线程是不是当前线程,是的话可以再次获取到锁,体现了可重入的特性。不是的话就需要执行acquireQueued方法

    acquireQueued方法

    addWaiter(Node mode)方法

    1. private Node addWaiter(Node mode) {
    2. //把当前线程封装成node,nextWaiter=EXCLUSIVE= null
    3. Node node = new Node(Thread.currentThread(), mode);
    4. // Try the fast path of enq; backup to full enq on failure
    5. Node pred = tail;
    6. //判断当前AQS的tail是否是null,刚开始肯定是null,但当已经有一个node在AQS中时就不为null,不为null时把node设置为tail,pred的next指针指向node,node的prev指针指向pred
    7. if (pred != null) {
    8. node.prev = pred;
    9. if (compareAndSetTail(pred, node)) {
    10. pred.next = node;
    11. return node;
    12. }
    13. }
    14. //tail为null时执行
    15. enq(node);
    16. return node;
    17. }

    enq(node);方法

    1. private Node enq(final Node node) {
    2. for (;;) {
    3. Node t = tail;
    4. 当tail为null时使head=tail=new Node()
    5. if (t == null) { // Must initialize
    6. if (compareAndSetHead(new Node()))
    7. tail = head;
    8. } else {
    9. 不为null时把node设置为tail,pred的next指针指向node,node的prev指针指向pred,并跳出死循环
    10. node.prev = t;
    11. if (compareAndSetTail(t, node)) {
    12. t.next = node;
    13. return t;
    14. }
    15. }
    16. }
    17. }

    addWaiter方法实际就是初始化了AQS中的双向链表,head和tail开始指向的是一个没有线程的空node,之后把封装了线程的EXCLUSIVE node放入双向链表中,tail指向EXCLUSIVE node,head指向没有线程的空node,类似于下图:

    当封装好node并进入AQS的双向链表之后进入acquireQueued方法:

    acquireQueued方法

    1. final boolean acquireQueued(final Node node, int arg) {
    2. boolean failed = true;
    3. try {
    4. boolean interrupted = false;
    5. for (;;) {
    6. 获取当前线程的前一个节点
    7. final Node p = node.predecessor();
    8. //如果前一个是head则会再执行nonfairTryAcquire方法进行获取锁,如果获取成功
    9. if (p == head && tryAcquire(arg)) {
    10. 把当前线程的node设置成head,thead=null,prev=null
    11. setHead(node);
    12. 把之前的head从双向链表中断开引用,让gc回收
    13. p.next = null; // help GC
    14. failed = false;
    15. return interrupted;
    16. }
    17. //如果前一个节点不是head或者获取锁失败则执行if中的方法
    18. if (shouldParkAfterFailedAcquire(p, node) &&
    19. parkAndCheckInterrupt())
    20. interrupted = true;
    21. }
    22. } finally {
    23. if (failed)
    24. cancelAcquire(node);
    25. }
    26. }

    shouldParkAfterFailedAcquire(p, node) 方法

    1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    2. //获取当前线程前一个node的waitStatus值,waitStatus是volatile修饰的int,所以初始化值是0
    3. int ws = pred.waitStatus;
    4. 当waitStatus是-1时,说明当前node已经给pred node设置过waitStatus了,直接返回true
    5. if (ws == Node.SIGNAL)
    6. return true;
    7. 当waitStatus大于0时,实际上就是11代表CANCELLED,就是已经终止了的node,所以用while直接跳过所有终止的node,并返回false
    8. if (ws > 0) {
    9. do {
    10. node.prev = pred = pred.prev;
    11. } while (pred.waitStatus > 0);
    12. pred.next = node;
    13. } else {
    14. 如果即不等于-1也不是1,就把pred设置成-1,-1实际上就是代表当前node需要park,然后返回false
    15. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    16. }
    17. return false;
    18. }

    shouldParkAfterFailedAcquire方法总结一下就是把当前node的上一个node的waitStatus设置成-1,但waitStatus前值不能是大于0的,因为大于0代表node已经终止了。在设置成功后并不会执行parkAndCheckInterrupt方法,而是再次进入了一次循环尝试获取锁,再没有成功的话就执行parkAndCheckInterrupt进行park,然后for循环就卡住,整个流程如下:

    可以看出java的lock实际就是借助LockSupport.park(this);方法来实现,没有获取到锁的线程在for循环中park住,等待持有锁的线程来unpark唤醒他们。那么接下来看看unlock方法是怎么唤醒park线程的。

    release方法

    1. public final boolean release(int arg) {
    2. 先执行tryRelease方法,tryRelease方法返回true继续执行if逻辑
    3. if (tryRelease(arg)) {
    4. Node h = head;
    5. head不为null并且waitStatus不为0时会执行unparkSuccessor方法,我们知道waitStatus初始值是0,并且在shouldParkAfterFailedAcquire方法时把waitStatus设置为-1
    6. if (h != null && h.waitStatus != 0)
    7. unparkSuccessor(h);
    8. return true;
    9. }
    10. return false;
    11. }

    tryRelease(arg)方法

    1. protected final boolean tryRelease(int releases) {
    2. 当前state减去releases值获取c
    3. int c = getState() - releases;
    4. 当前线程不是拥有锁的线程则报错
    5. if (Thread.currentThread() != getExclusiveOwnerThread())
    6. throw new IllegalMonitorStateException();
    7. boolean free = false;
    8. 如果为0则说明没有线程持有锁,则把拥有锁的线程设置成null
    9. if (c == 0) {
    10. free = true;
    11. setExclusiveOwnerThread(null);
    12. }
    13. status设置成c
    14. setState(c);
    15. 返回是否没有持有锁的线程
    16. return free;
    17. }

    tryRelease方法就是在处理state变量的值,我们每次加锁时都会增加state的值,所以在释放锁时就减小state的值,直到回到原始值0,说明已经没有线程持有锁了。当没有线程持有锁时就会尝试唤醒等待线程

    unparkSuccessor(h);方法

    1. private void unparkSuccessor(Node node) {
    2. int ws = node.waitStatus;
    3. 先拿到waitStatus,如果小于0,则把他设置成初始值0
    4. if (ws < 0)
    5. compareAndSetWaitStatus(node, ws, 0);
    6. Node s = node.next;
    7. 获取node的下一个node,如果s是null或则waitStatus大于0.则用for循环从tail往前找waitStatus小于等于0的node,直到找到最前面一个waitStatus小于等于0的node
    8. if (s == null || s.waitStatus > 0) {
    9. s = null;
    10. for (Node t = tail; t != null && t != node; t = t.prev)
    11. if (t.waitStatus <= 0)
    12. s = t;
    13. }
    14. 如果找到了则唤醒node中的线程,此时就会进入acquireQueued方法中的for循环再次尝试获取锁
    15. if (s != null)
    16. LockSupport.unpark(s.thread);
    17. }

    现在这个非公平锁上锁和释放锁的过程都比较清晰了,我们来总结一下:lock时会先尝试把state从0设置成1,成功则获取锁成功,失败则会判断持有锁的线程是不是当前线程,是则获取锁成功,不是则会被封装成EXCLUSIVE的node节点放入AQS的双向链表等待队列中,然后再一次判断自己是不是第一个等待线程,是的话会再次尝试获取锁,如果还是失败则park,等待持有锁的线程释放锁之后unpark唤醒。

    为什么head节点是一个空节点呢

    1. 如果head不是空节点,在非公平锁时p == head && tryAcquire(arg)这个判断就有问题,因为即使当前节点的前一个节点是head,那也不能去获取锁,因为此时head是有线程的,head线程可能都没执行完或自己都是park状态,这时候获取锁一点意义没有。

    2. 如果head不是空节点,那么在unlock时就不能用head去唤醒下一个node,此时要有更复杂的逻辑去维护各个node 状态,而且在线程被唤醒后还要维护waitStatus值,这就加大了代码复杂性

    非公平锁相对于公平锁来说非公平体现在哪里

    1. 在lock时非公平锁会直接尝试cas改变state来获取锁,而不是执行tryAcquire方法

    2. 在执行tryAcquire方法时非公平锁在state是0时会直接尝试cas改变state来获取锁,不用判断AQS中有没有线程在等待

    3. 在执行acquireQueued方法执行tryAcquire方法也会进行一次2中的过程

    公众号同名,欢迎关注

  • 相关阅读:
    RabbitMQ-全面详解(学习总结---从入门到深化)
    Failed to launch task: 文件”Setup”不存在 Mac安装Adobe软件报错解决方案
    前端 vue 项目屏蔽右键
    pyecharts画图结果存为图片
    计算机组成原理 | 第一章
    Git基本应用<二>:Git的分支管理
    go | 切片的长度和容量
    Qt树形表格控件QTreeWidget的使用:添加自定义列表项
    MacOS下idea中Maven配置以及Maven替换国内阿里云镜像源
    基于python+django的个性化电影推荐系统设计与实现
  • 原文地址:https://blog.csdn.net/shidebin/article/details/126819695