LongAdder是JDK1.8由Doug Lea大神新增的原子操作类,位于java.util.concurrent.atomic包下,LongAdder在高并发的场景下会比AtomicLong 具有更好的性能,代价是消耗更多的内存空间。
(上图来自LongAdder类注释)
在LongAdder之前,当我们在进行计数统计的时,通常会使用AtomicLong来实现。AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。
看一下AtomicLong的增加计数方法源码:
可以看到在高并发情况下,当有大量线程同时去更新一个变量,任意一个时间点只有一个线程能够成功,绝大部分的线程在尝试更新失败后,会通过自旋的方式再次进行尝试,这样严重占用了CPU的时间片,进而导致系统性能问题。
LongAdder设计思想上,采用分段的方式降低并发冲突的概率。通过维护一个基准值base和 Cell 数组:
- public class LongAdderTestDemo {
- public static void main(String[] args) throws InterruptedException {
- LongAdder adder = new LongAdder();
- int [] num = new int[1];
-
- Thread[] threads = new Thread[10];
- for (int i = 0; i < 10; i++) {
- threads[i] = new Thread(() -> {
- for (int j = 0; j < 10000; j++) {
- adder.add(1);
- num[0] += 1;
- }
- });
- threads[i].start();
- }
- for (int i = 0; i < 10; i++) {
- threads[i].join();
- }
- // 通过对比发现,使用LongAdder能保证最后的值是期望的值,不存在并发写错误
- System.out.println("adder:" + adder);
- System.out.println("num:" + num[0]);
- }
- }
LongAdder类继承Striped64类,本身只提供了有一个无参构造方法:
对外暴露increment、decrement、sum等方法,其中increment和decrement内部调用的也是add方法(后面会详细分析add方法源码,先了解个轮廓)。
再看父类Striped64都包含什么成员变量(代码太长,不适合截图,直接按需贴代码段):
- abstract class Striped64 extends Number {
-
- @jdk.internal.vm.annotation.Contended static final class Cell {
- // 代码省略...
- }
-
- /**
- * 当然CPU的数量,在Cell数组扩容时会被使用到
- */
- static final int NCPU = Runtime.getRuntime().availableProcessors();
-
- /**
- * cell数组,长度是2的幂次方
- */
- transient volatile Cell[] cells;
-
- /**
- * Base value, used mainly when there is no contention, but also as
- * a fallback during table initialization races. Updated via CAS.
- 通常如下两种场景使用:
- * 1.通常在无并发时,线程直接CAS更新该值
- 2.初始化数组时,必须要保证数组只被初始化一次,其他竞争失败的线程会将数值累加到base上
- */
- transient volatile long base;
-
- /**
- * cell数组对应的锁,初始化cells或者扩容cells需要获取锁。
- * 0表示当前cell数组没有线程使用,1表示当前数组已经有线程占用
- */
- transient volatile int cellsBusy;
-
- // VarHandle 是JDK9之后提供的用于替代之前Unsafe类的,可直接操作内存中的变量
- private static final VarHandle BASE;
- private static final VarHandle CELLSBUSY;
- private static final VarHandle THREAD_PROBE;
-
- // 其他方法省略...
- }
对VarHandler的一点说明:
Unsafe
是不建议开发者直接使用的,因为Unsafe
所操作的并不属于Java标准,会容易带来一些安全性的问题。JDK9 之后,官方推荐使用java.lang.invoke.Varhandle
来替代Unsafe
大部分功能,对比Unsafe
,VarHandle
有着相似的功能,但会更加安全,并且,在并发方面也提高了不少性能。
上面对LongAdder和Striped64的主要成员变量大概有个表面的认识,下面开始对关键的代码进行具体的分析。
代码如下截图:
静态代码块主要是通过MethodHandles和VarHandle来获取Striped64类的变量base、cellsBusy和Thread类的threadLocalRandomProbe变量。
代码如下截图:
Cell是一个静态内部类,且被@sun.misc.Contended修饰(避免伪共享问题),内部包含一个被volatile修饰的value变量,内部静态代码块同样使用MethodHandles实现VALUE变量的初始化。此外提供了一个cas接口,底层通过VarHandler的compareAndSet方法实现value值的CAS更新。
看代码前先看下入参:
long x : 需要增加的值,一般默认都是1
LongBinaryOperator fn : 默认传递的是null
wasUncontended:竞争标识,如果是false则代表有竞争。只有cells初始化之后,并且当前线程CAS竞争修改失败,才会是false
- final void longAccumulate(long x, LongBinaryOperator fn,
- boolean wasUncontended) {
- int h;
- // 通过getProbe方法获取为线程生成一个非0的hash值,底层是通过VarHandle实现
- if ((h = getProbe()) == 0) {
- /*
- 如果当前线程的hash值为0,0同数组长度取余后依旧是0,会固定到数组第一个位置。所以
- 这里通过ThreadLocalRandom产生一个随机数,重新计算一个hash值,但认为此次不算是
- 一次竞争,所以将wasUncontended设置为true.
- */
- ThreadLocalRandom.current(); // force initialization
- h = getProbe();
- wasUncontended = true;
- }
- // 扩容意向,当为false表示不扩容
- boolean collide = false; // True if last slot nonempty
- done: for (;;) {
- Cell[] cs; Cell c; int n; long v;
- // 情况一:cells数组不为空,且长度大于0
- if ((cs = cells) != null && (n = cs.length) > 0) {
- if ((c = cs[(n - 1) & h]) == null) {
- if (cellsBusy == 0) { // Try to attach new Cell
- Cell r = new Cell(x); // Optimistically create
- if (cellsBusy == 0 && casCellsBusy()) {
- try { // Recheck under lock
- Cell[] rs; int m, j;
- if ((rs = cells) != null &&
- (m = rs.length) > 0 &&
- rs[j = (m - 1) & h] == null) {
- rs[j] = r;
- break done;
- }
- } finally {
- cellsBusy = 0;
- }
- continue; // Slot is now non-empty
- }
- }
- collide = false;
- }
- else if (!wasUncontended) // CAS already known to fail
- wasUncontended = true; // Continue after rehash
- else if (c.cas(v = c.value,
- (fn == null) ? v + x : fn.applyAsLong(v, x)))
- break;
- else if (n >= NCPU || cells != cs)
- collide = false; // At max size or stale
- else if (!collide)
- collide = true;
- else if (cellsBusy == 0 && casCellsBusy()) {
- try {
- if (cells == cs) // Expand table unless stale
- cells = Arrays.copyOf(cs, n << 1);
- } finally {
- cellsBusy = 0;
- }
- collide = false;
- continue; // Retry with expanded table
- }
- h = advanceProbe(h);
- }
- // 情况二:cells数组还未初始化,最后调用casCellsBusy()通过CAS操作去获取锁
- else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
- try { // Initialize table
- if (cells == cs) {
- Cell[] rs = new Cell[2];
- // 建一个新的Cell元素,value是x值,默认为1
- rs[h & 1] = new Cell(x);
- cells = rs;
- break done;
- }
- } finally {
- cellsBusy = 0;
- }
- }
- // 情况三:有别的线程正在初始化数组,则尝试累加在base变量上
- else if (casBase(v = base,
- (fn == null) ? v + x : fn.applyAsLong(v, x)))
- break done;
- }
- }
- public void add(long x) {
- /*
- 对下面几个变量的说明:
- 1)as表示cells数组的引用
- 2)b表示获取的base值
- 3)v表示期望值
- 4)m表示cells数组的长度
- 5)a表示当前线程命中的cell单元格
- */
- Cell[] as; long b, v; int m; Cell a;
-
- /*
- 如果cells数组不为空,或者当前线程竞争base失败.
- 由于初始时cells为空,第一次调用add()方法的话,(as = cells) != null不成立,
- 所以会执行 !casBase(b = base, b + x)尝试对base值进行CAS修改
- */
- if ((as = cells) != null || !casBase(b = base, b + x)) {
- //
- boolean uncontended = true;
- if (as == null || (m = as.length - 1) < 0 ||
- (a = as[getProbe() & m]) == null ||
- !(uncontended = a.cas(v = a.value, v + x)))
- longAccumulate(x, null, uncontended);
- }
- }
这里着重对内层嵌套的这部分if判断条件进行拆分分析:
这里分为三步:
其中如果前一个表达式成立,则不会在执行后一个表达式。
第一步:as == null || (m = as.length - 1) < 0
此条件成立说明cells数组还未初始化。如果不成立则说明cells数组已经完成初始化,对应的线程需要找到Cell数组中的元素去写值,接着执行下面步骤2。
第二步:(a = as[getProbe() & m]) == null
通过getProbe()获取当前线程的hash值,并同m做与运算(m=cells长度-1),当条件成立时说明当前线程通过hash计算出来数组位置处的cell为空,则去执行longAccumulate()方法。如果不成立则说明对应的cell不为空,则接着执行下面步骤3。
第三步:!(uncontended = a.cas(v = a.value, v + x)
通过a.cas(v = a.value, v + x)尝试进行一次CAS更改value操作,如果成功则退出if条件,失败则继续往下执行longAccumulate()方法。
- /**
- * 返回累加的和,也就是"当前时刻"的计数值
- * 注意: 高并发时,此返回值可能不是绝对准确的,因为调用这个方法时,没有对Cell数组进行加锁
- 可能会有其他线程对Cell中的值进行了修改,也有可能对数组进行了扩容,所以sum返回的值并不是非常精确的,其返回值并不是一个调用sum方法时的原子快照值
- */
- public long sum() {
- Cell[] as = cells; Cell a;
- long sum = base;
- if (as != null) {
- for (int i = 0; i < as.length; ++i) {
- if ((a = as[i]) != null)
- sum += a.value;
- }
- }
- return sum;
- }