• JAVA多线程同步队列SynchronousQueue



           SynchronousQueue是一个不存储元素的阻塞队列,每个put操作必须等待一个take操作,否则不能添加元素,SynchronousQueue队列本身不存储任何元素,适合传递性场景,比如一个线程中的数据传递给另一个线程使用,它的吞吐量比LinkedBlockingQueue和ArrayBlockingQueue更好一些


    1.SynchronousQueue构造方法


    SynchronousQueue有两种构造方法,一种采用公平模式,一种采用非公平模式,当采用公平模式的时候,等待线程将按FIFO(First Input First Output)顺序竞争,即先进先出的模式

    1. //参考JDK11源码
    2. public class SynchronousQueue extends AbstractQueue
    3. implements BlockingQueue, java.io.Serializable {
    4. private transient volatile Transferer transferer;
    5. abstract static class Transferer {
    6. //无论是put 还是take都会调用此方法
    7. //
    8. abstract E transfer(E e, boolean timed, long nanos);
    9. }
    10. //1.默认非公平模式
    11. public SynchronousQueue() {
    12. this(false);
    13. }
    14. //根据参数fair构造两种不同的模式
    15. //当参数为true时,等待线程将按FIFO顺序竞争
    16. public SynchronousQueue(boolean fair) {
    17. transferer = fair ? new TransferQueue() : new TransferStack();
    18. }
    19. //...
    20. }

    带参数的构造器,通过true或false,分别构造两种集成Transferer的不同对象。公平模式处理对象TransferQueue和非公平模式处理对象TransferStack都继承对象Transferer,并实现自己的transfer方法

    1. //参考JDK11源码
    2. public class SynchronousQueue extends AbstractQueue
    3. implements BlockingQueue, java.io.Serializable {
    4. private transient volatile Transferer transferer
    5. public SynchronousQueue(boolean fair) {
    6. transferer = fair ? new TransferQueue() : new TransferStack();
    7. }
    8. abstract static class Transferer {
    9. //无论是put 还是take都会调用此方法
    10. //
    11. abstract E transfer(E e, boolean timed, long nanos);
    12. }
    13. //...
    14. }

    1.1公平方式TransferQueue

     TransferQueue实现transfer方法的基本算法为循环,尝试采用以下两种方法之一,

    • 如果队列为空或是队列尾节点(t)和本次操作模式相同(模式是只QNode#isData),尝试将节点添加到等待者队列尾部,调用awaitFulfill等待节点匹配, 匹配成功返回匹配节点item,
    • 如果队列尾巴节点(t) 和本次操作模式不同 从队列头节点(h)开始想后查找匹配,匹配成功后想后推进head,并唤醒匹配节点的waiter线程

    1. //参考JDK11源码
    2. public class SynchronousQueue extends AbstractQueue
    3. implements BlockingQueue, java.io.Serializable {
    4. static final class TransferQueue extends Transferer {
    5. //头节点
    6. transient volatile QNode head;
    7. //尾节点
    8. transient volatile QNode tail;
    9. //构造器初始化
    10. TransferQueue() {
    11. //构造器中初始化1个虚拟节点,初始化时模式:isData=false
    12. QNode h = new QNode(null, false);
    13. head = h;
    14. tail = h;
    15. }
    16. //put, take会调用此方法
    17. E transfer(E e, boolean timed, long nanos) {
    18. QNode s = null; // constructed/reused as needed
    19. boolean isData = (e != null);
    20. for (;;) {
    21. QNode t = tail;
    22. QNode h = head;
    23. if (t == null || h == null)
    24. continue;
    25. //1. h和t相同表示队列为空 判断尾几点模式和当前模式是否相同
    26. if (h == t || t.isData == isData) {
    27. QNode tn = t.next;
    28. //t和tail不一致的时,不一致读,直接跳入下次循环
    29. if (t != tail)
    30. continue;
    31. //tn不为空时,尝试将tn放入队列尾部
    32. if (tn != null) {
    33. //advanceTail尝试将tn放入队列尾部
    34. advanceTail(t, tn);
    35. continue;
    36. }
    37. //非计时操作或者超时返回 null
    38. if (timed && nanos <= 0L) // can't wait
    39. return null;
    40. //当前节点e为空的时构建虚拟节点QNode,否则创建e的节点
    41. if (s == null)
    42. s = new QNode(e, isData);
    43. if (!t.casNext(null, s)) // failed to link in
    44. continue;
    45. //将当前节点放入队尾
    46. advanceTail(t, s);
    47. //等待匹配,并返回匹配节点的item,如果取消等待则返回该节点
    48. Object x = awaitFulfill(s, e, timed, nanos);
    49. if (x == s) {
    50. clean(t, s); //等待被取消,清除s节点
    51. return null;
    52. }
    53. if (!s.isOffList()) { // not already unlinked
    54. advanceHead(t, s); // unlink if head
    55. if (x != null) // and forget fields
    56. s.item = s;//item指向自身
    57. s.waiter = null;
    58. }
    59. return (x != null) ? (E)x : e;
    60. } else { // complementary-mode
    61. //头节点开始向后查找
    62. QNode m = h.next; // node to fulfill
    63. //t和tail不一致的时,不一致读,直接跳入下次循环
    64. if (t != tail || m == null || h != head)
    65. continue; // inconsistent read
    66. Object x = m.item;
    67. if (isData == (x != null) || // m already fulfilled
    68. x == m || // m cancelled
    69. !m.casItem(x, e)) { // lost CAS
    70. advanceHead(h, m); // dequeue and retry
    71. continue;
    72. }
    73. //匹配成功,头节点出列
    74. advanceHead(h, m);
    75. //唤醒被匹配节点m的线程
    76. LockSupport.unpark(m.waiter);
    77. return (x != null) ? (E)x : e;
    78. }
    79. }
    80. }
    81. }
    82. }

    1.2非公平方式TransferStack

    TransferQueue实现transfer方法的基本算法为循环

    • 如果队列空或者已经包含了相同的mode, 分两种情况,如果是非计时操作或者已经超时则返回null;否则把当前节点压栈到队列尾(t),然后等待匹配

    • 通过isFulfilling方法判断队列头(h)是否还没有匹配,如果没有匹配, 把当前节点压入队列头,并尝试匹配, 匹配成功后从栈中弹处2个节点, 并返回匹配节点的数据
    • 如果栈顶已经持有另一个节点,说明栈顶节点正在匹配, 则帮助此节点进行匹配操作,然后继续循环
    1. //参考JDK11源码
    2. public class SynchronousQueue extends AbstractQueue
    3. implements BlockingQueue, java.io.Serializable {
    4. static final class TransferStack extends Transferer {
    5. SNode s = null; // constructed/reused as needed
    6. ///根据所传元素判断为生产or消费mode
    7. int mode = (e == null) ? REQUEST : DATA;
    8. for (;;) {
    9. SNode h = head;
    10. //为空或者相同的模式
    11. if (h == null || h.mode == mode) { // empty or same-mode
    12. if (timed && nanos <= 0L) {
    13. //head已经被匹配,修改head继续循环
    14. if (h != null && h.isCancelled())
    15. casHead(h, h.next); // pop cancelled node
    16. else
    17. return null;
    18. } else if (casHead(h, s = snode(s, e, h, mode))) {//构建新的节点s,放到栈顶
    19. //等待s节点被匹配,返回s.match节点m
    20. SNode m = awaitFulfill(s, timed, nanos);
    21. //s.match==s(等待被取消)
    22. if (m == s) {
    23. clean(s);//清除s节点
    24. return null;
    25. }
    26. if ((h = head) != null && h.next == s)
    27. casHead(h, s.next); // help s's fulfiller
    28. return (E) ((mode == REQUEST) ? m.item : s.item);
    29. }
    30. } else if (!isFulfilling(h.mode)) { // try to fulfill
    31. if (h.isCancelled())
    32. //head已经被匹配,修改head继续循环
    33. casHead(h, h.next);
    34. //构建新节点,放到栈顶
    35. else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
    36. for (;;) { // loop until matched or waiters disappear
    37. SNode m = s.next; // m is s's match
    38. if (m == null) { // all waiters are gone
    39. casHead(s, null); // pop fulfill node
    40. s = null; // use new node next time
    41. break; // restart main loop
    42. }
    43. SNode mn = m.next;
    44. //尝试匹配,唤醒m节点的线程
    45. if (m.tryMatch(s)) {
    46. //弹出匹配成功的两个节点
    47. casHead(s, mn); // pop both s and m
    48. return (E) ((mode == REQUEST) ? m.item : s.item);
    49. } else
    50. //匹配失败,删除m节点,重新循环
    51. s.casNext(m, mn);
    52. }
    53. }
    54. } else {
    55. //头节点正在匹配
    56. SNode m = h.next; // m is h's match
    57. if (m == null) // waiter is gone
    58. casHead(h, null); // pop fulfilling node
    59. else {
    60. SNode mn = m.next;
    61. if (m.tryMatch(h)) // help match
    62. casHead(h, mn); // pop both h and m
    63. else // lost match
    64. h.casNext(m, mn); // help unlink
    65. }
    66. }
    67. }
    68. }
    69. }


    2.SynchronousQueue常用方法


    SynchronousQueue 内部没有容量,所以不能通过peek方法获取头部元素,所以插入和移除是1对1对称操作

    1. /**
    2. * 此方法始终返回空
    3. */
    4. public E peek() {
    5. return null;
    6. }

    SynchronousQueue 每个put操作必须等待一个take操作,否则不能添加元素,从源码上会发现两个操作都是通过transferer#transfer实现的,

    1. //将指定的元素添加到此队列
    2. public void put(E e) throws InterruptedException {
    3. if (e == null) throw new NullPointerException();
    4. if (transferer.transfer(e, false, 0) == null) {
    5. Thread.interrupted();
    6. throw new InterruptedException();
    7. }
    8. }
    9. //检索并删除此队列的头
    10. public E take() throws InterruptedException {
    11. E e = transferer.transfer(null, false, 0);
    12. if (e != null)
    13. return e;
    14. Thread.interrupted();
    15. throw new InterruptedException();
    16. }


    上一篇:JAVA多线程信号量Semaphore

  • 相关阅读:
    初学者都能学会的ElasticSearch入门实战
    『C语言进阶』字符函数和内存函数(1)
    第04章 Tableau高级操作
    实战演练 | Navicat 安全可靠的数据传输功能
    【MySQL教程】| (1-1) 2023MySQL-8.1.0 安装教程
    C#落选,Python“连庄”年度编程语言,TIOBE 1月编程语言排行榜出炉
    python爬虫企业微信打卡数据,写入数据库
    Dockerfil 构建上下文 build -f 选项 加快构建速度
    如何拉取钉钉的外出、出差审批单
    Win10下pytorch环境搭建详细教程以及示例测试(二)
  • 原文地址:https://blog.csdn.net/Beijing_L/article/details/126128857