
SynchronousQueue是一个不存储元素的阻塞队列,每个put操作必须等待一个take操作,否则不能添加元素,SynchronousQueue队列本身不存储任何元素,适合传递性场景,比如一个线程中的数据传递给另一个线程使用,它的吞吐量比LinkedBlockingQueue和ArrayBlockingQueue更好一些
SynchronousQueue有两种构造方法,一种采用公平模式,一种采用非公平模式,当采用公平模式的时候,等待线程将按FIFO(First Input First Output)顺序竞争,即先进先出的模式
- //参考JDK11源码
- public class SynchronousQueue
extends AbstractQueue - implements BlockingQueue
, java.io.Serializable { -
- private transient volatile Transferer
transferer; -
- abstract static class Transferer
{ - //无论是put 还是take都会调用此方法
- //
- abstract E transfer(E e, boolean timed, long nanos);
- }
-
-
- //1.默认非公平模式
- public SynchronousQueue() {
- this(false);
- }
-
- //根据参数fair构造两种不同的模式
- //当参数为true时,等待线程将按FIFO顺序竞争
- public SynchronousQueue(boolean fair) {
- transferer = fair ? new TransferQueue
() : new TransferStack(); - }
-
-
-
- //...
- }
带参数的构造器,通过true或false,分别构造两种集成Transferer的不同对象。公平模式处理对象TransferQueue和非公平模式处理对象TransferStack都继承对象Transferer,并实现自己的transfer方法
- //参考JDK11源码
- public class SynchronousQueue
extends AbstractQueue - implements BlockingQueue
, java.io.Serializable { -
- private transient volatile Transferer
transferer - public SynchronousQueue(boolean fair) {
- transferer = fair ? new TransferQueue
() : new TransferStack(); - }
-
- abstract static class Transferer
{ - //无论是put 还是take都会调用此方法
- //
- abstract E transfer(E e, boolean timed, long nanos);
- }
-
- //...
- }
TransferQueue实现transfer方法的基本算法为循环,尝试采用以下两种方法之一,
如果队列尾巴节点(t) 和本次操作模式不同 从队列头节点(h)开始想后查找匹配,匹配成功后想后推进head,并唤醒匹配节点的waiter线程
- //参考JDK11源码
- public class SynchronousQueue
extends AbstractQueue - implements BlockingQueue
, java.io.Serializable { -
- static final class TransferQueue
extends Transferer { - //头节点
- transient volatile QNode head;
- //尾节点
- transient volatile QNode tail;
- //构造器初始化
- TransferQueue() {
- //构造器中初始化1个虚拟节点,初始化时模式:isData=false
- QNode h = new QNode(null, false);
- head = h;
- tail = h;
- }
-
- //put, take会调用此方法
- E transfer(E e, boolean timed, long nanos) {
-
- QNode s = null; // constructed/reused as needed
- boolean isData = (e != null);
-
- for (;;) {
- QNode t = tail;
- QNode h = head;
- if (t == null || h == null)
- continue;
-
- //1. h和t相同表示队列为空 判断尾几点模式和当前模式是否相同
- if (h == t || t.isData == isData) {
- QNode tn = t.next;
-
- //t和tail不一致的时,不一致读,直接跳入下次循环
- if (t != tail)
- continue;
-
- //tn不为空时,尝试将tn放入队列尾部
- if (tn != null) {
- //advanceTail尝试将tn放入队列尾部
- advanceTail(t, tn);
- continue;
- }
- //非计时操作或者超时返回 null
- if (timed && nanos <= 0L) // can't wait
- return null;
-
- //当前节点e为空的时构建虚拟节点QNode,否则创建e的节点
- if (s == null)
- s = new QNode(e, isData);
-
- if (!t.casNext(null, s)) // failed to link in
- continue;
-
- //将当前节点放入队尾
- advanceTail(t, s);
-
- //等待匹配,并返回匹配节点的item,如果取消等待则返回该节点
- Object x = awaitFulfill(s, e, timed, nanos);
- if (x == s) {
- clean(t, s); //等待被取消,清除s节点
- return null;
- }
-
- if (!s.isOffList()) { // not already unlinked
- advanceHead(t, s); // unlink if head
- if (x != null) // and forget fields
- s.item = s;//item指向自身
- s.waiter = null;
- }
- return (x != null) ? (E)x : e;
-
- } else { // complementary-mode
- //头节点开始向后查找
- QNode m = h.next; // node to fulfill
-
- //t和tail不一致的时,不一致读,直接跳入下次循环
- if (t != tail || m == null || h != head)
- continue; // inconsistent read
-
- Object x = m.item;
- if (isData == (x != null) || // m already fulfilled
- x == m || // m cancelled
- !m.casItem(x, e)) { // lost CAS
- advanceHead(h, m); // dequeue and retry
- continue;
- }
-
- //匹配成功,头节点出列
- advanceHead(h, m);
-
- //唤醒被匹配节点m的线程
- LockSupport.unpark(m.waiter);
- return (x != null) ? (E)x : e;
- }
- }
-
- }
- }
-
-
-
- }
TransferQueue实现transfer方法的基本算法为循环
如果队列空或者已经包含了相同的mode, 分两种情况,如果是非计时操作或者已经超时则返回null;否则把当前节点压栈到队列尾(t),然后等待匹配
- //参考JDK11源码
- public class SynchronousQueue
extends AbstractQueue - implements BlockingQueue
, java.io.Serializable { -
- static final class TransferStack
extends Transferer { - SNode s = null; // constructed/reused as needed
-
- ///根据所传元素判断为生产or消费mode
- int mode = (e == null) ? REQUEST : DATA;
-
- for (;;) {
- SNode h = head;
-
- //为空或者相同的模式
- if (h == null || h.mode == mode) { // empty or same-mode
- if (timed && nanos <= 0L) {
- //head已经被匹配,修改head继续循环
- if (h != null && h.isCancelled())
- casHead(h, h.next); // pop cancelled node
- else
- return null;
- } else if (casHead(h, s = snode(s, e, h, mode))) {//构建新的节点s,放到栈顶
- //等待s节点被匹配,返回s.match节点m
- SNode m = awaitFulfill(s, timed, nanos);
- //s.match==s(等待被取消)
- if (m == s) {
- clean(s);//清除s节点
- return null;
- }
- if ((h = head) != null && h.next == s)
- casHead(h, s.next); // help s's fulfiller
- return (E) ((mode == REQUEST) ? m.item : s.item);
- }
- } else if (!isFulfilling(h.mode)) { // try to fulfill
- if (h.isCancelled())
- //head已经被匹配,修改head继续循环
- casHead(h, h.next);
-
- //构建新节点,放到栈顶
- else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
- for (;;) { // loop until matched or waiters disappear
- SNode m = s.next; // m is s's match
- if (m == null) { // all waiters are gone
- casHead(s, null); // pop fulfill node
- s = null; // use new node next time
- break; // restart main loop
- }
- SNode mn = m.next;
-
- //尝试匹配,唤醒m节点的线程
- if (m.tryMatch(s)) {
- //弹出匹配成功的两个节点
- casHead(s, mn); // pop both s and m
- return (E) ((mode == REQUEST) ? m.item : s.item);
- } else
- //匹配失败,删除m节点,重新循环
- s.casNext(m, mn);
- }
- }
- } else {
- //头节点正在匹配
- SNode m = h.next; // m is h's match
- if (m == null) // waiter is gone
- casHead(h, null); // pop fulfilling node
- else {
- SNode mn = m.next;
- if (m.tryMatch(h)) // help match
- casHead(h, mn); // pop both h and m
- else // lost match
- h.casNext(m, mn); // help unlink
- }
- }
- }
- }
- }
SynchronousQueue 内部没有容量,所以不能通过peek方法获取头部元素,所以插入和移除是1对1对称操作
- /**
- * 此方法始终返回空
- */
- public E peek() {
- return null;
- }
SynchronousQueue 每个put操作必须等待一个take操作,否则不能添加元素,从源码上会发现两个操作都是通过transferer#transfer实现的,
- //将指定的元素添加到此队列
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (transferer.transfer(e, false, 0) == null) {
- Thread.interrupted();
- throw new InterruptedException();
- }
- }
-
- //检索并删除此队列的头
- public E take() throws InterruptedException {
- E e = transferer.transfer(null, false, 0);
- if (e != null)
- return e;
- Thread.interrupted();
- throw new InterruptedException();
- }