• Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析


    上篇文章谈到BlockingQueue的使用场景,并重点分析了ArrayBlockingQueue的实现原理,了解到ArrayBlockingQueue底层是基于数组实现的阻塞队列。

    但是BlockingQueue的实现类中,有一种阻塞队列比较特殊,就是SynchronousQueue(同步移交队列),队列长度为0。

    作用就是一个线程往队列放数据的时候,必须等待另一个线程从队列中取走数据。同样,从队列中取数据的时候,必须等待另一个线程往队列中放数据。

    这样特殊的队列,有什么应用场景呢?

    1. SynchronousQueue用法

    先看一个SynchronousQueue的简单用例:

    1. /**
    2. * @author 一灯架构
    3. * @apiNote SynchronousQueue示例
    4. **/
    5. public class SynchronousQueueDemo {
    6. public static void main(String[] args) throws InterruptedException {
    7. // 1. 创建SynchronousQueue队列
    8. BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();
    9. // 2. 启动一个线程,往队列中放3个元素
    10. new Thread(() -> {
    11. try {
    12. System.out.println(Thread.currentThread().getName() + " 入队列 1");
    13. synchronousQueue.put(1);
    14. Thread.sleep(1);
    15. System.out.println(Thread.currentThread().getName() + " 入队列 2");
    16. synchronousQueue.put(2);
    17. Thread.sleep(1);
    18. System.out.println(Thread.currentThread().getName() + " 入队列 3");
    19. synchronousQueue.put(3);
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }
    23. }).start();
    24. // 3. 等待1000毫秒
    25. Thread.sleep(1000L);
    26. // 4. 再启动一个线程,从队列中取出3个元素
    27. new Thread(() -> {
    28. try {
    29. System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
    30. Thread.sleep(1);
    31. System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
    32. Thread.sleep(1);
    33. System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
    34. } catch (InterruptedException e) {
    35. e.printStackTrace();
    36. }
    37. }).start();
    38. }
    39. }
    40. 复制代码

    输出结果:

    1. Thread-0 入队列 1
    2. Thread-1 出队列 1
    3. Thread-0 入队列 2
    4. Thread-1 出队列 2
    5. Thread-0 入队列 3
    6. Thread-1 出队列 3
    7. 复制代码

    从输出结果中可以看到,第一个线程Thread-0往队列放入一个元素1后,就被阻塞了。直到第二个线程Thread-1从队列中取走元素1后,Thread-0才能继续放入第二个元素2。

    由于SynchronousQueue是BlockingQueue的实现类,所以也实现类BlockingQueue中几组抽象方法:

    为了满足不同的使用场景,BlockingQueue设计了很多的放数据和取数据的方法。

    操作抛出异常返回特定值阻塞阻塞一段时间
    放数据addofferputoffer(e, time, unit)
    取数据removepolltakepoll(time, unit)
    查看数据(不删除)element()peek()不支持不支持

    这几组方法的不同之处就是:

    1. 当队列满了,再往队列中放数据,add方法抛异常,offer方法返回false,put方法会一直阻塞(直到有其他线程从队列中取走数据),offer(e, time, unit)方法阻塞指定时间然后返回false。
    2. 当队列是空,再从队列中取数据,remove方法抛异常,poll方法返回null,take方法会一直阻塞(直到有其他线程往队列中放数据),poll(time, unit)方法阻塞指定时间然后返回null。
    3. 当队列是空,再去队列中查看数据(并不删除数据),element方法抛异常,peek方法返回null。

    工作中使用最多的就是offer、poll阻塞指定时间的方法。

    2. SynchronousQueue应用场景

    SynchronousQueue的特点:

    队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据。

    这种特殊的实现逻辑有什么应用场景呢?

    我的理解就是,如果你希望你的任务需要被快速处理,就可以使用这种队列。

    Java线程池中的newCachedThreadPool(带缓存的线程池)底层就是使用SynchronousQueue实现的。

    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3. 60L, TimeUnit.SECONDS,
    4. new SynchronousQueue());
    5. }
    6. 复制代码

    newCachedThreadPool线程池的核心线程数是0,最大线程数是Integer的最大值,线程存活时间是60秒。

    如果你使用newCachedThreadPool线程池,你提交的任务会被更快速的处理,因为你每次提交任务,都会有一个空闲的线程等着处理任务。如果没有空闲的线程,也会立即创建一个线程处理你的任务。

    你想想,这处理效率,杠杠滴!

    当然也有弊端,如果你提交了太多的任务,导致创建了大量的线程,这些线程都在竞争CPU时间片,等待CPU调度,处理任务速度也会变慢,所以在使用过程中也要综合考虑。

    3. SynchronousQueue源码解析

    3.1 SynchronousQueue类属性

    1. public class SynchronousQueue extends AbstractQueue implements BlockingQueue {
    2. // 转换器,取数据和放数据的核心逻辑都在这个类里面
    3. private transient volatile Transferer<E> transferer;
    4. // 默认的构造方法(使用非公平队列)
    5. public SynchronousQueue() {
    6. this(false);
    7. }
    8. // 有参构造方法,可以指定是否使用公平队列
    9. public SynchronousQueue(boolean fair) {
    10. transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    11. }
    12. // 转换器实现类
    13. abstract static class Transferer {
    14. abstract E transfer(E e, boolean timed, long nanos);
    15. }
    16. // 基于栈实现的非公平队列
    17. static final class TransferStack extends Transferer {
    18. }
    19. // 基于队列实现的公平队列
    20. static final class TransferQueue extends Transferer {
    21. }
    22. }
    23. 复制代码

    可以看到SynchronousQueue默认的无参构造方法,内部使用的是基于栈实现的非公平队列,当然也可以调用有参构造方法,传参是true,使用基于队列实现的公平队列。

    1. // 使用非公平队列(基于栈实现)
    2. BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();
    3. // 使用公平队列(基于队列实现)
    4. BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(true);
    5. 复制代码

    本次就常用的栈实现来剖析SynchronousQueue的底层实现原理。

    3.2 栈底层结构

    栈结构,是非公平的,遵循先进后出。

    使用个case测试一下:

    1. /**
    2. * @author 一灯架构
    3. * @apiNote SynchronousQueue示例
    4. **/
    5. public class SynchronousQueueDemo {
    6. public static void main(String[] args) throws InterruptedException {
    7. // 1. 创建SynchronousQueue队列
    8. SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();
    9. // 2. 启动一个线程,往队列中放1个元素
    10. new Thread(() -> {
    11. try {
    12. System.out.println(Thread.currentThread().getName() + " 入队列 0");
    13. synchronousQueue.put(0);
    14. } catch (InterruptedException e) {
    15. e.printStackTrace();
    16. }
    17. }).start();
    18. // 3. 等待1000毫秒
    19. Thread.sleep(1000L);
    20. // 4. 启动一个线程,往队列中放1个元素
    21. new Thread(() -> {
    22. try {
    23. System.out.println(Thread.currentThread().getName() + " 入队列 1");
    24. synchronousQueue.put(1);
    25. } catch (InterruptedException e) {
    26. e.printStackTrace();
    27. }
    28. }).start();
    29. // 5. 等待1000毫秒
    30. Thread.sleep(1000L);
    31. // 6. 再启动一个线程,从队列中取出1个元素
    32. new Thread(() -> {
    33. try {
    34. System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
    35. } catch (InterruptedException e) {
    36. e.printStackTrace();
    37. }
    38. }).start();
    39. // 7. 等待1000毫秒
    40. Thread.sleep(1000L);
    41. // 8. 再启动一个线程,从队列中取出1个元素
    42. new Thread(() -> {
    43. try {
    44. System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
    45. } catch (InterruptedException e) {
    46. e.printStackTrace();
    47. }
    48. }).start();
    49. }
    50. }
    51. 复制代码

    输出结果:

    1. Thread-0 入队列 0
    2. Thread-1 入队列 1
    3. Thread-2 出队列 1
    4. Thread-3 出队列 0
    5. 复制代码

    从输出结果中可以看出,符合栈结构先进后出的顺序。

    3.3 栈节点源码

    栈中的数据都是由一个个的节点组成的,先看一下节点类的源码:

    1. // 节点
    2. static final class SNode {
    3. // 节点值(取数据的时候,该字段为null
    4. Object item;
    5. // 存取数据的线程
    6. volatile Thread waiter;
    7. // 节点模式
    8. int mode;
    9. // 匹配到的节点
    10. volatile SNode match;
    11. // 后继节点
    12. volatile SNode next;
    13. }
    14. 复制代码
    • item

      节点值,只在存数据的时候用。取数据的时候,这个值是null。

    • waiter

      存取数据的线程,如果没有对应的接收线程,这个线程会被阻塞。

    • mode

      节点模式,共有3种类型:

      类型值类型描述类型的作用
      0REQUEST表示取数据
      1DATA表示存数据
      2FULFILLING表示正在等待执行(比如取数据的线程,等待其他线程放数据)

    3.4 put/take流程

    放数据和取数据的逻辑,在底层复用的是同一个方法,以put/take方法为例,另外两个放数据的方法,add和offer方法底层实现是一样的。

    先看一下数据流转的过程,方便理解源码。

    还是以上面的case为例:

    1. Thread0先往SynchronousQueue队列中放入元素0
    2. Thread1再往SynchronousQueue队列放入元素1
    3. Thread2从SynchronousQueue队列中取出一个元素

    第一步:Thread0先往SynchronousQueue队列中放入元素0

    把本次操作组装成SNode压入栈顶,item是元素0,waiter是当前线程Thread0,mode是1表示放入数据。

    第二步:Thread1再往SynchronousQueue队列放入元素1

    把本次操作组装成SNode压入栈顶,item是元素1,waiter是当前线程Thread1,mode是1表示放入数据,next是SNode0。

    第三步:Thread2从SynchronousQueue队列中取出一个元素

    这次的操作比较复杂,也是先把本次的操作包装成SNode压入栈顶。

    item是null(取数据的时候,这个字段没有值),waiter是null(当前线程Thread2正在操作,所以不用赋值了),mode是2表示正在操作(即将跟后继节点进行匹配),next是SNode1。

    然后,Thread2开始把栈顶的两个节点进行匹配,匹配成功后,就把SNode2赋值给SNode1的match属性,唤醒SNode1中的Thread1线程,然后弹出SNode2节点和SNode1节点。

    3.5 put/take源码实现

    先看一下put方法源码:

    1. // 放数据
    2. public void put(E e) throws InterruptedException {
    3. // 不允许放null元素
    4. if (e == null)
    5. throw new NullPointerException();
    6. // 调用转换器实现类,放元素
    7. if (transferer.transfer(e, false, 0) == null) {
    8. // 如果放数据失败,就中断当前线程,并抛出异常
    9. Thread.interrupted();
    10. throw new InterruptedException();
    11. }
    12. }
    13. 复制代码

    核心逻辑都在transfer方法中,代码很长,理清逻辑后,也很容易理解。

    1. // 取数据和放数据操作,共用一个方法
    2. E transfer(E e, boolean timed, long nanos) {
    3. SNode s = null;
    4. // e为空,说明是取数据,否则是放数据
    5. int mode = (e == null) ? REQUEST : DATA;
    6. for (; ; ) {
    7. SNode h = head;
    8. // 1. 如果栈顶节点为空,或者栈顶节点类型跟本次操作相同(都是取数据,或者都是放数据)
    9. if (h == null || h.mode == mode) {
    10. // 2. 判断节点是否已经超时
    11. if (timed && nanos <= 0) {
    12. // 3. 如果栈顶节点已经被取消,就删除栈顶节点
    13. if (h != null && h.isCancelled())
    14. casHead(h, h.next);
    15. else
    16. return null;
    17. // 4. 把本次操作包装成SNode,压入栈顶
    18. } else if (casHead(h, s = snode(s, e, h, mode))) {
    19. // 5. 挂起当前线程,等待被唤醒
    20. SNode m = awaitFulfill(s, timed, nanos);
    21. // 6. 如果这个节点已经被取消,就删除这个节点
    22. if (m == s) {
    23. clean(s);
    24. return null;
    25. }
    26. // 7. 把s.next设置成head
    27. if ((h = head) != null && h.next == s)
    28. casHead(h, s.next);
    29. return (E) ((mode == REQUEST) ? m.item : s.item);
    30. }
    31. // 8. 如果栈顶节点类型跟本次操作不同,并且不是FULFILLING类型
    32. } else if (!isFulfilling(h.mode)) {
    33. // 9. 再次判断如果栈顶节点已经被取消,就删除栈顶节点
    34. if (h.isCancelled())
    35. casHead(h, h.next);
    36. // 10. 把本次操作包装成SNode(类型是FULFILLING),压入栈顶
    37. else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
    38. // 11. 使用死循环,直到匹配到对应的节点
    39. for (; ; ) {
    40. // 12. 遍历下个节点
    41. SNode m = s.next;
    42. // 13. 如果节点是null,表示遍历到末尾,设置栈顶节点是null,结束。
    43. if (m == null) {
    44. casHead(s, null);
    45. s = null;
    46. break;
    47. }
    48. SNode mn = m.next;
    49. // 14. 如果栈顶的后继节点跟栈顶节点匹配成功,就删除这两个节点,结束。
    50. if (m.tryMatch(s)) {
    51. casHead(s, mn);
    52. return (E) ((mode == REQUEST) ? m.item : s.item);
    53. } else
    54. // 15. 如果没有匹配成功,就删除栈顶的后继节点,继续匹配
    55. s.casNext(m, mn);
    56. }
    57. }
    58. } else {
    59. // 16. 如果栈顶节点类型跟本次操作不同,并且是FULFILLING类型,
    60. // 就再执行一遍上面第11for循环中的逻辑(很少概率出现)
    61. SNode m = h.next;
    62. if (m == null)
    63. casHead(h, null);
    64. else {
    65. SNode mn = m.next;
    66. if (m.tryMatch(h))
    67. casHead(h, mn);
    68. else
    69. h.casNext(m, mn);
    70. }
    71. }
    72. }
    73. }
    74. 复制代码

    transfer方法逻辑也很简单,就是判断本次操作类型是否跟栈顶节点相同,如果相同,就把本次操作压入栈顶。否则就跟栈顶节点匹配,唤醒栈顶节点线程,弹出栈顶节点。

    transfer方法中调用了awaitFulfill方法,作用是挂起当前线程。

    1. // 等待被唤醒
    2. SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    3. // 1. 计算超时时间
    4. final long deadline = timed ? System.nanoTime() + nanos : 0L;
    5. Thread w = Thread.currentThread();
    6. // 2. 计算自旋次数
    7. int spins = (shouldSpin(s) ?
    8. (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    9. for (;;) {
    10. if (w.isInterrupted())
    11. s.tryCancel();
    12. // 3. 如果已经匹配到其他节点,直接返回
    13. SNode m = s.match;
    14. if (m != null)
    15. return m;
    16. if (timed) {
    17. // 4. 超时时间递减
    18. nanos = deadline - System.nanoTime();
    19. if (nanos <= 0L) {
    20. s.tryCancel();
    21. continue;
    22. }
    23. }
    24. // 5. 自旋次数减一
    25. if (spins > 0)
    26. spins = shouldSpin(s) ? (spins-1) : 0;
    27. else if (s.waiter == null)
    28. s.waiter = w;
    29. // 6. 开始挂起当前线程
    30. else if (!timed)
    31. LockSupport.park(this);
    32. else if (nanos > spinForTimeoutThreshold)
    33. LockSupport.parkNanos(this, nanos);
    34. }
    35. }
    36. 复制代码

    awaitFulfill方法的逻辑也很简单,就是挂起当前线程。

    take方法底层使用的也是transfer方法:

    1. // 取数据
    2. public E take() throws InterruptedException {
    3. // // 调用转换器实现类,取数据
    4. E e = transferer.transfer(null, false, 0);
    5. if (e != null)
    6. return e;
    7. // 没取到,就中断当前线程
    8. Thread.interrupted();
    9. throw new InterruptedException();
    10. }
    11. 复制代码

    4. 总结

    1. SynchronousQueue是一种特殊的阻塞队列,队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据。
    2. SynchronousQueue底层是基于栈和队列两种数据结构实现的。
    3. Java线程池中的newCachedThreadPool(带缓存的线程池)底层就是使用SynchronousQueue实现的。
    4. 如果希望你的任务需要被快速处理,可以使用SynchronousQueue队列。
  • 相关阅读:
    WLAN 无线案例(华为AC控制器配置模板)
    svn文件不显示红色感叹号
    基于ARM的字符串拷贝实验(嵌入式系统)
    排序题:数组中的第k个最大元素及出现的次数 - 数组的正态分布排序
    降维(Dimensionality Reduction)
    2023 最新 Git 分布式版本控制系统介绍和下载安装使用教程
    【QandA C++】内存泄漏、进程地址空间、堆和栈、内存对齐、大小端和判断、虚拟内存等重点知识汇总
    科技的成就(三十一)
    基于web在线餐饮网站的设计与实现——仿Coco线上订奶茶饮料6个页面(HTML+CSS+JavaScript)
    C++实现轻量级RPC分布式网络通信框架
  • 原文地址:https://blog.csdn.net/BASK2311/article/details/127933305