• 加锁和解锁-ReentrantLock详解-AQS-并发编程(Java)


    1 AQS

    此处给出AQS的一些先行知识点,为后续详细解析AQS做铺垫。在给出一些必要的基础之后,我们先从分析AQS一些具体典型锁实现,最后对AQS做归纳总结。

    1.1 概念

    AQS(AbstractQueuedSynchronizer)是多线程同步器,它是JUC(java.util.concurrent)包中多个组件的底层实现,比如像Lock、CountDownLatch、Semaphore等都是用到了AQS。简单理解就是:AQS定义了模板,具体实现由各个子类完成。

    1.2 两种锁机制

    AQS提供2种锁机制,独占锁和共享锁。独占锁,就是存在多个线程去竞争同一共享资源的时候,同一个时刻,只允许一个线程去访问该共享资源,也就是说只能有一个线程获取该共享资源的锁。比如Lock中的ReentrantLock重入锁,它的实现就是用到了AQS的一个排它锁的功能。

    共享锁也称为读锁,也就是同一时刻,允许多个线程获取锁的资源,比如CountDownLatch、Semaphore,都用到了AQS中的共享锁的功能。

    1.3 公平锁和非公平锁

    关于锁的的公平性和非公平行,AQS的处理方法是,在竞争锁资源的时候,公平锁要判断双向链表中是否有阻塞的线程,如果有则需要去排队等待。而非公平锁的处理方式是,不管双向链表中是否有阻塞的线程在排队等待,它都会去尝试去修改state变量去竞争锁,这个过程是非公平的

    1.3 锁竞争

    AQS对于锁竞争通过维护一个双向链表实现的队列来完成,其第一个节点为哨兵节点。AQS相关代码如下:

    private transient volatile Node head;
    
    private transient volatile Node tail;
    
    private volatile int state;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • head:队列头结点
    • tail:队列尾节点
    • state:为锁状态,默认值0;s尝试获取锁即通过cas方法把state由0置为1,成功表示获取了锁,失败执行其他操作。

    内部链表节点类部分代码如下下:

    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
    
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        
        static final int PROPAGATE = -3;
        
        volatile int waitStatus;
        
        volatile Node prev;
        
        volatile Node next;
    
        volatile Thread thread;
        // 省略其他相关代码
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • SHARED:共享锁模式
    • EXCLUSIVE:独占锁模式
    • waitStatus:节点(任务)状态,取值为0(默认),CANCELLED,SIGNAL,CONDITION,PROPAGATE
      • 0默认,新建
      • CANCELLED:任务取消
      • SIGNAL:有职责唤醒后继节点
      • CONDITION:要阻塞在条件变量队列中的节点
      • PROPAGATE:后面用到在介绍
    • prev:前驱
    • next:后继
    • thread:任务线程

    1.4 条件变量

    AQS支持多个条件变量,条件变量阻塞队列为单链表,内部条件变量类部分代码如下:

    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
        
        public final void signal() {//省略其他代码}
        public final void await() throws InterruptedException {//省略其他代码}
        
        // 省略其他代码
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 解析

      • firstWaiter:表头
      • lastWaiter:表尾
      • signal():唤醒
      • await():等待(阻塞)
    • 说明:阻塞在条件变量上的线程被唤醒后,它还是要去竞争锁的,即会加入锁竞争的队列。

    2 ReentrantLock

    2.1 简介

    ReentrantLockUML如下图2.1所示:在这里插入图片描述

    ReentrantLock为一种可重入,可打断的独占式锁。默认实现为非公平锁,默认构造方法如下:

    public ReentrantLock() {
            sync = new NonfairSync();
    }
    
    • 1
    • 2
    • 3

    关于可重入,可打断和公平锁的实现原理我们会在后面分析,下面优先讲解锁的主要功能加锁和解锁。

    2 加锁

    当前线程获取锁有2种结果,成功或者失败。如下所示ReentrantLock的加锁源代码:

    public void lock() {
        sync.lock();
    }
    final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.1 加锁成功

    通过cas的方式把state由0设置为1,成功,获取锁成功,把exclusiveOwnerThread独占线程标识设置为当前线程,执行相关操作。

    2.2 加锁失败

    竞争锁失败执行acquire(1)方法,源代码如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • tryAcquire():尝试获取锁
    • acquireQueued():加入竞争锁队列
    • addWaiter:加入队尾
    • selfInterrupt():自我打断
    2.2.1 tryAcquire()

    tryAcquire()默认执行非公平锁的tryAcquire()方法,源代码如下:

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
            	throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    执行流程如下:

    • 获取当前线程current和锁状态state
    • 判断state==0
      • 是表示锁未被占用,即已被释放
        • compareAndSetState(0, acquires)cas把state由0设置为1
          • 竞争锁成功
            • setExclusiveOwnerThread(current);设置锁持有线程为当前线程
            • 返回true
      • 否表示锁被占用,判断current == getExclusiveOwnerThread()当前线程释放是持有锁的线程
        • 是表示锁重入,即当前持有锁的线程再次获取锁
          • 锁状态(计数)+1赋值nextc
          • 如果nextc小于0,表示溢出,直接报错
          • setState(nextc);返回true
    • 返回false

    执行流程图如下图 2.2.1-1所示:在这里插入图片描述

    2.2.2 addWaiter()

    源代码如下:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    该方法目标是把当前线程节点插入队列尾部,先执行判断尾节点是否为空

    • 尾节点为空,执行enq()方法先创建哨兵节点,在插入队尾
    • 尾节点不为空,直接插入队尾
    2.2.3 acquireQueued()
    2.2.3.1 主方法

    加入竞争锁队列后会根据情况尝试获取锁或者阻塞,源代码如下:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    执行流程如下:

    • 设置failed失败标记,进入try{}finally{}快
    • 设置interrupted打断标记,进入for(;;)循环
    • 获取目标节点的前驱节点
    • 判断前驱节点是否是头结点
      • 是头结点尝试获取锁
        • 成功
          • 把当前节点设置为头结点即哨兵节点与线程解绑
          • 原先的头结点端口连接,等待被GC回收
          • 返回打断标记,这里是for循环唯一的出口
      • 不是头结点或者尝试获取锁失败,进入阻塞流程
      • shouldParkAfterFailedAcquire(),前驱节点等待状态置为Node.SIGNAL,失败继续循环
      • parkAndCheckInterrupt(),park()阻塞,直至被唤醒或者被打断
        • 被唤醒继续执行for循环
        • 被打断,打断标记设为true,继续执行for循环
    2.2.3.2 shouldParkAfterFailedAcquire()

    源代码如下:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    该方法的目标就是设置目标节点node的前驱节点pred的waitStatus为Node.SIGNAL,该状态意味着它有责任唤醒它的后记节点。

    执行流程如下:

    • 获取前驱节点pred的waitStatus等待状态
    • 如果ws == Node.SIGNAL(-1),直接返回true,执行parkAndCheckInterrupt()方法
    • 如果ws > 0 ,意味着该节点线程(任务)被取消,跳过该节点继续寻找waitStatus<=0的节点(有头结点兜底),并将该节点设置为目标节点的前驱节点
    • 否则cas尝试把pred的waitStatus由当前ws设置为Node.SIGNAL
    • 返回false,意味着调用该方法的acquireQueued()继续执行for循环
    2.2.3.3 parkAndCheckInterrupt()

    源代码如下:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    • 1
    • 2
    • 3
    • 4

    该方法就是阻塞,之所以说默认是不可打断模式,就是这里。被打断之后,只是返回true且清除了打断标记。返回上传调用方法acquireQueued继续执行for循环,直至获得锁,返回打断标记true。

    2.2.4 selfInterrupt()

    源代码如下:

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    
    • 1
    • 2
    • 3

    执行该方法的前提是线程在阻塞的时候被打断且获取到锁,然后执行自我打断。

    3 解锁

    释放锁源代码如下:

    public void unlock() {
        sync.release(1);
    }
    
    
    • 1
    • 2
    • 3
    • 4

    3.1 解锁成功

    3.1.1 tryRelease()方法

    源码如下:

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    既然执行释放锁操作,说明当前线程已经获取锁。

    执行流程如下:

    • c:状态-1
    • 判断当前线程不等于为锁持有线程,抛异常,一般不会发生
    • 设置free标记
    • 如果c==0
      • free设置true,把当前所持有线程置为null
      • 如果c!=0,说明存在锁重入
    • setState©:state设置新值c
    • 返回free
    3.1.2 release()方法
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在tryRelease()方法返回true之后,判断如果竞争锁队列头结点不为空且状态!=0(-1,之前加锁流程把前驱节点状态设置),会唤醒后继节点,返回true;

    3.1.3 unparkSuccessor()

    源代码如下:

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    执行流程如下:

    • ws:获取目标节点node的waitStatus(node这里其实是头结点,waitStatus为-1)
    • 判断ws小于0
      • cas设置节点node等待状态由ws置为0
    • s:节点node的下一个节点
    • 如果s为空(没有竞争锁的线程)或者s.waitStatus > 0(任务被取消)
      • 从队尾开始循环获取一个t.waitStatus <= 0 的节点
    • s不为空唤醒节点s绑定的线程

    3.2 解锁失败

    解锁失败情况就是存在锁重入,未解锁到最后一层。

    4 后记

    如有问题,欢迎交流讨论。

    ❓QQ:806797785

    ⭐️源代码仓库地址:https://gitee.com/gaogzhen/concurrent

  • 相关阅读:
    liunx下定时备份mysql数据
    Quartus 使用 tcl 文件快速配置管脚
    如何借助边缘智能网关打造智慧城市便民驿站
    查询&会议签字
    『第七章』翩翩起舞的雨燕:顺序与并发执行
    Java 字节输出流FileOutputStream的用法和概述
    macOS Sonoma 14.6.1 (23G93) Boot ISO 原版可引导镜像下载
    如何把一行数据拆分成多条sql
    牛客在线编程101-91 反转字符串
    JavaScript中的类型转换
  • 原文地址:https://blog.csdn.net/gaogzhen/article/details/128034711