之前的队列在很多场景下都不能很好地工作,例如
大部分场景要求分离向队列放入(生产者:主要调用offer方法)、从队列拿出(消费者:主要调用poll方法)两个角色、它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题
队列为空,那么在之前的实现里会返回 null,如果就是硬要拿到一个元素呢?只能不断循环尝试
队列为满,那么再之前的实现里会返回 false,如果就是硬要塞入一个元素呢?只能不断循环尝试
因此我们需要解决的问题有
用锁保证线程安全
用条件变量让等待非空线程与等待不满线程进入等待状态,而不是不断循环尝试,让 CPU 空转
有同学对线程安全还没有足够的认识,下面举一个反例,两个线程都要执行入队操作(几乎在同一时刻)
- public class TestThreadUnsafe {
- private final String[] array = new String[10];
- private int tail = 0;
-
- public void offer(String e) {
- array[tail] = e;
- tail++;
- }
-
- @Override
- public String toString() {
- return Arrays.toString(array);
- }
-
- public static void main(String[] args) {
- TestThreadUnsafe queue = new TestThreadUnsafe();
- new Thread(()-> queue.offer("e1"), "t1").start();
- new Thread(()-> queue.offer("e2"), "t2").start();
- }
- }
注意IDEA调试设置:
执行的时间序列如下,假设初始状态 tail = 0,在执行过程中由于 CPU 在两个线程之间切换,造成了指令交错
糟糕的是,由于指令交错的顺序不同,得到的结果不止以上一种,宏观上造成混乱的效果
Java 中要防止代码段交错执行,需要使用锁,有两种选择
synchronized 代码块,属于关键字级别提供锁保护,功能少
ReentrantLock(可重入锁) 类,功能丰富
以 ReentrantLock 为例
- ReentrantLock lock = new ReentrantLock();//锁对象
-
- public void offer(String e) {
- //lock.lock(); //加锁
- lock.lockInterruptibly();//加锁 记得要抛异常
- try {
- array[tail] = e;
- tail++;
- } finally {
- //利用try-finally:为了防止当我们try{}内有异常,那么我们的lock.unlock()解锁操作就没办法执行,所以我们要保证即使出现异常也要执行解锁操作
- lock.unlock();//解锁
- }
- }
lock()与lockInterruptibly()的区别:
使用lock():当t1加锁状态,未解锁时,t2必须等待
使用lockInterruptibly():当t1加锁状态,未解锁时,但t1处于异常导致无法解锁的时候,t2可以在阻塞状态下随时打断t1的锁
只要两个线程执行上段代码时,锁对象是同一个,就能保证 try 块内的代码的执行不会出现指令交错现象,即执行顺序只可能是下面两种情况之一
另一种情况是线程2 先获得锁,线程1 被挡在外面
要明白保护的本质,本例中是保护的是 tail 位置读写的安全
事情还没有完,上面的例子是队列还没有放满的情况,考虑下面的代码(这回锁同时保护了 tail 和 size 的读写安全)
- ReentrantLock lock = new ReentrantLock();
- int size = 0;//判断队列是否满了
-
- public void offer(String e) {
- lock.lockInterruptibly();
- try {
- if(isFull()) {
- // 满了怎么办?
- }
- array[tail] = e;
- //tail++;
- //防止tail长度溢出
- if (++tail == array.length) {
- tail = 0;
- }
- size++;
- } finally {
- lock.unlock();
- }
- }
-
- private boolean isFull() {
- return size == array.length;
- }
之前是返回 false 表示添加失败,前面分析过想达到这么一种效果:
在队列满时,不是立刻返回,而是当前线程进入等待
什么时候队列不满了,再唤醒这个等待的线程,从上次的代码处继续向下运行
ReentrantLock 可以配合条件变量来实现,代码进化为
- ReentrantLock lock = new ReentrantLock();
- Condition tailWaits = lock.newCondition(); // 条件变量
- int size = 0;
-
- public void offer(String e) {
- lock.lockInterruptibly();
- try {
- while (isFull()) {
- tailWaits.await(); // 当队列满时, 当前线程进入 tailWaits 等待,当前线程加入tailWaits,并且让此线程阻塞
- }
- array[tail] = e;
- //tail++;
- //防止tail长度溢出
- if (++tail == array.length) {
- tail = 0;
- }
- size++;
- } finally {
- //利用try-finally:为了防止当我们try{}内有异常,那么我们的lock.unlock()解锁操作就没办法执行,所以我们要保证即使出现异常也要执行解锁操作
- lock.unlock();
- }
- }
-
- private boolean isFull() {
- return size == array.length;
- }
条件变量底层也是个队列,用来存储这些需要等待的线程,当队列满了,就会将 offer 线程加入条件队列,并暂时释放锁
将来我们的队列如果不满了(由 poll 线程那边得知)可以调用 tailWaits.signal() 来唤醒 tailWaits 中首个等待的线程,被唤醒的线程会再次抢到锁,从上次 await 处继续向下运行
思考为何要用 while 而不是 if,设队列容量是 3
关键点:
从 tailWaits 中唤醒的线程,会与新来的 offer 的线程争抢锁,谁能抢到是不一定的,如果后者先抢到,就会导致条件又发生变化
这种情况称之为虚假唤醒,唤醒后应该重新检查条件,看是不是得重新进入等待
最后的实现代码:
接口:
- /**
- 目前队列存在的问题
-
-
- 很多场景要求分离生产者、消费者两个角色、它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题
-
- 队列为空,那么在之前的实现里会返回 null,如果就是硬要拿到一个元素呢?只能不断循环尝试
-
- 队列为满,那么再之前的实现里会返回 false,如果就是硬要塞入一个元素呢?只能不断循环尝试
-
- 解决方法
-
-
- 用锁保证线程安全
-
- 用条件变量让 poll 或 offer 线程进入等待状态,而不是不断循环尝试,让 CPU 空转
-
- */
-
- public interface BlockingQueue
{ // 阻塞队列 -
- void offer(E e) throws InterruptedException;
-
- boolean offer(E e, long timeout) throws InterruptedException;
-
- E poll() throws InterruptedException;
- }
实现:
- /**
- * 单锁实现
- * @param
元素类型 - */
- public class BlockingQueue1
implements BlockingQueue { - private final E[] array;
- private int head = 0;
- private int tail = 0;
- private int size = 0; // 元素个数 判断队列是否满了
-
- @SuppressWarnings("all")
- public BlockingQueue1(int capacity) {
- array = (E[]) new Object[capacity];
- }
-
- ReentrantLock lock = new ReentrantLock();
- Condition tailWaits = lock.newCondition();//用于poll方法中
- Condition headWaits = lock.newCondition();//用于offer方法中
-
- @Override
- public void offer(E e) throws InterruptedException {
- lock.lockInterruptibly();
- try {
- while (isFull()) {
- tailWaits.await();
- }
- array[tail] = e;
- //tail++;
- //防止tail长度溢出
- if (++tail == array.length) {
- tail = 0;
- }
- size++;
- //poll方法那边等待队列非空,那么我们就要唤醒poll方法的等待
- headWaits.signal();
- } finally {
- //利用try-finally:为了防止当我们try{}内有异常,那么我们的lock.unlock()解锁操作就没办法执行,所以我们要保证即使出现异常也要执行解锁操作
- lock.unlock();
- }
- }
-
- @Override
- //对于上面的Offer方法,我们又多了个选择:
- //我们确实可以等待该线程被唤醒,但是要等多久呢?如果我们不需要等那么久,那就可以给这个等待加一个时间上限
- //在规定时间内,我可以等待,但是超过了这个时间,还没有人来唤醒我,那我就不等了,直接提取结束这个等待,可以返回一个false,表示我添加失败
- public void offer(E e, long timeout) throws InterruptedException {
- lock.lockInterruptibly();
- try {
- long t = TimeUnit.MILLISECONDS.toNanos(timeout);
-
- while (isFull()) {
- //在规定时间内,我可以等待,但是超过了这个时间,还没有人来唤醒我,那我就不等了,直接提取结束这个等待,可以返回一个false,表示我添加失败
- if (t <= 0) {
- return;
- }
- t = tailWaits.awaitNanos(t);//最多等待多少纳秒
- }
- array[tail] = e;
- //tail++;
- //防止tail长度溢出
- if (++tail == array.length) {
- tail = 0;
- }
- size++;
- headWaits.signal();
- } finally {
- //利用try-finally:为了防止当我们try{}内有异常,那么我们的lock.unlock()解锁操作就没办法执行,所以我们要保证即使出现异常也要执行解锁操作
- lock.unlock();
- }
- }
-
- @Override
- public E poll() throws InterruptedException {
- lock.lockInterruptibly();
- try {
- //如果队列为空,我们就要让该线程等待,直到offer方法中headWaits.signal();将其唤醒
- while (isEmpty()) {
- headWaits.await();
- }
- E e = array[head];
- array[head] = null; // help GC
- if (++head == array.length) {
- head = 0;
- }
- size--;
- //offer方法那边等待队列非空,那么我们就要唤醒offer方法的等待
- tailWaits.signal();
- return e;
- } finally {
- //利用try-finally:为了防止当我们try{}内有异常,那么我们的lock.unlock()解锁操作就没办法执行,所以我们要保证即使出现异常也要执行解锁操作
- lock.unlock();
- }
- }
-
- private boolean isEmpty() {
- return size == 0;
- }
-
- private boolean isFull() {
- return size == array.length;
- }
- }
注意
JDK 中 BlockingQueue 接口的方法命名与我的示例有些差异
方法 offer(E e) 是非阻塞的实现,阻塞实现方法为 put(E e)
方法 poll() 是非阻塞的实现,阻塞实现方法为 take()
单锁的缺点在于:
生产和消费几乎是不冲突的,唯一冲突的是生产者和消费者它们有可能同时修改 size
冲突的主要是生产者之间:多个 offer 线程修改 tail
冲突的还有消费者之间:多个 poll 线程修改 head
如果希望进一步提高性能,可以用两把锁
一把锁保护 tail
另一把锁保护 head
- ReentrantLock headLock = new ReentrantLock(); // 保护 head 的锁
- Condition headWaits = headLock.newCondition(); // 队列空时,需要等待的线程集合
-
- ReentrantLock tailLock = new ReentrantLock(); // 保护 tail 的锁
- Condition tailWaits = tailLock.newCondition(); // 队列满时,需要等待的线程集合
先看看 offer 方法的初步实现
- @Override
- public void offer(E e) throws InterruptedException {
- tailLock.lockInterruptibly();
- try {
- // 队列满则等待
- while (isFull()) {
- tailWaits.await();
- }
-
- // 不满则入队
- array[tail] = e;
- if (++tail == array.length) {
- tail = 0;
- }
-
- // 修改 size (有问题)
- size++;
-
- } finally {
- tailLock.unlock();
- }
- }
上面代码的缺点是 size 并不受 tailLock 保护,tailLock 与 headLock 是两把不同的锁,并不能实现互斥的效果。因此,size 需要用下面的代码保证原子性
此时,你会有疑问:size++,它不就是写在一个锁对象的加锁与解锁操作之间吗?那它怎么就是线程不安全的呢?
答:注意:我们这个size++,它确实能够受到我们tailLock这把锁的保护,但是对所有使用tailLock锁的线程让他们串行执行,也就是将来调用所有offer方法的线程,它们是不是都在使用tailLock这把锁,也就是说,只有各种线程在调用offer方法时,size++会受到保护,但是如果是headLock 这个锁,这个锁是由poll方法调用,这就和“只有各种线程在调用offer方法时,size++会受到保护”同一个道理,但是如果这两个锁串着用size++就不受保护,因为现在是使用了两把锁。
- AtomicInteger size = new AtomicInteger(0); // 保护 size 的原子变量
-
- size.getAndIncrement(); // 自增
- size.getAndDecrement(); // 自减
代码修改为
- @Override
- public void offer(E e) throws InterruptedException {
- tailLock.lockInterruptibly();
- try {
- // 队列满等待
- while (isFull()) {
- tailWaits.await();
- }
-
- // 不满则入队
- array[tail] = e;
- if (++tail == array.length) {
- tail = 0;
- }
-
- // 修改 size
- size.getAndIncrement();//相当于size++ 可以防止不会与其他线程产生这种指令交错的问题
-
- } finally {
- tailLock.unlock();
- }
- }
对称地,可以写出 poll 方法
- @Override
- public E poll() throws InterruptedException {
- E e;
- headLock.lockInterruptibly();
- try {
- // 队列空等待
- while (isEmpty()) {
- headWaits.await();
- }
-
- // 不空则出队
- e = array[head];
- if (++head == array.length) {
- head = 0;
- }
-
- // 修改 size
- size.getAndDecrement();//相当于size++ 可以防止不会与其他线程产生这种指令交错的问题
-
- } finally {
- headLock.unlock();
- }
- return e;
- }
下面来看一个难题,就是如何通知 headWaits 和 tailWaits 中等待的线程,比如 poll 方法拿走一个元素,通知 tailWaits:我拿走一个,不满了噢,你们可以放了,因此代码改为
- @Override
- public E poll() throws InterruptedException {
- E e;
- headLock.lockInterruptibly();
- try {
- // 队列空等待
- while (isEmpty()) {
- headWaits.await();
- }
-
- // 不空则出队
- e = array[head];
- if (++head == array.length) {
- head = 0;
- }
-
- // 修改 size
- size.getAndDecrement();
-
- // 通知 tailWaits 不满(有问题)
- tailWaits.signal();
-
- } finally {
- headLock.unlock();
- }
- return e;
- }
那有同学说,加上锁不就行了吗,于是写出了下面的代码
发现什么问题了?两把锁这么嵌套使用,非常容易出现死锁,如下所示
因此得避免嵌套,两段加锁的代码变成了下面平级的样子
性能还可以进一步提升
代码调整后 offer 并没有同时获取 tailLock 和 headLock 两把锁,因此两次加锁之间会有空隙,这个空隙内可能有其它的 offer 线程添加了更多的元素,那么这些线程都要执行 signal(),通知 poll 线程队列非空吗?
每次调用 signal() 都需要这些 offer 线程先获得 headLock 锁,成本较高,要想法减少 offer 线程获得 headLock 锁的次数
可以加一个条件:当 offer 增加前队列为空,即从 0 变化到不空,才由此 offer 线程来通知 headWaits,其它情况不归它管
队列从 0 变化到不空,会唤醒一个等待的 poll 线程,这个线程被唤醒后,肯定能拿到 headLock 锁,因此它具备了唤醒 headWaits 上其它 poll 线程的先决条件。如果检查出此时有其它 offer 线程新增了元素(不空,但不是从0变化而来),那么不妨由此 poll 线程来唤醒其它 poll 线程
这个技巧被称之为级联通知(cascading notifies),类似的原因:
在 poll 时队列从满变化到不满,才由此 poll 线程来唤醒一个等待的 offer 线程,目的也是为了减少 poll 线程对 tailLock 上锁次数,剩下等待的 offer 线程由这个 offer 线程间接唤醒
最终的代码为
- public class BlockingQueue2
implements BlockingQueue { -
- private final E[] array;
- private int head = 0;
- private int tail = 0;
- private final AtomicInteger size = new AtomicInteger(0);
- ReentrantLock headLock = new ReentrantLock();
- Condition headWaits = headLock.newCondition();
- ReentrantLock tailLock = new ReentrantLock();
- Condition tailWaits = tailLock.newCondition();
-
- public BlockingQueue2(int capacity) {
- this.array = (E[]) new Object[capacity];
- }
-
- @Override
- public void offer(E e) throws InterruptedException {
- int c;
- tailLock.lockInterruptibly();
- try {
- while (isFull()) {
- tailWaits.await();
- }
- array[tail] = e;
- if (++tail == array.length) {
- tail = 0;
- }
- c = size.getAndIncrement();//相当于size++ 可以防止不会与其他线程产生这种指令交错的问题
- // a. 队列不满, 但不是从满->不满, 由此offer线程唤醒其它offer线程
- if (c + 1 < array.length) {
- tailWaits.signal();
- }
- } finally {
- tailLock.unlock();
- }
- // b. 从0->不空, 由此offer线程唤醒等待的poll线程
- if (c == 0) {
- headLock.lock();
- try {
- headWaits.signal();
- } finally {
- headLock.unlock();
- }
- }
- }
-
- @Override
- public E poll() throws InterruptedException {
- E e;
- int c;
- headLock.lockInterruptibly();
- try {
- while (isEmpty()) {
- headWaits.await();
- }
- e = array[head];
- if (++head == array.length) {
- head = 0;
- }
- c = size.getAndDecrement();//相当于size++ 可以防止不会与其他线程产生这种指令交错的问题
- // b. 队列不空, 但不是从0变化到不空,由此poll线程通知其它poll线程
- if (c > 1) {
- headWaits.signal();
- }
- } finally {
- headLock.unlock();
- }
- // a. 从满->不满, 由此poll线程唤醒等待的offer线程
- if (c == array.length) {
- tailLock.lock();
- try {
- tailWaits.signal();
- } finally {
- tailLock.unlock();
- }
- }
- return e;
- }
-
- private boolean isEmpty() {
- return size.get() == 0;
- }
-
- private boolean isFull() {
- return size.get() == array.length;
- }
-
- }
双锁实现的非常精巧,据说作者 Doug Lea 花了一年的时间才完善了此段代码