/** Number of CPUS, to place bound on table size */
// 运行环境的实际 CPU 核数
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
*/
//单元格数组,非空状态时,长度是 2 的幂
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
// 基础值,通过 cas 更新
// 主要在没有线程竞争的场景下使用
// 也用于 cells 初始化时,作为降级使用
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
// 扩容、创建单元格时的自旋锁
transient volatile int cellsBusy;
// Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of packaging restrictions.
//
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
内部类 cell
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Cell[]
,这相当于把靶值拆碎,即这个数组的和才是最终值LongAdder
源码add()
用于实现 value + x
下面是源码
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
下面是整体思路和部分细节
当线程压力不大时,cells
里没有值,此时的自增是 对 base 进行 CAS 操作
if ((as = cells) != null || !casBase(b = base, b + x))
如果操作成功,就不需要进入内层逻辑
当线程开始争抢对 base 的修改权时,对 base 的 CAS 操作会失败
这就进入了上面的 if 内部
当 LongAdder
开始使用 cells
后,base 就约等于弃用了
说约等于是因为最后求和时还要计算 base 的值,同时作为初始化和扩容时抢占 cellBusy
失败的降级方案
cells
数组相当于是线程槽,线程会通过计算对对应的槽位进行操作
线程通过 getProbe()
方法获取线程探针 hash 值,然后将此值与 (数组长度 - 1) 进行按位与运算 as[getProbe() & m]
达到线程 hash 对cells
的 size 取余的效果,并用结果作为此线程在 cells
数组上的落点,道理类似于 hashMap.get()
当线程需要对 cells
进行操作的时候,会涉及到是否争抢问题
cells
的 size 足够cells
进行扩容boolean uncontended = true;
,刚刚声明出来时认为 无争抢Cell
做 CAS 操作a.cas(v = a.value, v + x)
实现对 cells
上某个 Cell
的 + x
Striped64.longAccumulate()
处理cells
后,下面的情况都需要 Striped64.longAccumulate()
处理
cells
:cells
为 null,或长度 < 1as == null
、(m = as.length - 1) < 0
cells
上计算的落点上没有数据,即没有初始化对应的 Cell
a = as[getProbe() & m]) == null
cells
的 size 取余计算在 cells
上的落点,类似 hashMap.get()
Striped64
源码参数
Striped64.longAccumulate()
方法的入参如下
long x
增加值LongBinaryOperator fn
操作函数,用于实现非 原值 + x
的功能boolean wasUncontended
意味着当前线程 CAS 操作时发生了争抢整体脉络
如上图,此方法整体由两个部分组成
for(;;)
组织的自旋(1-3)probe
(0)自旋内部使用 if-else
区分了三类场景
calls
已经存在且不为空,即 calls
已完成初始化的场景calls
不存在但成功的给 cellsBusy
上自旋锁calls
进行初始化+ x
的计算任务保证线程 hash 的获取
calls
已经初始化
如果 cells
数组已经初始化完成,则线程进入此方法可能是需要
calls
进行扩容Cell
进行 VAS关于 h = advanceProbe(h)
如果 cells
数组已经初始化完成,则进入此方法的线程每次自旋都需要通过此方法进行 rehash
这是因为线程的 Probe,即 hash 只是用于计算线程在 calls
上的落点
并且,只要线程的 + x
计算任务可以执行,那么无论执行在哪个 Cell
上都是无所谓的
rehash
可以使线程更加均匀的使用 cells
也可是使线程与另一线程在某 Cell
上发生碰撞而导致自旋的下一轮,避开这个碰撞的槽位
已初始化 cells
的场景下,各个 case 按判断优先级排列如下
初始化 Cell
,calls
初始化了,但可能其中具体槽位还没初始化
如果是 Cell
没有初始化导致的进入此方法,优先尝试初始化对应的 Cell
但不保证一定能完成初始化,因为这个初始化任务也需要抢占 cellBusy
标记
同下面calls
未初始化场景一样,初始化 Cell
需要 双重检查
重置刚刚进入此方法的,外面 CAS 某 Cell
失败的线程的争抢标记位 wasUncontended
如果是因为争抢 Call
失败进入的此方法,在这里将标记清除
这是因为刚刚就是当前线程对当前落点争抢失败进来的,马上再重试着争抢一次意义不大
而清除标记后,可以快速跳过次轮自旋并进行一次 rehash
,并在下一轮自旋中重新计算新落点并尝试争抢
对已经初始化的 Cell
进行 CAS 操作,以完成 + x
计算任务
因为在之前清理了争抢标记位 wasUncontended
,同时每轮自旋都会 rehash
所以此时再尝试对某 Cell
操作是有必要的,毕竟很可能换了一个 Cell
如果 + x
计算任务完成了就跳出,不需要继续自旋了
标记 cells
的 扩容意向, 禁用扩容 优先于 需要扩容
rehash
的线程再次争抢冲突,理论上是需要扩容的,因此在本轮自旋中标记它为 需要扩容cells
为 需要扩容 (collide = true
) 后,会在下一轮自旋中完成扩容cells
的长度已经超过 JVM 的可用核数就 禁用扩容else if
结构,即走了此分支就不会执行扩容逻辑分支了执行 cells
的 扩容
cellBusy
标记Cell[] rs = new Cell[n << 1]
HashMap
扩容HashMap
扩容时一样 rehash
,因为只要线程的 + x
任务完成了,在哪个 Cell
上完成都无所谓calls
未初始化场景一样,扩容时需要 双重检查calls
未初始化
流程如下
cellBusy
标记才能开始初始化init
true
就说明没有完成初始化,继续自旋,否则就跳出自旋cells == as
,能走到此处逻辑意味着 as == nullcells == as
即 calls == null
时对 cellBusy
上锁cells
进行两次初始化cells
Cell[]
as[getProbe() & m])
的思路,但因为刚初始化长度固定是 2,因此一定是 & 1
+ x
操作cells
指向当前新建的 Cell[]
cellBusy
标记降级使用 base 计算
Striped64.longAccumulate()
的主要作用是尝试初始化或扩容 cells
,这个过程是自旋的cells
的初始化或扩容过程中的需要对 cellBusy
上锁cells
启用后 base 成为一个摆设