/** Number of CPUS, to place bound on table size */
// 运行环境的实际 CPU 核数
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
*/
//单元格数组,非空状态时,长度是 2 的幂
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
// 基础值,通过 cas 更新
// 主要在没有线程竞争的场景下使用
// 也用于 cells 初始化时,作为降级使用
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
// 扩容、创建单元格时的自旋锁
transient volatile int cellsBusy;
// Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of packaging restrictions.
//
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
内部类 cell
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Cell[],这相当于把靶值拆碎,即这个数组的和才是最终值LongAdder 源码add()
用于实现 value + x
下面是源码
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
下面是整体思路和部分细节
当线程压力不大时,cells 里没有值,此时的自增是 对 base 进行 CAS 操作
if ((as = cells) != null || !casBase(b = base, b + x))
如果操作成功,就不需要进入内层逻辑
当线程开始争抢对 base 的修改权时,对 base 的 CAS 操作会失败
这就进入了上面的 if 内部
当 LongAdder 开始使用 cells 后,base 就约等于弃用了
说约等于是因为最后求和时还要计算 base 的值,同时作为初始化和扩容时抢占 cellBusy 失败的降级方案
cells 数组相当于是线程槽,线程会通过计算对对应的槽位进行操作
线程通过 getProbe() 方法获取线程探针 hash 值,然后将此值与 (数组长度 - 1) 进行按位与运算 as[getProbe() & m]
达到线程 hash 对cells 的 size 取余的效果,并用结果作为此线程在 cells 数组上的落点,道理类似于 hashMap.get()
当线程需要对 cells 进行操作的时候,会涉及到是否争抢问题
cells 的 size 足够cells 进行扩容boolean uncontended = true; ,刚刚声明出来时认为 无争抢Cell 做 CAS 操作a.cas(v = a.value, v + x) 实现对 cells 上某个 Cell 的 + xStriped64.longAccumulate() 处理cells 后,下面的情况都需要 Striped64.longAccumulate() 处理
cells:cells 为 null,或长度 < 1as == null、(m = as.length - 1) < 0cells 上计算的落点上没有数据,即没有初始化对应的 Cella = as[getProbe() & m]) == nullcells 的 size 取余计算在 cells 上的落点,类似 hashMap.get()Striped64 源码参数
Striped64.longAccumulate() 方法的入参如下
long x 增加值LongBinaryOperator fn 操作函数,用于实现非 原值 + x 的功能boolean wasUncontended 意味着当前线程 CAS 操作时发生了争抢整体脉络

如上图,此方法整体由两个部分组成
for(;;) 组织的自旋(1-3)probe (0)自旋内部使用 if-else 区分了三类场景
calls 已经存在且不为空,即 calls 已完成初始化的场景calls 不存在但成功的给 cellsBusy 上自旋锁calls 进行初始化+ x 的计算任务保证线程 hash 的获取

calls 已经初始化

如果 cells 数组已经初始化完成,则线程进入此方法可能是需要
calls 进行扩容Cell 进行 VAS关于 h = advanceProbe(h)
如果 cells 数组已经初始化完成,则进入此方法的线程每次自旋都需要通过此方法进行 rehash
这是因为线程的 Probe,即 hash 只是用于计算线程在 calls 上的落点
并且,只要线程的 + x 计算任务可以执行,那么无论执行在哪个 Cell 上都是无所谓的
rehash 可以使线程更加均匀的使用 cells
也可是使线程与另一线程在某 Cell 上发生碰撞而导致自旋的下一轮,避开这个碰撞的槽位
已初始化 cells 的场景下,各个 case 按判断优先级排列如下
初始化 Cell,calls 初始化了,但可能其中具体槽位还没初始化
如果是 Cell 没有初始化导致的进入此方法,优先尝试初始化对应的 Cell
但不保证一定能完成初始化,因为这个初始化任务也需要抢占 cellBusy 标记
同下面calls未初始化场景一样,初始化 Cell 需要 双重检查
重置刚刚进入此方法的,外面 CAS 某 Cell 失败的线程的争抢标记位 wasUncontended
如果是因为争抢 Call 失败进入的此方法,在这里将标记清除
这是因为刚刚就是当前线程对当前落点争抢失败进来的,马上再重试着争抢一次意义不大
而清除标记后,可以快速跳过次轮自旋并进行一次 rehash,并在下一轮自旋中重新计算新落点并尝试争抢
对已经初始化的 Cell 进行 CAS 操作,以完成 + x 计算任务
因为在之前清理了争抢标记位 wasUncontended,同时每轮自旋都会 rehash
所以此时再尝试对某 Cell 操作是有必要的,毕竟很可能换了一个 Cell
如果 + x 计算任务完成了就跳出,不需要继续自旋了
标记 cells 的 扩容意向, 禁用扩容 优先于 需要扩容
rehash 的线程再次争抢冲突,理论上是需要扩容的,因此在本轮自旋中标记它为 需要扩容cells 为 需要扩容 (collide = true) 后,会在下一轮自旋中完成扩容cells 的长度已经超过 JVM 的可用核数就 禁用扩容else if 结构,即走了此分支就不会执行扩容逻辑分支了执行 cells 的 扩容
cellBusy 标记Cell[] rs = new Cell[n << 1]HashMap 扩容HashMap 扩容时一样 rehash,因为只要线程的 + x 任务完成了,在哪个 Cell 上完成都无所谓calls未初始化场景一样,扩容时需要 双重检查calls未初始化

流程如下
cellBusy 标记才能开始初始化inittrue 就说明没有完成初始化,继续自旋,否则就跳出自旋cells == as,能走到此处逻辑意味着 as == nullcells == as 即 calls == null 时对 cellBusy 上锁cells 进行两次初始化cells
Cell[]as[getProbe() & m]) 的思路,但因为刚初始化长度固定是 2,因此一定是 & 1+ x 操作cells 指向当前新建的 Cell[]cellBusy 标记降级使用 base 计算

Striped64.longAccumulate() 的主要作用是尝试初始化或扩容 cells ,这个过程是自旋的cells 的初始化或扩容过程中的需要对 cellBusy 上锁cells 启用后 base 成为一个摆设