@TOC




不止Samaphore,其他协助类和锁,例如CountDownLatch, ReentrantLock都是内部引入Sync类,这个类继承AQS,因此可以实现AQS的功能












在可重入锁中的使用原理






不同的协作类有不同的实现





public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
将count传入到Sync类进行处理

public long getCount() {
return sync.getCount();
}
最终会调用AQS的获取状态方法

public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 倒数到0
doReleaseShared(); // 将等待的线程全部唤醒
return true;
}
return false;
}
tryReleaseShared的实现
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) { // CAS自旋
int c = getState();
if (c == 0) // 已经释放过,无需再释放
return false;
int nextc = c-1; // 减1
if (compareAndSetState(c, nextc)) // cas更新
return nextc == 0; // 倒数完了,返回true
}
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}


private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//包装的node节点放线程
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 将线程挂起, 进行阻塞
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}


public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
tryAcquireSharedpublic final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 剩下的许可证-要获取的数量,负数,表示不能获取,需要等待
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared方法的实现在Semaphore中的实现有公平和非公平两种
非公平
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // cas自旋
int available = getState(); // 获取当前剩余许可证的数量
int remaining = available - acquires;// 为负数,表示获取不到
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) { // 返回true,锁被释放
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒线程
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 状态减1
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 状态为0时,才释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
state是不是为0, 然后判断当前线程是否持有锁,如果持有,可以重入,如果都不是,当前线程不能拿到锁,只能放到等待队列中public void lock() {
sync.lock();
}
abstract void lock();
final void lock() {
if (compareAndSetState(0, 1)) // cas判断
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // 上不了锁,会执行此方法
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 放到队列
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 锁重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; // 被其他线程持有,不能上锁
}