还没有过第一篇文章的小伙伴可以先去看看第一篇引子:SynchronousQueue源码分析_第一讲:引子
非公平模式 内部实现的结构是栈,就是里面的内部类TransferStack实现的,所以这一篇文章就来看一看这个内部类究竟做了什么。
public class TransferStackDemo {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
Thread thread1 = new Thread(() -> {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
Thread.sleep(2000);
Thread thread2 = new Thread(() -> {
try {
queue.put(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread2.start();
Thread.sleep(2000);
Thread thread3 = new Thread(() -> {
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread3.start();
}
}
执行结果:
2
此时程序不会结束,会一直执行,因为栈中还有一个元素1
SynchronousQueue 有两个构造方法 ,一个有参,一个无参,但其实都是在调用有参的构造
默认调用 TransferStack,使用非公平模式
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
SynchronousQueue 中有三个非常重要的内部类:Transferer,TransferQueue,TransferStack
其中TransferQueue和 TransferStack都继承了Transferer类,Transferer 内部类是最重要的内部类,上一篇文章(引子)中提到过,可以去看上一篇文章,连接放这里了:【第一讲:引子】
/** 表示Node类型为请求类型 */
static final int REQUEST = 0;
/** Node represents an unfulfilled producer
* 表示Node类型为 数据类型 */
static final int DATA = 1;
/** Node is fulfilling another unfulfilled DATA or REQUEST
* 表示Node类型为 匹配中类型
* 假设栈顶元素为Request-Node,当前请求类型为 Data 的话,入栈后会修改类型为 Fulfilling 【栈顶(就是当前请求) & 栈顶之下的第一个node】
* 假设栈顶元素为 Data-Node,当前请求类型为 Request 的话,入栈后会修改类型为 Fulfilling 【栈顶(就是当前请求) & 栈顶之下的一个node】
* */
static final int FULFILLING = 2;
//表示栈顶指针
volatile SNode head;
这是TransferStack中的一个内部类
static final class SNode {
//指向下一额栈帧
volatile SNode next; // next node in stack
//与当前node匹配的节点
volatile SNode match; // the node matched to this
//假设当前node 对应的线程 自旋期间未被匹配成功,那么node对应的线程需要挂起,挂起前 waiter 保存当前线程的引用
//方便 匹配成功后,被唤醒
volatile Thread waiter; // to control park/unpark
//数据域:data不为空,表示当前node对应的请求类型为data类型,反之则表示当前Node 为request类型
Object item; // data; or null for REQUESTs
//表示当前Node的模式 [Data/Request/Fulfilling]
int mode;
SNode(Object item) {
this.item = item;
}
//CAS方式设置 Node对象的next字段
boolean casNext(SNode cmp, SNode val) {
//优化:cmp==next 为什么要判断?
//因为cas指令 在平台执行时,同一时刻只能有一个cas指令被执行
//有了 java层面的这一个判断,可以提升一部分性能,cmp == next 不相等,就没必要走 cas指令
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* @param s the node to match
* @return true if successfully matched to s
*
* 尝试匹配
* 调用tryMatch 的对象是栈顶节点的下一个节点,这个下一个节点会与栈顶节点匹配
*
* @return true 匹配成功,否则匹配失败
*/
boolean tryMatch(SNode s) {
//条件一:match == null 成立:说明当前Node尚未与任何节点发生过匹配
//条件二:成立:使用CAS方式,设置match字段,表示当前Node 已经被匹配了
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
//当前Node 如果自旋结束,那么会使用LockSupport.park 方法挂起,挂起之前 会将Node对应的Thread 保留到waiter字段
Thread w = waiter;
//条件成立:说明Node对应的Thread已经挂起了。。。
if (w != null) { // waiters need at most one unpark
waiter = null;
//使用unpark唤醒
LockSupport.unpark(w);
}
//匹配成功
return true;
}
return match == s;
}
/**
* 尝试取消..
*/
void tryCancel() {
//match 字段 保留当前Node对象本身,表示这个Node是取消状态,取消状态的Node,最终会被强制 移除出栈
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
//如果match保留的是当前Node本身,那表示Node 是取消状态,反之 则 非取消状态
boolean isCancelled() {
return match == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
判断是否是Fulfilling 状态。
Fulfilling 状态就是当栈中有一个请求A,又来一个请求B入栈,请求A和请求B互补,可以配对,那么请求B会更改为Fulfilling 状态。
//判断当前模式是否为 匹配中的状态
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
设置栈顶元素,在向栈中添加元素或者出栈时就会调用。
通过CAS的方式修改,第一个判断:h == head 可以先确定当前h是不是栈顶元素,如果不是就直接返回false,不用再执行下面的CAS,提高了代码执行的效率
//设置栈顶元素
boolean casHead(SNode h, SNode nh) {
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
给当前线程创建一个SNode 对象,并指向下一个SNode 对象
/**
* @param s SNode引用 ,当这个引用指向空时,snode方法会创建一个SNode对象,并赋值给这个引用
* @param e SNode对象的item字段
* @param next 指向当前栈帧的下一个栈帧
* @param mode Request/Data/Fulfilling 状态
*/
static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}
transfer 是最主要的方法,我们的put和take(放元素和取元素)都是调用这个方法来执行的
这里面一个有三个if else:
/**
* Puts or takes an item.
*
* 这个方法是在 父类 Transfer 中定义的抽象方法,这里实现了这个方法
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
//包装当前线程的Node
SNode s = null; // constructed/reused as needed
//e==null 条件成立:当前线程是一个Request线程
//否则 e!=null 说明 当前线程是一个Data线程,提交数据的线程
int mode = (e == null) ? REQUEST : DATA;
//自旋
for (;;) {
//h 表示 栈顶指针
SNode h = head;
//Case1:当前栈内为空,或者栈顶Node模式与当前请求模式一致,都是需要做入栈操作
if (h == null || h.mode == mode) { // empty or same-mode
//条件一:成立,说明当前请求是指定了 超时限制的
//条件二:nanos <= 0,nanos == 0 ,表示这个请求不支持“阻塞等待”,queue.offer()
if (timed && nanos <= 0) { // can't wait
//条件成立:说明栈顶已经取消状态了,协助栈顶出栈
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
//大部分状态从这里返回
return null;
//什么时候会执行到else if?
//当前栈顶为空 或者 模式与当前请求一致,且当前请求允许阻塞等待
//casHead(h, s = snode(s, e, h, mode)) 入栈操作
} else if (casHead(h, s = snode(s, e, h, mode))) {
//执行到这里,说明当前请求入栈成功
//入栈成功之后会做什么呢?
//在栈内等待一个好消息,等待被匹配!
//awaitFulfill(s, timed, nanos) 等待被匹配逻辑
//1.正常情况,返回匹配的逻辑的节点
//2.取消状态:返回当前节点 s节点进去,返回s节点
SNode m = awaitFulfill(s, timed, nanos);
//条件成立:说明当前Node状态是取消状态
if (m == s) { // wait was cancelled
//将取消状态的节点 出栈
clean(s);
//取消状态 最终返回null
return null;
}
//执行到这里说明当前Node 已经被匹配了
//条件成立:条件一:说明栈顶是有Node
//条件二:说明Fulfill 和 当前Node 还未出栈,需要协助出栈
if ((h = head) != null && h.next == s)
//将fulfill 和 当前Node 结对出栈
casHead(h, s.next); // help s's fulfiller
//假设当前Node模式为Request类型,返回匹配节点的m.item 数据域
//当前Node模式为Data模式,返回Node.item 数据域,当前请求提交的数据 e
return (E) ((mode == REQUEST) ? m.item : s.item);
}
//什么时候来到这里?
//栈顶Node的模式与当前请求的模式不一致,会执行else if的条件
//1.(栈顶,当前请求):(Data,Request),(Request,Data),(Fulfilling,Request/Data)
//CASE2:当前栈顶模式与请求模式不一致,且栈顶不是Fulfilling
} else if (!isFulfilling(h.mode)) { // try to fulfill
//条件成立:说明当前栈顶状态为 取消状态,当前线程协助它出栈
if (h.isCancelled()) // already cancelled
//协助 取消状态节点 出栈
casHead(h, h.next); // pop and retry
//条件成立:说明压栈节点成功,入栈一个 Fulfilling | mode Node
//比如说,现在栈顶是Data Node,要入栈的是RequestNode,先把入栈节点设置为 (Fulfilling | RequestNode),然后再入栈
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
//当前请求 入栈成功
//自旋,Fulfilling节点 和 Fulfilling.next 节点进行匹配工作
for (;;) { // loop until matched or waiters disappear
//m 与当前s 匹配节点
SNode m = s.next; // m is s's match
//m==null 什么时候会成立?
//当s.next节点 超时或者被外部线程中断唤醒后,会执行clean 操作将自己清理出栈
//此时,站在匹配线程来看,真有可能拿到一个null
//栈中一开始有一个Data,又来个Request,入栈变为Fulfilling,结果 Data超时或者被外部线程中断唤醒后会被清楚栈
//现在栈中只有一个Fulfilling,它不和任何节点匹配,所以直接出栈
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
//回到外层大的自旋中。再重新选择路径执行,此时有可能插入一个节点
break; // restart main loop
}
//什么时候会执行到这里呢?
//Fulfilling 匹配的节点不为空,进行真正的匹配工作
//获取匹配节点的下一个节点
//s是Fulfilling节点,在栈顶,m是匹配节点在栈顶下面,mn是m下面的节点。
SNode mn = m.next;
if (m.tryMatch(s)) {
//结对出栈
casHead(s, mn); // pop both s and m
//假设当前Node模式为Request类型,返回匹配节点的m.item 数据域
//当前Node模式为Data模式,返回Node.item 数据域,当前请求提交的数据 e
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
//执行到这里说明m.tryMatch(s)失败,什么时候会失败呢?
//当Fulfilling节点下面的节点 m 中断取消后,它的match会设置为自己,而tryMatch里面的match期望值是null,所以会cas失败,所以会走到这里
//s,Fulfilling节点的next设置为 m的下面的节点 mn
//强制出栈
s.casNext(m, mn); // help unlink
}
}
//CASE3:怎么时候会执行到这里?
//栈顶模式为Fulfilling模式,表示栈顶和栈顶下面的栈帧正在发生匹配
//当前请求需要协助 工作
} else { // help a fulfiller
//h 表示的是Fulfilling节点,m是 Fulfilling匹配的节点(Fulfilling下面的节点)
SNode m = h.next; // m is h's match
//m==null 什么时候会成立?
//当s.next节点 超时或者被外部线程中断唤醒后,会执行clean 操作将自己清理出栈
//此时,站在匹配线程来看,真有可能拿到一个null
//栈中一开始有一个Data,又来个Request,入栈变为Fulfilling,结果 Data超时或者被外部线程中断唤醒后会被清楚栈
//现在栈中只有一个Fulfilling,它不和任何节点匹配,所以直接出栈
if (m == null) // waiter is gone
//将栈清空
casHead(h, null); // pop fulfilling node
//大部分情况走else
else {
//获取栈顶匹配节点的下一个节点
SNode mn = m.next;
//条件成立:说明 m 和 栈顶 匹配成功
if (m.tryMatch(h)) // help match
//双双出栈,让栈顶指针指向匹配节点的下一个节点
casHead(h, mn); // pop both h and m
else // lost match
//m.tryMatch(h) 失败,说明m是取消状态,match指向自己而不是null
//先要把h,Fulfilling的next 指向m的下一个节点mn,让m出栈
//强制出栈
h.casNext(m, mn); // help unlink
}
}
}
}
awaitFulfill(s, timed, nanos)方法的作用:节点s和它下面的节点进行匹配
/**
* @param s 当前请求Node
* @param timed 当前请求是否支持 超时限制
* @param nanos 如果请求支持超时限制,nanos 表示超时等待时长
* @return matched
*/
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
//等待的截止时间,timed==true =>System.nanoTime() + nanos 当前时间+超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//获取当前请求线程、、
Thread w = Thread.currentThread();
// spins 表示当前请求线程在 下面的for自旋检查中,自旋次数,如果达到spins自旋次数时,
//当前线程对应的Node 仍然未被匹配成功,那么再选择挂起当前请求线程
int spins = (shouldSpin(s) ?
//timed==true 指定了超时限制的,这个时候采用 maxTimedSpins==32 ,否则采用32*16
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
//自旋检查逻辑:1、是否匹配 2、是否超时 3、是否被中断
for (;;) {
//条件成立:说明当前线程收到中断信号,需要设置Node状态为 cancel状态
if (w.isInterrupted())
//Node对象的match指向Node 说明该Node状态就是 取消状态
s.tryCancel();
//m 表示与当前Node匹配的节点
//1.正常状态:有一个请求 与 当前Node匹配成功,这个时候s.match 指向匹配节点
//2。取消情况:当前match 指向当前Node
SNode m = s.match;
if (m != null)
//可能正常也可能是取消
return m;
//条件成立:说明指定了超时限制
if (timed) {
//表示距离超时,还有多少纳秒
nanos = deadline - System.nanoTime();
//条件成立:说明已经超时了
if (nanos <= 0L) {
//设置当前Node状态 为取消状态。。match-》当前Node
s.tryCancel();
continue;
}
}
//条件成立:说明当前线程还可以进行自旋检查
if (spins > 0)
//自旋次数 累计 递减
spins = shouldSpin(s) ? (spins-1) : 0;
//spins==0,已经不允许再进行自旋检查了
else if (s.waiter == null)
//把当前Node对应的Thread 保存到 Node.waiter 字段中
s.waiter = w; // establish waiter so can park next iter
//条件成立:说明当前Node对应的请求 未指定超时限制
else if (!timed)
//使用不指定超时限制的park方法 挂起当前线程,知道 当前线程被外部线程 使用unpark唤醒
LockSupport.park(this);
//说明时候执行到这里? timed==true 设置了 超时时间
//条件成立:nanos>1000 纳秒的值,只有这种情况下,才允 许挂起
//否则说明超时时间 设置的太小,挂起和唤醒的成本太高,高于空转自旋
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
这个方法用来清除取消节点
/**
* 清除取消节点
*/
void clean(SNode s) {
//清空数据域
s.item = null; // forget item
//释放线程引用
s.waiter = null; // forget thread
//检查取消节点的截止位置 看看取消节点下面是不是取消节点
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// Absorb cancelled nodes at head
//当前虚幻检查节点
SNode p;
//从栈顶向下检查,将栈顶开始向下连续的 取消状态的节点 全部清理出去,知道碰到past为止
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
//可能会跳出循环的一种情况:p还没有到达past节点,但是遇到了一个节点不是取消状态,就会跳出循环
// Unsplice embedded nodes
while (p != null && p != past) {
//获取p.next p是非取消节点
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
判断是否是栈顶节点或者是Fulfilling节点
如果是栈顶节点或者是Fulfilling节点,返回true
boolean shouldSpin(SNode s) {
//获取栈顶
SNode h = head;
//条件一:h==s,成立:当前s 就是栈顶,允许自旋检查..
//条件二:h == null:什么时候成立?当前s节点 自旋检查期间,又来了一个与当前s节点匹配的请求,又双双出栈了,条件会成立。
//条件三:isFulfilling:当前s 不是栈顶元素,并且当前栈顶正在匹配,这种状态 栈顶下面的元素,都允许自旋检查
return (h == s || h == null || isFulfilling(h.mode));
}