在未查看过lock锁源码可以先观看lock锁源码先lock底层源码
Synchronized方法内调用wait、notify、notifyAll思路
public static void main(String[] args) throws InterruptedException {
Object a = new Object();
new Thread(() -> {
synchronized (a) {
System.out.println("线程C开始");
try {
a.wait();
System.out.println("线程C结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(0);
new Thread(() -> {
synchronized (a) {
System.out.println("线程A开始");
a.notify();
System.out.println("线程A结束");
}
}).start();
Thread.sleep(0);
new Thread(() -> {
synchronized (a) {
System.out.println("线程b开始");
}
}).start();
// 线程C开始
// 线程A开始
// 线程A结束
// 线程C结束
// 线程b开始
}
Condition的原理
通用的wait方法和notify方法在lock锁里面调用后是不会释放锁资源的,它只在Synchronized方法内会释放锁。
public class ConditionDemeNotify implements Runnable {
private Lock lock;
private Condition condition;
public ConditionDemeNotify(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin - ConditionDemeNotify");
lock.lock(); //synchronized(lock)
try {
condition.signal(); //让当前线程唤醒 Object.notify(); //因为任何对象都会 有monitor
System.out.println("end - ConditionDemeNotify");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ConditionDemoWait implements Runnable {
private Lock lock;
private Condition condition;
public ConditionDemoWait(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
lock.notify();
}
@Override
public void run() {
System.out.println("begin - ConditionDemoWait");
lock.lock();
try {
condition.await(); //让当前线程阻塞,Object.wait();
System.out.println("end - ConditionDemoWait");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
ConditionDemeNotify conditionDemeNotify = new ConditionDemeNotify(lock,condition);
ConditionDemoWait conditionDemoWait = new ConditionDemoWait(lock,condition);
new Thread(conditionDemoWait).start();
new Thread(conditionDemeNotify).start();
}
Condition.wait思路
1、添加进AQS等待队列当中
2、释放锁
3、阻塞
4、唤醒后,重新抢锁
5、处理interupt()的中断响应
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 当前方法是因为抢锁状态所以不需要CAS操作进行加锁操作
Node node = addConditionWaiter(); // 添加进AQS等待队列当中
int savedState = fullyRelease(node); // 完整的释放锁(考虑重入问题)
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 校验是否需要等待
LockSupport.park(this); // 休眠等待唤醒
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 抢锁
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 添加进AQS等待队列当中
private Node addConditionWaiter() {
Node t = lastWaiter; // 条件队列的最后一个节点
if (t != null && t.waitStatus != Node.CONDITION) { // 如果最后一个Node节点不是等待队列状态-2
unlinkCancelledWaiters(); //
t = lastWaiter;
}
// 将当前线程添加进AQS等待队列当中
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
* 从条件队列中取消已取消服务节点的链接。仅在持有锁时调用。当条件等待期间发生取消时,以及当lastWaiter被取消时插入新的waiter时,
* 将调用此函数。在没有信号的情况下,需要使用这种方法来避免垃圾保留。因此,即使它可能需要完整的遍历,它也只在没有信号的情况下发生超时或取消时才会发挥作用。
* 它遍历所有节点,而不是停在一个特定的目标,以解除所有指向垃圾节点的指针的链接,而不需要在取消风暴期间多次重新遍历
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// 完整的释放锁(考虑重入问题)
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState(); // 获取当前线程重入次数
if (release(savedState)) { // 释放锁资源
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// 校验是否需要休眠
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null) // 当前线程node节点如果已经移除了AQS等待队列当中就不需要等待休眠等待
return false;
if (node.next != null) // 如果有后继,它必须在队列中
return true;
return findNodeFromTail(node);
}
// 如果当前node节点在AQS等待队列当中就返回true 休眠等待
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
Condition.signal思路
1、要把被阻塞的线程,先唤醒(signal、signalAll)
2、把等待队列中被唤醒的线程转移到AQS队列中
// 唤醒
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null) // 如果AQS队列当中第一个node
doSignal(first);
}
// 删除和传输节点,直到到达非取消的1或null。从signal中分离出来 唤醒等待队列中的一个线程
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null) //
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 将等待AQS队列当中node节点迁移到同步队列当中
(first = firstWaiter) != null);
}
// 将节点从条件队列传输到同步队列。如果成功返回true ,如果成功传输,则为True(否则在信号之前取消节点)
final boolean transferForSignal(Node node) {
// 修改node状态为同步等待状态0,如果失败着返回错误
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); // 迁移同步队列当中
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 唤醒
return true;
}
Condition的实际应用
1、实现阻塞队列(业务组件)
2、在线程池中会用到阻塞队列
3、生产者消费者
4、流量缓冲
J.U.C 中的阻塞队列
1、ArrayBlockingQueue 基于数组结构
2、LinkedBlockingQueue 基于链表结构
3、PriorityBlcokingQueue 基于优先级队列
4、DelayQueue 允许延时执行的队列
5、SynchronousQueue 没有任何存储结构的的队列