synchronized关键字虽然可以解决大部分多线程锁的问题,但是仍旧存在下述问题:
针对上述问题,Doug Lea李大爷实现了一套更加灵活的Java锁机制,即java.concurrent.locks包。其中AQS在前面的章节已经解析过了,就不再这里重复。
Lock接口有下述6个方法,主要分为三大类:
synchronsized关键字不需要用户手动释放锁,当synchronized修饰的方法或代码块执行完毕后,系统会自动让线程释放对锁的占用。
与synchronsized关键字不同的是,Lock必须由用户手动执行加锁/释放锁操作,当持有锁的线程发生异常时,该线程不会自动释放锁,可能会导致死锁,故Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。
// 初始化锁对象
Lock lock = ...;
// 加锁操作
lock.lock();
try{
// 执行相应任务
doSomething();
}catch(Exception e){
// 处理异常
}finally{
// 释放锁
lock.unlock();
}
Lock接口的实现类主要有5个,但只有一个是非内部类,也就是常用的ReentrantLcok。
除了Lock接口,还有做了读写锁的ReadWriteLock接口,ReadWriteLock 中定义了获取两种锁的方式,一个用于获取读锁、一个用于获取写锁。只要没有持有写锁的线程在执行,读锁可以同时被多个尝试读操作的线程持有,而写锁是排他锁。相应的实现类为ReentrantReadWriteLock。
以及还有一个StampedLock,是比ReentrantReadWriteLock更快的一种锁,支持乐观读、悲观读锁和写锁。和ReentrantReadWriteLock不同的是,StampedLock支持多个线程申请乐观读的同时,还允许一个线程申请写锁。
一般来说,这种乐观锁的性能要比其他锁快几倍,而且随着线程数量的不断增加,性能的差距会越来越大。
简而言之,在大量并发的场景中StampedLock的性能是碾压重入锁和读写锁的。但毕竟,世界上没有十全十美的东西,StampedLock也并非全能,它的缺点如下:
所以为了不给自己填坑,一般代码里也不会用到StampedLock。。
首先看它的特性:
ReentrantLock是基于AQS去实现一系列的功能的,但是他不是直接去继承实现,而是让成员变量Sync去完成这些操作。
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
}
}
它怎么实现可重入锁的呢?其实很简单,因为AQS继承了AbstractOwnableSynchronizer,这个类可以维护一个当前的线程作为主线程,那么ReentrantLock在获取锁时会存储当前线程为拥有线程
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 存储当前线程为拥有线程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线程是拥有线程,则state叠加
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
释放锁时判断下当前操作的线程是否与主线程相等就可以完成AQS的状态的更改,也就实现了可重入
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
看构造函数
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
看看非公平锁的流程
final void lock() {
// 直接尝试加锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果获取锁失败进入 AQS acquire 逻辑
acquire(1);
}
public final void acquire(int arg) {
// tryAcquire(arg) 尝试获取锁
// acquireQueued 获取锁失败进行等待队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
他会直接调用到 nonfairTryAcquire非公平锁的加锁逻辑(看方法头注释):
// 非公平锁的逻辑
// 如何理解插队, 这里的插队是当前队列中被唤醒的线程, 和当前加入的线程都可以被执行
// 如果当前加入线程比队列中唤醒的线程先获取到锁, 就是插队现象
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 无锁状态, 尝试竞争
if (c == 0) {
if (compareAndSetState(0, acquires)) { //是否获取到锁
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程持有锁, state 计数 +1
else if (current == getExclusiveOwnerThread()) { //判断是否是重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
在上面非公平锁获取时(nonfairTryAcquire方法)只是简单的获取了一下当前状态做了一些逻辑处理,并没有考虑到当前同步队列中线程等待的情况。我们来看看公平锁的处理逻辑是怎样的,核心方法为:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 会去判断是否有队列,没有队列直接cas获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平锁每次获取到锁为同步队列中的第一个节点,保证请求资源时间上的绝对顺序,而非公平锁有可能刚释放锁的线程下次继续获取该锁,则有可能导致其他线程永远无法获取到锁,造成“饥饿”现象。
公平锁为了保证时间上的绝对顺序,需要频繁的上下文切换,而非公平锁会降低一定的上下文切换,降低性能开销。因此,ReentrantLock默认选择的是非公平锁,则是为了减少一部分上下文切换,保证了系统更大的吞吐量。
如果是使用lockInterruptibly进行获取锁,则可以去判断当前线程是否已经中断而决定抛出异常。
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
底层还是由AQS去实现锁超时。
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
在Sync里有这么一个对象,在CycleBarrier就是拿这个实现的。
final ConditionObject newCondition() {
return new ConditionObject();
}
Condition 是在 java 1.5 中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用 Condition,阻塞队列实际上是使用了 Condition 来模拟线程间协作。
Condition 是个接口,基本的方法就是 await() 和 signal() 方法:
他有一系列的成员变量
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
}
核心还是由Sync来实现,看他的成员类
static final class HoldCounter {
int count; // initially 0
// Use id, not reference, to avoid garbage retention
final long tid = LockSupport.getThreadId(Thread.currentThread());
}
HoldCounter 是一个计数器,count 用来记录当前线程拥有读锁的数量,即读锁的重入次数;tid 用来记录当前线程唯一 ID 。
Sync 有一个 cachedHoldCounter 属性,用来做缓存效果,避免每次都通过 ThreadLocal 去读取数据。]
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
ThreadLocalHoldCounter 重写了 ThreadLocal 的 initialValue() ,在 ThreadLocal 没有进行过 set 数据的情况下,默认读取到的值都来自于这个方法,也就是配合 ThreadLocal 使用,默认值返回一个新的 HoldCounter 实例。
在 Sync 中,有一个属性 readHolds ,它的类型是 ThreadLocalHoldCounter ,用来做当前线程读锁重入计数器的 ThreadLocal 包装,便于线程读取自己的读锁重入计数器。
再看看Sync的核心属性:
abstract static class Sync extends AbstractQueuedSynchronizer {
// 高16位为读锁,低16位为写锁
static final int SHARED_SHIFT = 16;
// 读锁单位
static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 1 * 2^16 = 65536
// 读锁最大数量
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 2^16 - 1
// 写锁最大数量
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 2^16 - 1 独占标记
// 当前线程读锁重入次数。当持有读锁的线程数量下降到0时删除。
private transient ThreadLocalHoldCounter readHolds;
// 缓存对象,避免每次都去从 ThreadLocal 查找。
private transient HoldCounter cachedHoldCounter;
// 第一个获取读锁线程
private transient Thread firstReader;
// 第一个读锁线程重入读锁的计数
private transient int firstReaderHoldCount;
// ...
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// 获取写锁状态
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 重入
setState(c + acquires);
return true;
}
// 获取写锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 设置写锁 owner
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 首次获取读锁
if (r == 0) {
firstReader = current;
// 第一个线程重入
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 重入
firstReaderHoldCount++;
} else {
// 后续线程,通过 ThreadLocal 获取重入次数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
最核心的就是,ReentrantReadWriteLock 的写锁可重入是根据 AQS 中的 state 计数的;读锁的可重入是 Sync 中的 HoldCounter 来记录的。