(1)参考书籍:
《Java 并发编程的艺术》
(2)相关知识点:
Java 并发编程面试题——Lock 与 AbstractQueuedSynchronizer (AQS)
Java 并发编程面试题——重入锁 ReentrantLock
(1)Condition 接口是 Java 并发包中的一部分,用于在支持锁的基础上实现更高级的线程同步和协作。它提供了比内置监视器锁更精细的线程通信和条件等待的机制。
(2)Condition 是 JDK1.5 之后才有的,它具有很好的灵活性,比如可以实现多路通知功能也就是在一个 Lock 对象中可以创建多个 Condition 实例(即对象监视器),线程对象可以注册在指定的 Condition 中,从而可以有选择性的进行线程通知,在调度线程上更加灵活。
(3)在使用 notify()/notifyAll() 方法进行通知时,被通知的线程是由 JVM 选择的,用 ReentrantLock 类结合 Condition 实例可以实现“选择性通知” ,这个功能非常重要,而且是 Condition 接口默认提供的。而 synchronized 关键字就相当于整个 Lock 对象中只有一个 Condition 实例,所有的线程都注册在它一个身上。如果执行 notifyAll() 方法的话就会通知所有处于等待状态的线程,这样会造成很大的效率问题。而 Condition 实例的 signalAll() 方法,只会唤醒注册在该 Condition 实例中的所有等待线程。
(4)ConditionObject
类实现了 Condition
接口,同时它是 AQS 的内部类(具体关系如下图所示),因为 Condition 的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个 Condition 对象都包含着一个队列(以下称为等待队列),该队列是 Condition 对象实现等待/通知功能的关键。
(1)Condition 定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到 Condition
对象关联的锁。Condition 对象是由 Lock 对象(调用 Lock 对象的 newCondition()
方法)创建出来的。Condition 的使用方式比较简单,需要注意在调用方法前获取锁,使用方式如代码如下所示:
Lock lock = new ReentrantLock();
//创建 Condition 对象
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
(2)如示例所示,一般都会将 Condition 对象作为成员变量。当调用 await()
方法后,当前线程会释放锁并在此等待,而其他线程调用 Condition 对象的 signal()
方法,通知当前线程后,当前线程才从 await()
方法返回,并且在返回前已经获取了锁。
(3)下面给出一个更加具体的例子:
@Slf4j(topic = "c.ConditionDemo")
public class ConditionDemo {
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();
TimeUnit.SECONDS.sleep(1);
sendBreakfast();
TimeUnit.SECONDS.sleep(1);
sendCigarette();
}
private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}
}
输出结果如下所示:
21:08:39 [main] c.Test23 - 送早餐来了
21:08:39 [Thread-1] c.Test23 - 等到了它的早餐
21:08:40 [main] c.Test23 - 送烟来了
21:08:40 [Thread-0] c.Test23 - 等到了它的烟
上面的代码是一个简单的示例,模拟了一个等待送烟和早餐并通知的场景。通过使用 ReentrantLock 和 Condition 来实现线程之间的等待和通知。具体来说,代码中创建了一个 ReentrantLock 对象 lock,并通过 lock.newCondition() 方法创建了两个 Condition 对象 waitCigaretteQueue
和 waitBreakfastQueue
。
在主线程中启动了两个线程,一个线程等待烟的到来,另一个线程等待早餐的到来。在每个线程中,首先通过 lock.lock() 获取锁,然后使用 while 循环来判断是否满足等待条件(hasCigarette 和 hasBreakfast)。如果不满足条件,则通过调用相应的 wait 方法(waitCigaretteQueue.await() 和 waitBreakfastQueue.await())来挂起线程,并释放锁。一旦满足条件,线程会继续往下执行。
在主线程中,通过调用 sendCigarette() 和 sendBreakfast() 方法来发送烟和早餐。在每个方法中,首先获取锁(lock.lock()),然后修改相应的状态标志位(hasCigarette 和 hasBreakfast),最后通过调用 signal() 方法来通知等待队列中的线程被唤醒。
整个过程中,使用了 ReentrantLock 来提供互斥访问临界区的能力,而使用 Condition 来实现线程之间的等待和通知。Condition 的 wait 方法实际上是将当前线程挂起,并将其放入等待队列,而 signal 方法则是对等待队列中的线程进行唤醒。
通过这种方式,实现了线程间的同步和通信,使得等待烟和早餐的线程能够在满足条件时得到及时通知并继续执行。
(1)等待队列是一个 FIFO 的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在 Condition 对象上等待的线程,如果一个线程调用了 Condition.await()
方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类 AbstractQueuedSynchronizer.Node
。
(2)一个 Condition 包含一个等待队列,Condition 拥有首节点 (firstWaiter) 和尾节点 (lastWaiter)。当前线程调用 Condition.await()
方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,等待队列的基本结构如下图所示:
如图所示,Condition 拥有首尾节点的引用,而新增节点只需要将原有的尾节点 nextWaiter
指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用 CAS 保证,原因在于调用 await()
方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
(3)在 Object 的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的 Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列,其对应关系如下图所示:
如图所示,Condition 的实现是同步器的内部类,因此每个 Condition 实例都能够访问同步器提供的方法,相当于每个 Condition 都拥有所属同步器的引用。
(1)调用 Condition 的 await()
方法(或者以 await 开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从 await()
方法返回时,当前线程一定获取了 Condition 相关联的锁。
(2)如果从队列(同步队列和等待队列)的角度看 await()
方法,当调用 await()
方法时,相当于同步队列的首节点(获取了锁的节点)移动到 Condition 的等待队列中。ConditionObject
的 await()
方法的代码如下所示:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
public class ConditionObject implements Condition, java.io.Serializable {
//...
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//当前线程加入等待队列
Node node = addConditionWaiter();
//释放同步状态,也就是释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
}
}
(3)调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
(4)当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用 Condition.signal()
方法唤醒,而是对等待线程进行中断,则会抛出 InterruptedException
。如果从队列的角度去看,当前线程加入 Condition 的等待队列,该过程如下图所示:
如图所示,同步队列的首节点并不会直接加入等待队列,而是通过 addConditionWaiter()
方法把当前线程构造成一个新的节点并将其加入等待队列中。
(1)调用 Condition 的 signal()
方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。ConditionObject 的 signal()
方法的代码如下所示:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
public class ConditionObject implements Condition, java.io.Serializable {
//...
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
}
}
调用该方法的前置条件是当前线程必须获取了锁,可以看到 signal()
方法进行了 isHeldExclusively()
检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用 LockSupport
唤醒节点中的线程。节点从等待队列移动到同步队列的过程如下图所示:
(3)通过调用同步器的 enq(Node node)
方法,等待队列中的头节点线程安全地移动到同步队列。当节点移动到同步队列后,当前线程再使用 LockSupport 唤醒该节点的线程。被唤醒后的线程,将从 await()
方法中的 while 循环中退出(isOnSyncQueue(Node node)
方法返回 true,节点已经在同步队列中),进而调用同步器的 acquireQueued()
方法加入到获取同步状态的竞争中。
(4)成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的 await()
方法返回,此时该线程已经成功地获取了锁。Condition的 signalAll()
方法,相当于对等待队列中的每个节点均执行一次 signal()
方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。