• AQS(AbstractQueuedSynchronizer)框架之ReentrantLock


    park 与 unPark 使用

    ReentrantLock的实现使用的就是 park + 自旋的方式 ,下面举个例子来了解下 park 和 unpark 方法

    1. public static void main(String[] args) throws InterruptedException {
    2. log.debug("1");
    3. Thread t1 = new Thread(()->{
    4. log.debug("2");
    5. LockSupport.park();//让当前线程阻塞
    6. log.debug("4");
    7. });
    8. //告诉cpu t1 当前可调度;具体什么时候调度是由操作系统决定的
    9. t1.start();
    10. //让主线程睡眠2
    11. TimeUnit.SECONDS.sleep(2);
    12. log.debug("3");
    13. LockSupport.unpark(t1);//唤醒t1线程
    14. }

    打印结果

    手写原始方式实现锁

    锁,其实就是一个标识,当这个标识改变成了某个状态我们理解为获得锁

    1. public class CustomerLock {
    2. // status = 1; 不是原子性的,这是一条java代码 编译后会分为三条指令 gets=0 set1cache=1 set2
    3. // 执行set1时 只是在当前线程操作 将status改为1 此时只是在内存中操作,还未同步到主存当中。
    4. // 就是说线程t1修改status值还未同步写入主存时 t2也执行了相同操作,此时t2拿到的status还是0
    5. volatile int status = 0;
    6. //实例化这个类 ①为了调用cas方法 ②获取status变量的偏移量
    7. private static Unsafe unsafe = null;
    8. //CustomerLock当中status变量的内存偏移量
    9. private static long statusOffset;
    10. //获取 Unsafe对象
    11. static {
    12. Field singleOneInstanceField = null;
    13. try{
    14. singleOneInstanceField = Unsafe.class.getDeclaredField("theUnsafe");
    15. singleOneInstanceField.setAccessible(true);
    16. unsafe = (Unsafe) singleOneInstanceField.get(null);
    17. statusOffset = unsafe.objectFieldOffset(
    18. com.zld.cloud.CustomerLock.class.getDeclaredField("status"));
    19. }catch (Exception e){
    20. e.printStackTrace();
    21. }
    22. }
    23. void lock() throws InterruptedException{
    24. while (!compareAndSet(0,1)){
    25. TimeUnit.SECONDS.sleep(5);
    26. }
    27. }
    28. void unLock(){
    29. status = 0;
    30. }
    31. private boolean compareAndSet(int oldVal, int newVal) {
    32. return unsafe.compareAndSwapInt(this,statusOffset,0,1);
    33. }
    34. }
    1. public static void main(String[] args) throws InterruptedException {
    2. CustomerLock customerLock = new CustomerLock();
    3. Thread t1 = new Thread(()->{
    4. try {
    5. customerLock.lock();
    6. } catch (InterruptedException e) {
    7. throw new RuntimeException(e);
    8. }
    9. log.debug("1");
    10. log.debug("1");
    11. log.debug("1");
    12. log.debug("1");
    13. log.debug("1");
    14. customerLock.unLock();
    15. },"t1");
    16. Thread t2 = new Thread(()->{
    17. try {
    18. customerLock.lock();
    19. } catch (InterruptedException e) {
    20. throw new RuntimeException(e);
    21. }
    22. log.debug("2");
    23. log.debug("2");
    24. log.debug("2");
    25. log.debug("2");
    26. log.debug("2");
    27. customerLock.unLock();
    28. },"t2");
    29. t1.start();
    30. t2.start();
    31. }

    测试结果截图

    ReentrantLock 源码分析

    1. public class ReentrantLock implements Lock, java.io.Serializable {
    2. private static final long serialVersionUID = 7373984872572414699L;
    3. /** Synchronizer providing all implementation mechanics */
    4. private final Sync sync;
    5. ...
    6. }

    ReentrantLock 中有个 抽象类Sync,这个类有两个实现 FairSync 公平锁 和 NonfairSync 非公平锁

    1. abstract static class Sync extends AbstractQueuedSynchronizer {
    2. private static final long serialVersionUID = -5179523762034025860L;
    3. /**
    4. * Performs {@link Lock#lock}. The main reason for subclassing
    5. * is to allow fast path for nonfair version.
    6. */
    7. abstract void lock();
    8. /**
    9. * Performs non-fair tryLock. tryAcquire is implemented in
    10. * subclasses, but both need nonfair try for trylock method.
    11. */
    12. final boolean nonfairTryAcquire(int acquires) {
    13. final Thread current = Thread.currentThread();
    14. int c = getState();
    15. if (c == 0) {
    16. if (compareAndSetState(0, acquires)) {
    17. setExclusiveOwnerThread(current);
    18. return true;
    19. }
    20. }
    21. else if (current == getExclusiveOwnerThread()) {
    22. int nextc = c + acquires;
    23. if (nextc < 0) // overflow
    24. throw new Error("Maximum lock count exceeded");
    25. setState(nextc);
    26. return true;
    27. }
    28. return false;
    29. }
    30. protected final boolean tryRelease(int releases) {
    31. int c = getState() - releases;
    32. if (Thread.currentThread() != getExclusiveOwnerThread())
    33. throw new IllegalMonitorStateException();
    34. boolean free = false;
    35. if (c == 0) {
    36. free = true;
    37. setExclusiveOwnerThread(null);
    38. }
    39. setState(c);
    40. return free;
    41. }
    42. protected final boolean isHeldExclusively() {
    43. // While we must in general read state before owner,
    44. // we don't need to do so to check if current thread is owner
    45. return getExclusiveOwnerThread() == Thread.currentThread();
    46. }
    47. final ConditionObject newCondition() {
    48. return new ConditionObject();
    49. }
    50. // Methods relayed from outer class
    51. final Thread getOwner() {
    52. return getState() == 0 ? null : getExclusiveOwnerThread();
    53. }
    54. final int getHoldCount() {
    55. return isHeldExclusively() ? getState() : 0;
    56. }
    57. final boolean isLocked() {
    58. return getState() != 0;
    59. }
    60. /**
    61. * Reconstitutes the instance from a stream (that is, deserializes it).
    62. */
    63. private void readObject(java.io.ObjectInputStream s)
    64. throws java.io.IOException, ClassNotFoundException {
    65. s.defaultReadObject();
    66. setState(0); // reset to unlocked state
    67. }
    68. }

    ReentrantLock 是可重入锁,当state > 0 说明锁被持有

    公平锁的实现

    1. static final class FairSync extends Sync {
    2. private static final long serialVersionUID = -3000897897090466540L;
    3. final void lock() {
    4. acquire(1); //标识加锁成功后需要改变的状态
    5. }
    6. }

    aquire方法

    1. public final void acquire(int arg) {
    2. //尝试加锁 如果加锁失败则 调用 acquireQueued 方法加入队列排队
    3. //加入队列后会立刻park,直到解锁调用unpark醒来判断自己是否被打断
    4. if (!tryAcquire(arg) &&
    5. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    6. selfInterrupt();
    7. }

    tryAcquire 方法

    1. protected final boolean tryAcquire(int acquires) {
    2. final Thread current = Thread.currentThread();//拿到当前线程
    3. int c = getState();//获取lock对象的锁状态 0为自由状态 1为上锁 大于1表示重入
    4. if (c == 0) { // 如果锁是自由状态
    5. //判断当前是否需要排队,如果不需要则进行cas操作尝试加锁
    6. if (!hasQueuedPredecessors() &&
    7. compareAndSetState(0, acquires)) {
    8. setExclusiveOwnerThread(current);//如果加锁成功,将当前线程设置为拥有锁的线程
    9. return true;
    10. }
    11. }
    12. //锁不是自由状态 但当前线程是持锁线程 表示重入则 状态值 +1
    13. else if (current == getExclusiveOwnerThread()) {
    14. int nextc = c + acquires;
    15. if (nextc < 0)
    16. throw new Error("Maximum lock count exceeded");
    17. setState(nextc);
    18. return true;
    19. }
    20. return false;
    21. }

    判断是否需要排队

    1. public final boolean hasQueuedPredecessors() {
    2. Node t = tail;
    3. Node h = head;
    4. Node s;
    5. return h != t &&
    6. ((s = h.next) == null || s.thread != Thread.currentThread());
    7. }

    这里会有不需要排队的两种情况

    • 队列没有初始化不需要排队,直接去加锁但可能会失败
      • 假设 t1 t2同时lock,t1 t2均查询到此时队列没有初始化认为不需要排队,同时进行cas修改操作,原子性操作必定有一个会失败,失败的将去排队
    • 队列已初始化,队列中的node大于1,假设tc加锁时发现队列中第一个就是自身,例如重入
      • 队列中第一个排队的线程进入队列后会尝试获取锁,如果此时持锁线程释放了锁则直接获取锁执行,如果没有释放那就排队 这里的判断由两部分组成
    • 队首不等于队尾三种情况
      • 队列没有初始化 直接上锁
      • 队列已经初始化了 假设h!=t成立(队首不等于队尾的情况一定是队列node个数大于1的情况,队列中只有一个node时不算排队 head节点不参与排队,它要么时持锁node,要么时虚拟头节点),队列中线程大于1个 s==null不成立 那么判断 s.thread != Thread.currentThread() 分两种情况,此时head持锁线程正在做事情,不知道有没有做完
        • 当前参与竞争锁的线程不是第一个排队线程,返回true需要排队
        • 当前参与竞争锁的线程就是第一个排队的线程,返回false不需排队,尝试加锁
          • 尝试加锁时,head节点 也就是持锁线程释放锁 则 加锁成功 返回最顶层aquire成功
          • 尝试加锁失败,head节点还没有释放锁
    • 队列已初始化但是里面只有一个数据
      • 队列初始化时会虚拟一个头节点h,AQS默认 h 是不参与排队的,当队列中的第一个排队node得到锁会将自己设置为h 头节点,将自身线程设置为持锁线程。因此队列已初始化但只有一个节点并且这个节点就是持锁线程 那么h!=t 就不成立直接返回false不需要排队。

    acquireQueued(addWaiter(Node.exclusive),arg))方法解析,两种情况

    1. public final void acquire(int arg) {
    2. if (!tryAcquire(arg) &&
    3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4. selfInterrupt();
    5. }
    1. - first节点持有了锁(将自己改成head)未释放,curr节点(此时是first)尝试加锁失败仍需要排队
    2. - 其他线程抢占了锁,队列中有节点排队,curr(不是first,它前面还有其他node)跟着排队

    入队

    1. private Node addWaiter(Node mode) {
    2. //将当前线程封装成node对象
    3. Node node = new Node(Thread.currentThread(), mode);
    4. //将尾节点赋值给pred
    5. Node pred = tail;
    6. if (pred != null) { //如果队尾不为null 说明队列已初始化
    7. node.prev = pred;//设置当前节点的上个节点是原队列的尾节点
    8. if (compareAndSetTail(pred, node)) {//cas操作防止多线程加锁,确保当前节点入队时的原子操作
    9. pred.next = node;//将当前线程node设置为原队列尾节点的下一个节点
    10. return node;//返回当前线程节点对象
    11. }
    12. }
    13. enq(node);//队列还没初始化
    14. return node;//返回当前线程节点对象
    15. }

    初始化队列

    1. private Node enq(final Node node) {
    2. //死循环
    3. for (;;) {
    4. Node t = tail;//每次循环队尾节点赋值给t
    5. if (t == null) { //第一次循环时队尾肯定为null
    6. //调用无参构造方法实例化node对象 属性都为null
    7. if (compareAndSetHead(new Node()))
    8. //此时队列中只有一个新的虚拟node 使队列(链表)成立设置头尾相等
    9. tail = head;
    10. } else {//第二次循环t一定不为null
    11. node.prev = t; // 将当前线程节点对象 放到尾节点(第二次循环尾节点就是头节点)之后
    12. if (compareAndSetTail(t, node)) {//将当前线程对象node入队 并设置为队尾
    13. t.next = node;//维护好链表设置头节 原来队尾下一个节点尾当前线程节点
    14. return t;//返回队尾节点,即终止循环
    15. }
    16. }
    17. }
    18. }

    acquireQueued方法的源码分析

    1. final boolean acquireQueued(final Node node, int arg) {
    2. boolean failed = true;
    3. try {
    4. boolean interrupted = false;
    5. for (;;) {//死循环,走进这个方法node已经完成了入队
    6. //获得当前线程node的上个node有两种情况,1、p为head节点。2、p不为head
    7. final Node p = node.predecessor();
    8. //如果p是head,那么当前线程对象node 为first, 尝试加锁,其实就是想看下head节点释放锁了没
    9. if (p == head && tryAcquire(arg)) {
    10. setHead(node);//原head释放了锁,加锁成功将自身设置为head
    11. p.next = null; //原head的下一节点置空 因为当前node成为head
    12. failed = false;//标识 当前node加锁成功时为false
    13. return interrupted;//返回默认值
    14. }
    15. //这里分两种情况 1、上个节点不是head 2、上个节点是head但未释放锁 修改上个节点状态park
    16. if (shouldParkAfterFailedAcquire(p, node) &&
    17. parkAndCheckInterrupt())//自身park
    18. interrupted = true;
    19. }
    20. } finally {
    21. if (failed)
    22. cancelAcquire(node);
    23. }
    24. }

    Synchronized 1.6 之前对任何情况的处理相差不多,效率比较低下 ---重量级锁 并发编程之父Doug Lea 写了ReentrantLock

    ReentrantLock的四种情况

    ReentrantLock的性能比 synchronized 高

    • 从头到尾第一个线程t0获取锁的时候代价基本为0
      ReentrantLock加锁最大的开销在于 compareAndSetState(0,acquires) cas操作指令,cpu底层提供的硬件指令,跟语言无关
      • synchronize 只有一个线程时是偏向锁
    • 多线程之间顺序执行(t0执行完,t1执行)没有竞争执行,代价几乎为0
      • synchronize 此时升级为轻量级锁
    • 如果有线程竞争执行
      • 直到第二个线程自旋完 当前线程还没有释放锁
      • 第二个线程来加锁时没释放锁,但是入队自旋之后第一个线程释放了锁
    • t0持有锁不释放,t1尝试加锁失败,t2来加锁

    公平锁与非公平锁区别

    当锁是自由状态是 公平锁需要判断自己是否要排队,而非公平锁直接cas操作,当锁不是自由状态非公平锁和公平锁没有区别,非公平锁的效率比公平锁高
    sychronized 只支持公平锁
    有的人认为公平锁就是排队非公平锁是插队,这种说法是不完全正确的。 实际上非公平锁入队之后是无法插队的 只能在入队之前的两次抢锁过程种可以算插队。

  • 相关阅读:
    python-pandapower电力系统潮流计算无法收敛情况解决方法
    【Java进阶】泛型和多线程
    以用地业务链串接为核心的自然资源业务数据治理与重构路径
    配置适合树莓派的linux内核(对内核的配置,编译,并将编译好的内核挂载到树莓派上)(面试)
    设计定时任务实现数据同步的最佳实践
    石子合并 区间dp
    77-Java的Set系列集合、Collection体系的总结
    C++中类和动态内存分配
    TensorRT的结构
    Vue3 + TS 防抖动
  • 原文地址:https://blog.csdn.net/Java_ttcd/article/details/126403915