• Java之juc旅途-locks(六)


    synchronized关键字虽然可以解决大部分多线程锁的问题,但是仍旧存在下述问题:

    • 假如持有锁的某线程因等待长时IO或者其他原因阻塞,其他等待的线程无法响应中断,只能不断等待;
    • 多线程下只有读操作是不会发生冲突的,但synchronized关键字对读和写操作均一视同仁,所以当一个线程进行读取操作时,其他线程只能不断等待;
    • 使用synchronized关键字无法确认线程是否成功获取到锁。

    针对上述问题,Doug Lea李大爷实现了一套更加灵活的Java锁机制,即java.concurrent.locks包。其中AQS在前面的章节已经解析过了,就不再这里重复。

    Lock接口有下述6个方法,主要分为三大类:

    • 获取锁的方法,分别为lock()、lockInterruptibly()、tryLock()、tryLock(long, TimeUnit);
    • 释放锁的方法,unlock();
    • 线程协作相关的方法,newCondition()。

    synchronsized关键字不需要用户手动释放锁,当synchronized修饰的方法或代码块执行完毕后,系统会自动让线程释放对锁的占用。
    与synchronsized关键字不同的是,Lock必须由用户手动执行加锁/释放锁操作,当持有锁的线程发生异常时,该线程不会自动释放锁,可能会导致死锁,故Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。

    // 初始化锁对象
    Lock lock = ...;
    // 加锁操作
    lock.lock();
    try{
    	// 执行相应任务
    	doSomething();
    }catch(Exception e){
    	// 处理异常
    }finally{
    	// 释放锁
    	lock.unlock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    Lock接口的实现类主要有5个,但只有一个是非内部类,也就是常用的ReentrantLcok

    除了Lock接口,还有做了读写锁的ReadWriteLock接口,ReadWriteLock 中定义了获取两种锁的方式,一个用于获取读锁、一个用于获取写锁。只要没有持有写锁的线程在执行,读锁可以同时被多个尝试读操作的线程持有,而写锁是排他锁。相应的实现类为ReentrantReadWriteLock

    以及还有一个StampedLock,是比ReentrantReadWriteLock更快的一种锁,支持乐观读、悲观读锁和写锁。和ReentrantReadWriteLock不同的是,StampedLock支持多个线程申请乐观读的同时,还允许一个线程申请写锁。
    一般来说,这种乐观锁的性能要比其他锁快几倍,而且随着线程数量的不断增加,性能的差距会越来越大。
    简而言之,在大量并发的场景中StampedLock的性能是碾压重入锁和读写锁的。但毕竟,世界上没有十全十美的东西,StampedLock也并非全能,它的缺点如下:

    1. 编码比较麻烦,如果使用乐观读,那么冲突的场景要应用自己处理
    2. 它是不可重入的,如果一不小心在同一个线程中调用了两次,那么你的世界就清净了。。。。。
    3. 它不支持wait/notify机制。

    所以为了不给自己填坑,一般代码里也不会用到StampedLock。。

    ReentrantLcok

    首先看它的特性:

    1. 可重入
    2. 公平/非公平
    3. 可中断
    4. 设置获取锁超时
    5. 支持条件等待

    ReentrantLock是基于AQS去实现一系列的功能的,但是他不是直接去继承实现,而是让成员变量Sync去完成这些操作。

    public class ReentrantLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = 7373984872572414699L;
        /** Synchronizer providing all implementation mechanics */
     private final Sync sync;
     abstract static class Sync extends AbstractQueuedSynchronizer {
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    可重入

    它怎么实现可重入锁的呢?其实很简单,因为AQS继承了AbstractOwnableSynchronizer,这个类可以维护一个当前的线程作为主线程,那么ReentrantLock在获取锁时会存储当前线程为拥有线程

    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        // 存储当前线程为拥有线程
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 如果当前线程是拥有线程,则state叠加
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    释放锁时判断下当前操作的线程是否与主线程相等就可以完成AQS的状态的更改,也就实现了可重入

     protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    公平与非公平

    看构造函数

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    
    • 1
    • 2
    • 3

    看看非公平锁的流程

    final void lock() {
        // 直接尝试加锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 如果获取锁失败进入 AQS acquire 逻辑
            acquire(1);
    }
    
    public final void acquire(int arg) {
        // tryAcquire(arg) 尝试获取锁
        // acquireQueued 获取锁失败进行等待队列
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    他会直接调用到 nonfairTryAcquire非公平锁的加锁逻辑(看方法头注释):

    // 非公平锁的逻辑
    // 如何理解插队, 这里的插队是当前队列中被唤醒的线程, 和当前加入的线程都可以被执行
    // 如果当前加入线程比队列中唤醒的线程先获取到锁, 就是插队现象
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // 无锁状态, 尝试竞争
        if (c == 0) {
            if (compareAndSetState(0, acquires)) { //是否获取到锁
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 当前线程持有锁, state 计数 +1
        else if (current == getExclusiveOwnerThread()) { //判断是否是重入
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    在上面非公平锁获取时(nonfairTryAcquire方法)只是简单的获取了一下当前状态做了一些逻辑处理,并没有考虑到当前同步队列中线程等待的情况。我们来看看公平锁的处理逻辑是怎样的,核心方法为:

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 会去判断是否有队列,没有队列直接cas获取锁
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    公平锁每次获取到锁为同步队列中的第一个节点,保证请求资源时间上的绝对顺序,而非公平锁有可能刚释放锁的线程下次继续获取该锁,则有可能导致其他线程永远无法获取到锁,造成“饥饿”现象

    公平锁为了保证时间上的绝对顺序,需要频繁的上下文切换,而非公平锁会降低一定的上下文切换,降低性能开销。因此,ReentrantLock默认选择的是非公平锁,则是为了减少一部分上下文切换,保证了系统更大的吞吐量

    可中断

    如果是使用lockInterruptibly进行获取锁,则可以去判断当前线程是否已经中断而决定抛出异常。

       public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
        private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } catch (Throwable t) {
                cancelAcquire(node);
                throw t;
            }
        }
    
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    设置获取锁超时

    底层还是由AQS去实现锁超时。

        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    条件等待

    在Sync里有这么一个对象,在CycleBarrier就是拿这个实现的。

            final ConditionObject newCondition() {
                return new ConditionObject();
            }
    
    
    • 1
    • 2
    • 3
    • 4

    Condition 是在 java 1.5 中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用 Condition,阻塞队列实际上是使用了 Condition 来模拟线程间协作。

    Condition 是个接口,基本的方法就是 await() 和 signal() 方法:

    • Conditon中的await()对应Object的wait();
    • Condition中的signal()对应Object的notify();
    • Condition中的signalAll()对应Object的notifyAll()。

    ReentrantReadWriteLock

    他有一系列的成员变量

    public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
        private final ReentrantReadWriteLock.ReadLock readerLock;
        private final ReentrantReadWriteLock.WriteLock writerLock;
        final Sync sync;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    核心还是由Sync来实现,看他的成员类

    static final class HoldCounter {
        int count;          // initially 0
        // Use id, not reference, to avoid garbage retention
        final long tid = LockSupport.getThreadId(Thread.currentThread());
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    HoldCounter 是一个计数器,count 用来记录当前线程拥有读锁的数量,即读锁的重入次数;tid 用来记录当前线程唯一 ID 。
    Sync 有一个 cachedHoldCounter 属性,用来做缓存效果,避免每次都通过 ThreadLocal 去读取数据。]

    static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6


    ThreadLocalHoldCounter 重写了 ThreadLocal 的 initialValue() ,在 ThreadLocal 没有进行过 set 数据的情况下,默认读取到的值都来自于这个方法,也就是配合 ThreadLocal 使用,默认值返回一个新的 HoldCounter 实例。
    在 Sync 中,有一个属性 readHolds ,它的类型是 ThreadLocalHoldCounter ,用来做当前线程读锁重入计数器的 ThreadLocal 包装,便于线程读取自己的读锁重入计数器。

    再看看Sync的核心属性:

    abstract static class Sync extends AbstractQueuedSynchronizer {
    	// 高16位为读锁,低16位为写锁
        static final int SHARED_SHIFT   = 16;
        // 读锁单位
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);  // 1 * 2^16 = 65536
        // 读锁最大数量
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  // 2^16 - 1
        // 写锁最大数量
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;   // 2^16 - 1 独占标记
        // 当前线程读锁重入次数。当持有读锁的线程数量下降到0时删除。
        private transient ThreadLocalHoldCounter readHolds;
        // 缓存对象,避免每次都去从 ThreadLocal 查找。
        private transient HoldCounter cachedHoldCounter;
    	// 第一个获取读锁线程
        private transient Thread firstReader;
        // 第一个读锁线程重入读锁的计数
        private transient int firstReaderHoldCount;
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    写锁加锁

    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        // 获取写锁状态
        int w = exclusiveCount(c);
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            // 重入
            setState(c + acquires);
            return true;
        }
        // 获取写锁
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        // 设置写锁 owner
        setExclusiveOwnerThread(current);
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    写锁释放

    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    读锁加锁

    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            // 首次获取读锁
            if (r == 0) {
                firstReader = current;
                // 第一个线程重入
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 重入
                firstReaderHoldCount++;
            } else {
                // 后续线程,通过 ThreadLocal 获取重入次数
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
                // else we hold the exclusive lock; blocking here
                // would cause deadlock.
            } else if (readerShouldBlock()) {
                // Make sure we're not acquiring read lock reentrantly
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    读锁释放

    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // Releasing the read lock has no effect on readers,
                // but it may allow waiting writers to proceed if
                // both read and write locks are now free.
                return nextc == 0;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    总结

    最核心的就是,ReentrantReadWriteLock 的写锁可重入是根据 AQS 中的 state 计数的;读锁的可重入是 Sync 中的 HoldCounter 来记录的。

  • 相关阅读:
    Java中的设计模式:工厂模式
    SpringBoot基础(九)-- 配置文件优先级
    时间复杂度(补充)和 空间复杂度
    性能压测工具 —— wrk
    java毕业生设计校园租赁系统的设计与实现计算机源码+系统+mysql+调试部署+lw
    如何封装安全的go
    含文档+PPT+源码等]精品基于Java的社区团购系统SSM[包运行成功]计算机毕业设计Java毕设
    2022-基于树木形态和有限状态机的森林火灾蔓延实时三维可视化
    RabbitMQ实战宝典:从新手到专家的全面探索
    测试与FastAPI应用数据之间的差异
  • 原文地址:https://blog.csdn.net/h295928126/article/details/126024586