• 阻塞队列--线程安全问题


    之前的队列在很多场景下都不能很好地工作,例如

    1. 大部分场景要求分离向队列放入(生产者:主要调用offer方法)、从队列拿出(消费者:主要调用poll方法)两个角色、它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题

    2. 队列为空,那么在之前的实现里会返回 null,如果就是硬要拿到一个元素呢?只能不断循环尝试

    3. 队列为满,那么再之前的实现里会返回 false,如果就是硬要塞入一个元素呢?只能不断循环尝试

    因此我们需要解决的问题有

    1. 用锁保证线程安全

    2. 用条件变量让等待非空线程等待不满线程进入等待状态,而不是不断循环尝试,让 CPU 空转

    有同学对线程安全还没有足够的认识,下面举一个反例,两个线程都要执行入队操作(几乎在同一时刻)

    1. public class TestThreadUnsafe {
    2. private final String[] array = new String[10];
    3. private int tail = 0;
    4. public void offer(String e) {
    5. array[tail] = e;
    6. tail++;
    7. }
    8. @Override
    9. public String toString() {
    10. return Arrays.toString(array);
    11. }
    12. public static void main(String[] args) {
    13. TestThreadUnsafe queue = new TestThreadUnsafe();
    14. new Thread(()-> queue.offer("e1"), "t1").start();
    15. new Thread(()-> queue.offer("e2"), "t2").start();
    16. }
    17. }

    注意IDEA调试设置:

    执行的时间序列如下,假设初始状态 tail = 0,在执行过程中由于 CPU 在两个线程之间切换,造成了指令交错

    糟糕的是,由于指令交错的顺序不同,得到的结果不止以上一种,宏观上造成混乱的效果

    单锁实现

    Java 中要防止代码段交错执行,需要使用锁,有两种选择

    • synchronized 代码块,属于关键字级别提供锁保护,功能少

    • ReentrantLock(可重入锁) 类,功能丰富

    以 ReentrantLock 为例

    1. ReentrantLock lock = new ReentrantLock();//锁对象
    2. public void offer(String e) {
    3. //lock.lock(); //加锁
    4. lock.lockInterruptibly();//加锁 记得要抛异常
    5. try {
    6. array[tail] = e;
    7. tail++;
    8. } finally {
    9. //利用try-finally:为了防止当我们try{}内有异常,那么我们的lock.unlock()解锁操作就没办法执行,所以我们要保证即使出现异常也要执行解锁操作
    10. lock.unlock();//解锁
    11. }
    12. }

    lock()与lockInterruptibly()的区别:

    使用lock():当t1加锁状态,未解锁时,t2必须等待

    使用lockInterruptibly():当t1加锁状态,未解锁时,但t1处于异常导致无法解锁的时候,t2可以在阻塞状态下随时打断t1的锁

    只要两个线程执行上段代码时,锁对象是同一个,就能保证 try 块内的代码的执行不会出现指令交错现象,即执行顺序只可能是下面两种情况之一

    • 另一种情况是线程2 先获得锁,线程1 被挡在外面

    • 要明白保护的本质,本例中是保护的是 tail 位置读写的安全

    事情还没有完,上面的例子是队列还没有放满的情况,考虑下面的代码(这回锁同时保护了 tail 和 size 的读写安全

    1. ReentrantLock lock = new ReentrantLock();
    2. int size = 0;//判断队列是否满了
    3. public void offer(String e) {
    4. lock.lockInterruptibly();
    5. try {
    6. if(isFull()) {
    7. // 满了怎么办?
    8. }
    9. array[tail] = e;
    10. //tail++;
    11. //防止tail长度溢出
    12. if (++tail == array.length) {
    13. tail = 0;
    14. }
    15. size++;
    16. } finally {
    17. lock.unlock();
    18. }
    19. }
    20. private boolean isFull() {
    21. return size == array.length;
    22. }

    之前是返回 false 表示添加失败,前面分析过想达到这么一种效果:

    • 在队列满时,不是立刻返回,而是当前线程进入等待

    • 什么时候队列不满了,再唤醒这个等待的线程,从上次的代码处继续向下运行

    ReentrantLock 可以配合条件变量来实现,代码进化为

    1. ReentrantLock lock = new ReentrantLock();
    2. Condition tailWaits = lock.newCondition(); // 条件变量
    3. int size = 0;
    4. public void offer(String e) {
    5. lock.lockInterruptibly();
    6. try {
    7. while (isFull()) {
    8. tailWaits.await(); // 当队列满时, 当前线程进入 tailWaits 等待,当前线程加入tailWaits,并且让此线程阻塞
    9. }
    10. array[tail] = e;
    11. //tail++;
    12. //防止tail长度溢出
    13. if (++tail == array.length) {
    14. tail = 0;
    15. }
    16. size++;
    17. } finally {
    18. //利用try-finally:为了防止当我们try{}内有异常,那么我们的lock.unlock()解锁操作就没办法执行,所以我们要保证即使出现异常也要执行解锁操作
    19. lock.unlock();
    20. }
    21. }
    22. private boolean isFull() {
    23. return size == array.length;
    24. }
    • 条件变量底层也是个队列,用来存储这些需要等待的线程,当队列满了,就会将 offer 线程加入条件队列,并暂时释放锁

    • 将来我们的队列如果不满了(由 poll 线程那边得知)可以调用 tailWaits.signal() 来唤醒 tailWaits 中首个等待的线程,被唤醒的线程会再次抢到锁,从上次 await 处继续向下运行

    思考为何要用 while 而不是 if,设队列容量是 3

    关键点:

    • 从 tailWaits 中唤醒的线程,会与新来的 offer 的线程争抢锁,谁能抢到是不一定的,如果后者先抢到,就会导致条件又发生变化

    • 这种情况称之为虚假唤醒,唤醒后应该重新检查条件,看是不是得重新进入等待

    最后的实现代码:

    接口:

    1. /**
    2. 目前队列存在的问题
      1. 很多场景要求分离生产者、消费者两个角色、它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题
    3. 队列为空,那么在之前的实现里会返回 null,如果就是硬要拿到一个元素呢?只能不断循环尝试
  • 队列为满,那么再之前的实现里会返回 false,如果就是硬要塞入一个元素呢?只能不断循环尝试
  • 解决方法
    1. 用锁保证线程安全
    2. 用条件变量让 poll 或 offer 线程进入等待状态,而不是不断循环尝试,让 CPU 空转
    3. */
    4. public interface BlockingQueue { // 阻塞队列
    5. void offer(E e) throws InterruptedException;
    6. boolean offer(E e, long timeout) throws InterruptedException;
    7. E poll() throws InterruptedException;
    8. }
    9. 实现:

      1. /**
      2. * 单锁实现
      3. * @param 元素类型
      4. */
      5. public class BlockingQueue1 implements BlockingQueue {
      6. private final E[] array;
      7. private int head = 0;
      8. private int tail = 0;
      9. private int size = 0; // 元素个数 判断队列是否满了
      10. @SuppressWarnings("all")
      11. public BlockingQueue1(int capacity) {
      12. array = (E[]) new Object[capacity];
      13. }
      14. ReentrantLock lock = new ReentrantLock();
      15. Condition tailWaits = lock.newCondition();//用于poll方法中
      16. Condition headWaits = lock.newCondition();//用于offer方法中
      17. @Override
      18. public void offer(E e) throws InterruptedException {
      19. lock.lockInterruptibly();
      20. try {
      21. while (isFull()) {
      22. tailWaits.await();
      23. }
      24. array[tail] = e;
      25. //tail++;
      26. //防止tail长度溢出
      27. if (++tail == array.length) {
      28. tail = 0;
      29. }
      30. size++;
      31. //poll方法那边等待队列非空,那么我们就要唤醒poll方法的等待
      32. headWaits.signal();
      33. } finally {
      34. //利用try-finally:为了防止当我们try{}内有异常,那么我们的lock.unlock()解锁操作就没办法执行,所以我们要保证即使出现异常也要执行解锁操作
      35. lock.unlock();
      36. }
      37. }
      38. @Override
      39. //对于上面的Offer方法,我们又多了个选择:
      40. //我们确实可以等待该线程被唤醒,但是要等多久呢?如果我们不需要等那么久,那就可以给这个等待加一个时间上限
      41. //在规定时间内,我可以等待,但是超过了这个时间,还没有人来唤醒我,那我就不等了,直接提取结束这个等待,可以返回一个false,表示我添加失败
      42. public void offer(E e, long timeout) throws InterruptedException {
      43. lock.lockInterruptibly();
      44. try {
      45. long t = TimeUnit.MILLISECONDS.toNanos(timeout);
      46. while (isFull()) {
      47. //在规定时间内,我可以等待,但是超过了这个时间,还没有人来唤醒我,那我就不等了,直接提取结束这个等待,可以返回一个false,表示我添加失败
      48. if (t <= 0) {
      49. return;
      50. }
      51. t = tailWaits.awaitNanos(t);//最多等待多少纳秒
      52. }
      53. array[tail] = e;
      54. //tail++;
      55. //防止tail长度溢出
      56. if (++tail == array.length) {
      57. tail = 0;
      58. }
      59. size++;
      60. headWaits.signal();
      61. } finally {
      62. //利用try-finally:为了防止当我们try{}内有异常,那么我们的lock.unlock()解锁操作就没办法执行,所以我们要保证即使出现异常也要执行解锁操作
      63. lock.unlock();
      64. }
      65. }
      66. @Override
      67. public E poll() throws InterruptedException {
      68. lock.lockInterruptibly();
      69. try {
      70. //如果队列为空,我们就要让该线程等待,直到offer方法中headWaits.signal();将其唤醒
      71. while (isEmpty()) {
      72. headWaits.await();
      73. }
      74. E e = array[head];
      75. array[head] = null; // help GC
      76. if (++head == array.length) {
      77. head = 0;
      78. }
      79. size--;
      80. //offer方法那边等待队列非空,那么我们就要唤醒offer方法的等待
      81. tailWaits.signal();
      82. return e;
      83. } finally {
      84. //利用try-finally:为了防止当我们try{}内有异常,那么我们的lock.unlock()解锁操作就没办法执行,所以我们要保证即使出现异常也要执行解锁操作
      85. lock.unlock();
      86. }
      87. }
      88. private boolean isEmpty() {
      89. return size == 0;
      90. }
      91. private boolean isFull() {
      92. return size == array.length;
      93. }
      94. }

      注意

      双锁实现

      单锁的缺点在于:

      如果希望进一步提高性能,可以用两把锁

      1. ReentrantLock headLock = new ReentrantLock(); // 保护 head 的锁
      2. Condition headWaits = headLock.newCondition(); // 队列空时,需要等待的线程集合
      3. ReentrantLock tailLock = new ReentrantLock(); // 保护 tail 的锁
      4. Condition tailWaits = tailLock.newCondition(); // 队列满时,需要等待的线程集合

      先看看 offer 方法的初步实现

      1. @Override
      2. public void offer(E e) throws InterruptedException {
      3. tailLock.lockInterruptibly();
      4. try {
      5. // 队列满则等待
      6. while (isFull()) {
      7. tailWaits.await();
      8. }
      9. // 不满则入队
      10. array[tail] = e;
      11. if (++tail == array.length) {
      12. tail = 0;
      13. }
      14. // 修改 size (有问题)
      15. size++;
      16. } finally {
      17. tailLock.unlock();
      18. }
      19. }

      上面代码的缺点是 size 并不受 tailLock 保护,tailLock 与 headLock 是两把不同的锁,并不能实现互斥的效果。因此,size 需要用下面的代码保证原子性

      此时,你会有疑问:size++,它不就是写在一个锁对象的加锁与解锁操作之间吗?那它怎么就是线程不安全的呢?

      答:注意:我们这个size++,它确实能够受到我们tailLock这把锁的保护,但是对所有使用tailLock锁的线程让他们串行执行,也就是将来调用所有offer方法的线程,它们是不是都在使用tailLock这把锁,也就是说,只有各种线程在调用offer方法时,size++会受到保护,但是如果是headLock 这个锁,这个锁是由poll方法调用,这就和“只有各种线程在调用offer方法时,size++会受到保护”同一个道理,但是如果这两个锁串着用size++就不受保护,因为现在是使用了两把锁。

      1. AtomicInteger size = new AtomicInteger(0); // 保护 size 的原子变量
      2. size.getAndIncrement(); // 自增
      3. size.getAndDecrement(); // 自减

      代码修改为

      1. @Override
      2. public void offer(E e) throws InterruptedException {
      3. tailLock.lockInterruptibly();
      4. try {
      5. // 队列满等待
      6. while (isFull()) {
      7. tailWaits.await();
      8. }
      9. // 不满则入队
      10. array[tail] = e;
      11. if (++tail == array.length) {
      12. tail = 0;
      13. }
      14. // 修改 size
      15. size.getAndIncrement();//相当于size++ 可以防止不会与其他线程产生这种指令交错的问题
      16. } finally {
      17. tailLock.unlock();
      18. }
      19. }

      对称地,可以写出 poll 方法

      1. @Override
      2. public E poll() throws InterruptedException {
      3. E e;
      4. headLock.lockInterruptibly();
      5. try {
      6. // 队列空等待
      7. while (isEmpty()) {
      8. headWaits.await();
      9. }
      10. // 不空则出队
      11. e = array[head];
      12. if (++head == array.length) {
      13. head = 0;
      14. }
      15. // 修改 size
      16. size.getAndDecrement();//相当于size++ 可以防止不会与其他线程产生这种指令交错的问题
      17. } finally {
      18. headLock.unlock();
      19. }
      20. return e;
      21. }

      下面来看一个难题,就是如何通知 headWaits 和 tailWaits 中等待的线程,比如 poll 方法拿走一个元素,通知 tailWaits:我拿走一个,不满了噢,你们可以放了,因此代码改为

      1. @Override
      2. public E poll() throws InterruptedException {
      3. E e;
      4. headLock.lockInterruptibly();
      5. try {
      6. // 队列空等待
      7. while (isEmpty()) {
      8. headWaits.await();
      9. }
      10. // 不空则出队
      11. e = array[head];
      12. if (++head == array.length) {
      13. head = 0;
      14. }
      15. // 修改 size
      16. size.getAndDecrement();
      17. // 通知 tailWaits 不满(有问题)
      18. tailWaits.signal();
      19. } finally {
      20. headLock.unlock();
      21. }
      22. return e;
      23. }

      那有同学说,加上锁不就行了吗,于是写出了下面的代码

      发现什么问题了?两把锁这么嵌套使用,非常容易出现死锁,如下所示

      因此得避免嵌套,两段加锁的代码变成了下面平级的样子

      性能还可以进一步提升

      1. 代码调整后 offer 并没有同时获取 tailLock 和 headLock 两把锁,因此两次加锁之间会有空隙,这个空隙内可能有其它的 offer 线程添加了更多的元素,那么这些线程都要执行 signal(),通知 poll 线程队列非空吗?

      这个技巧被称之为级联通知(cascading notifies),类似的原因:

           在 poll 时队列从满变化到不满,才由此 poll 线程来唤醒一个等待的 offer 线程,目的也是为了减少 poll 线程对 tailLock 上锁次数,剩下等待的 offer 线程由这个 offer 线程间接唤醒

      最终的代码为

      1. public class BlockingQueue2 implements BlockingQueue {
      2. private final E[] array;
      3. private int head = 0;
      4. private int tail = 0;
      5. private final AtomicInteger size = new AtomicInteger(0);
      6. ReentrantLock headLock = new ReentrantLock();
      7. Condition headWaits = headLock.newCondition();
      8. ReentrantLock tailLock = new ReentrantLock();
      9. Condition tailWaits = tailLock.newCondition();
      10. public BlockingQueue2(int capacity) {
      11. this.array = (E[]) new Object[capacity];
      12. }
      13. @Override
      14. public void offer(E e) throws InterruptedException {
      15. int c;
      16. tailLock.lockInterruptibly();
      17. try {
      18. while (isFull()) {
      19. tailWaits.await();
      20. }
      21. array[tail] = e;
      22. if (++tail == array.length) {
      23. tail = 0;
      24. }
      25. c = size.getAndIncrement();//相当于size++ 可以防止不会与其他线程产生这种指令交错的问题
      26. // a. 队列不满, 但不是从满->不满, 由此offer线程唤醒其它offer线程
      27. if (c + 1 < array.length) {
      28. tailWaits.signal();
      29. }
      30. } finally {
      31. tailLock.unlock();
      32. }
      33. // b. 从0->不空, 由此offer线程唤醒等待的poll线程
      34. if (c == 0) {
      35. headLock.lock();
      36. try {
      37. headWaits.signal();
      38. } finally {
      39. headLock.unlock();
      40. }
      41. }
      42. }
      43. @Override
      44. public E poll() throws InterruptedException {
      45. E e;
      46. int c;
      47. headLock.lockInterruptibly();
      48. try {
      49. while (isEmpty()) {
      50. headWaits.await();
      51. }
      52. e = array[head];
      53. if (++head == array.length) {
      54. head = 0;
      55. }
      56. c = size.getAndDecrement();//相当于size++ 可以防止不会与其他线程产生这种指令交错的问题
      57. // b. 队列不空, 但不是从0变化到不空,由此poll线程通知其它poll线程
      58. if (c > 1) {
      59. headWaits.signal();
      60. }
      61. } finally {
      62. headLock.unlock();
      63. }
      64. // a. 从满->不满, 由此poll线程唤醒等待的offer线程
      65. if (c == array.length) {
      66. tailLock.lock();
      67. try {
      68. tailWaits.signal();
      69. } finally {
      70. tailLock.unlock();
      71. }
      72. }
      73. return e;
      74. }
      75. private boolean isEmpty() {
      76. return size.get() == 0;
      77. }
      78. private boolean isFull() {
      79. return size.get() == array.length;
      80. }
      81. }

      双锁实现的非常精巧,据说作者 Doug Lea 花了一年的时间才完善了此段代码

    10. 相关阅读:
      9、osg的texture转换为虚幻引擎的UTexture2D
      罗素《为什么我不是基督徒》的后果
      神经网络结构图如何看懂,神经网络结果图如何看
      ChiQA
      exec函数族的应用
      主定理(简化版)
      【云原生】云端数据之液位模拟量全方位诠释
      【2022】【论文笔记】基于Rydberg原子Antenna的——
      【KingbaseES】银河麒麟V10 ARM64架构_安装人大金仓数据库KingbaseES_V8R6(CentOS8)
      项目day(3) 前台环境搭建
    11. 原文地址:https://blog.csdn.net/m0_60333804/article/details/133564209