• JDK8 新特性 LongAdder 源码解析


    JDK8 新特性 LongAdder 源码解析

    原子累加器

    • LongAdder
    • DoubleAdder
    • LongAccumulator
    • DoubleAccumulator

    jdk8 以后,新增了几个专门用来做累加的类,它们累加的性能要比 Atomic 类高很多。

    累加器性能比较

    /**
     * Atomic和LongAdder耗时测试
     */
    public class Main {
        public static void main(String[] args) throws Exception{
            testAtomicLongAdder(1, 10000000); // 1个线程 共累加1千万次
            testAtomicLongAdder(10, 10000000); // 10个线程 共累加1千万次
            testAtomicLongAdder(100, 10000000); // 100个线程 共累加1千万次
        }
    
        static void testAtomicLongAdder(int threadCount, int times) throws Exception{
            System.out.println("threadCount: " + threadCount + ", times: " + times);
            long start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            System.out.println("LongAdder 耗时:" + (System.currentTimeMillis() - start) + "ms");
            System.out.println("threadCount: " + threadCount + ", times: " + times);
            long atomicStart = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            System.out.println("AtomicLong 耗时:" + (System.currentTimeMillis() - atomicStart) + "ms");
            System.out.println("----------------------------------------");
        }
    
        static void testAtomicLong(int threadCount, int times) throws Exception{
            AtomicLong atomicLong = new AtomicLong();
            List<Thread> list = new ArrayList<>();
            for (int i = 0; i < threadCount; i++) {
                list.add(new Thread(() -> {
                    for (int j = 0; j < times; j++) {
                        atomicLong.incrementAndGet();
                    }
                }));
            }
    
            for (Thread thread : list) {
                thread.start();
            }
    
            for (Thread thread : list) {
                thread.join();
            }
    
            System.out.println("AtomicLong value is : " + atomicLong.get());
        }
    
        static void testLongAdder(int threadCount, int times) throws Exception{
            LongAdder longAdder = new LongAdder();
            List<Thread> list = new ArrayList<>();
            for (int i = 0; i < threadCount; i++) {
                list.add(new Thread(() -> {
                    for (int j = 0; j < times; j++) {
                        longAdder.increment();
                    }
                }));
            }
    
            for (Thread thread : list) {
                thread.start();
            }
    
            for (Thread thread : list) {
                thread.join();
            }
    
            System.out.println("LongAdder value is : " + longAdder.longValue());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    输出

    threadCount: 1, times: 10000000
    LongAdder value is : 10000000
    LongAdder 耗时:181ms
    threadCount: 1, times: 10000000
    AtomicLong value is : 10000000
    AtomicLong 耗时:106ms
    ----------------------------------------
    threadCount: 10, times: 10000000
    LongAdder value is : 100000000
    LongAdder 耗时:364ms
    threadCount: 10, times: 10000000
    AtomicLong value is : 100000000
    AtomicLong 耗时:1769ms
    ----------------------------------------
    threadCount: 100, times: 10000000
    LongAdder value is : 1000000000
    LongAdder 耗时:1500ms
    threadCount: 100, times: 10000000
    AtomicLong value is : 1000000000
    AtomicLong 耗时:17597ms
    ----------------------------------------
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    在线程数只有 1 个的时候,LongAdder 并未体现出优势;

    但随着线程数的增加,LongAdder 和 AtomicLong 的差距就相当明显了,AtomicLong 的性能急剧下降,耗时是 LongAdder 的数倍。

    由此可看出 LongAdder 在多线程高并发下的数据统计性能非常优秀。

    性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

    LongAdder 和 LongAccumulator 区别:

    • 相同点:

      • LongAdder 与 LongAccumulator 类都是使用非阻塞算法 CAS 实现的
      • LongAdder 类是 LongAccumulator 类的一个特例,只是 LongAccumulator 提供了更强大的功能,可以自定义累加规则,当LongBinaryOperator accumulatorFunction 为 null 时就等价于 LongAdder
    • 不同点:

      • 调用 casBase 时

        • LongAccumulator 使用 function.applyAsLong(b = base, x) 来计算
        • LongAdder 使用 casBase(b = base, b + x) 来计算
      • LongAccumulator 类功能更加强大,构造方法参数中

        • LongBinaryOperator accumulatorFunction 是一个双目运算器接口,可以指定累加规则,比如累加或者相乘,其根据输入的两个参数返回一个计算值,LongAdder 内置累加规则
        • identity 则是 LongAccumulator 累加器的初始值,LongAccumulator 可以为累加器提供非 0 的初始值,而 LongAdder 只能提供默认的 0

    LongAccumulator 和LongAdder 的实现方式是完全一样的,只是做了一些定制。通过 LongBinaryOperator 函数,后面我们在看源码的时候会看到,二者唯一的不同就是计算是用的值还是用的 LongBinaryOperator 来计算。LongAccumulator 性能慢一些,是因为 Lambda 表达式(方法引用)会额外的创建一些对象。

    LongAdder简介

    优化机制

    LongAdder 是 Java8 提供的类,跟 AtomicLong 有相同的效果,但对 CAS 机制进行了优化,尝试使用分段 CAS 以及自动分段迁移的方式来大幅度提升多线程高并发执行 CAS 操作的性能。

    CAS 底层实现是在一个循环中不断地尝试修改目标值,直到修改成功。如果竞争不激烈修改成功率很高,否则失败率很高,失败后这些重复的原子性操作会耗费性能(导致大量线程空循环,自旋转)。

    优化核心思想:数据分离,将 AtomicLong 的单点的更新压力分担到各个节点,空间换时间,在低并发的时候直接更新,可以保障和 AtomicLong 的性能基本一致,而在高并发的时候通过分散减少竞争,提高了性能。

    分段 CAS 机制

    • 在发生竞争时,创建 Cell 数组用于将不同线程的操作离散(通过 hash 等算法映射)到不同的节点上
    • 设置多个累加单元(会根据需要扩容,最大为 CPU 核数),Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1] 等,最后将结果汇总
    • 在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能

    自动分段迁移机制:某个 Cell 的 value 执行 CAS 失败,就会自动寻找另一个 Cell 分段内的 value 值进行 CAS 操作。


    伪共享

    一个缓存行加入了多个 Cell 对象叫做伪共享。

    Cell 为累加单元:数组访问索引是通过 Thread 里的 threadLocalRandomProbe 域取模实现的,这个域是 ThreadLocalRandom 更新的

    // Striped64.Cell
    // @sun.misc.Contended:防止缓存行伪共享
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        // 最重要的方法 用 cas 方式进行累加, prev 表示旧值, next 表示新值
        final boolean cas(long prev, long next) {
        	return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
        }
        // 省略不重要代码
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    得从缓存说起

    缓存与内存的速度比较

    image-20221002204224467

    cpu 大约需要的时钟周期
    寄存器1 cycle (4GHz 的 CPU 约为0.25ns)
    L13~4 cycle
    L210~20 cycle
    L340~45 cycle
    内存120~240 cycle
    • 因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。
    • 而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)
    • 缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
    • CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效

    image-20221002204508578

    因为 Cell 是数组形式,在内存中是连续存储的,64 位系统中,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),每一个 cache line 为 64 字节,因此缓存行可以存下 2 个的 Cell 对象。这样问题来了:

    • Core-0 要修改 Cell[0]
    • Core-1 要修改 Cell[1]

    无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效。

    @sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效。

    image-20221002204543917


    LongAdder图解

    • LongAdder 的结构比较简单,如下图

      image.png

    • 其由 base 值 (long类型) 和 Cell 数组构成,如上图

    • 当没有 base 的更新没有线程竞争的时候,会直接写到 base 里面去,而不会操作 Cell 数组,当 base 的写出现了竞争的时候,就会创建 Cell 数组,由不同的线程写不同的下标。当最后求和的时候,通过上述的公式,sum = base + 所有槽的值

      s u m = b a s e + ∑ i = 0 n C e l l [ i ] sum=base+\sum_{i=0}^{n} Cell[i] sum=base+i=0nCell[i]

    • 执行流程图

      image.png

    • 刚开始第一次去理解上述的内容比较抽象,现在从源码开始讲起。

    LongAdder源码

    属性

    • LongAdder 的属性,在其父类 Striped64 中

      abstract class Striped64 extends Number {
      
          /*
           * @sun.misc.Contended 添加缓存填充,用来消除伪共享
           * 伪共享会导致缓存行失效,缓存一致性开销变大。
           * 伪共享指的是多个线程同时读写同一个缓存行的不同变量时导致的 CPU缓存失效。
           * 尽管这些变量之间没有任何关系,但由于在主内存中邻近,存在于同一个缓存行之中,
           * 它们的相互覆盖会导致频繁的缓存未命中,引发性能下降。这里对于伪共享我只是提一下概念,并不会深入去讲解,大家可以自行查阅一些资料。
           * 解决伪共享的方法一般都是使用直接填充,我们只需要保证不同线程的变量存在于不同的 CacheLine 即可,
           * 使用多余的字节来填充可以做点这一点,这样就不会出现伪共享问题。在Disruptor队列的设计中就有类似设计。
           */
          @sun.misc.Contended static final class Cell {
              volatile long value;
              Cell(long x) { value = x; }
              final boolean cas(long cmp, long val) {
                  return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
              }
      
              // Unsafe mechanics
              private static final sun.misc.Unsafe UNSAFE;
              private static final long valueOffset; // 偏移量
              static {
                  try {
                      UNSAFE = sun.misc.Unsafe.getUnsafe();
                      Class<?> ak = Cell.class; // 拿到Cell对象
                      valueOffset = UNSAFE.objectFieldOffset // 获取偏移地址
                              (ak.getDeclaredField("value")); // 获取value字段
                  } catch (Exception e) {
                      throw new Error(e);
                  }
              }
          }
      
          /** Number of CPUS, to place bound on table size */
          // 表示当前计算机CPU核心数,作用 => 控制cells数组长度的一个关键条件
          static final int NCPU = Runtime.getRuntime().availableProcessors();
      
          /**
           * Table of cells. When non-null, size is a power of 2.
           * Cells 数组 大小是2的幂
           */
          transient volatile Cell[] cells;
      
          /**
           * Base value, used mainly when there is no contention, but also as
           * a fallback during table initialization races. Updated via CAS.
           * base 基础 value 值,当并发较低的时候,只累加该值,主要用于没有竞争的情况,通过CAS更新 | 当cells扩容时,需要将数据写入base中
           */
          transient volatile long base;
      
          /**
           * Spinlock (locked via CAS) used when resizing and/or creating Cells.
           * 创建或者落入Cells数组的时候使用的自旋锁变量调整单元格大小,创建单元格时候使用的锁
           * 初始化cells或者扩容cells需要获取锁 0表示无锁状态 1表示其他线程已经持有了锁
           */
          transient volatile int cellsBusy;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57

    工作流程:

    • cells 占用内存是相对比较大的,是惰性加载的,在无竞争或者其他线程正在初始化 cells 数组的情况下,直接更新 base 域

    • 在第一次发生竞争时(casBase 失败)会创建一个大小为 2 的 cells 数组,将当前累加的值包装为 Cell 对象,放入映射的槽位上

    • 分段累加的过程中,如果当前线程对应的 cells 槽位为空,就会新建 Cell 填充,如果出现竞争,就会重新计算线程对应的槽位,继续自旋尝试修改

    • 分段迁移后还出现竞争就会扩容 cells 数组长度为原来的两倍,然后 rehash,数组长度总是 2 的 n 次幂,默认最大为 CPU 核数,但是可以超过,如果核数是 6 核,数组最长是 8

    add()

    • LongAdder#add()方法

      我们通常使用的是 LongAdder 的 increment() 方法,也就是自增方法,底层 LongAdder 其实调用的是 add() 方法

      public class LongAdder extends Striped64 implements Serializable {
          /**
           * Equivalent to {@code add(1)}.
           * 自增 实际上调用的是 add(1L);
           */
          public void increment() {
              add(1L);
          }
          
          /**
           * Equivalent to {@code add(-1)}.
           * 自减 实际上调用的是 add(1L);
           */
          public void decrement() {
              add(-1L);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
    • LongAdder继承Striped64,并且使用到了Striped64中的方法,这是一个公共方法,我们稍后再说。

    • 现在来分析 LongAdder 的 add() 方法

      public class LongAdder extends Striped64 implements Serializable {
          /**
           * Adds the given value.
           * 1.casBase
           * 2.casBase失败,创建cells
           * 3.cells失败,扩容cells,扩容到CPU核心数大小
           * @param x the value to add
           */
          public void add(long x) {
              // as 表示当前cells引用
              // b 表示获取的base值
              // v 表示期望值
              // m 表示cells数组的长度
              // a 表示当前线程命中的cell单元格
              Cell[] as; long b, v; int m; Cell a;
              // 条件1:true->表示cell已经初始化过了 当前线程应该将数据写入到对应的cell中
              // 		  false->表示第一次加载,cells未初始化,所有的数据应该尝试写到base中
              // 条件2:true->表示当前线程CAS数据替换失败,发生竞争了,可能需要重试 或者 扩容,这里指的是 !casBase(x,y) 整体是否为true
              // 		  false->表示当前线程CAS成功
              if ((as = cells) != null || !casBase(b = base, b + x)) {
                  // 什么时候会进来?
                  // 1.true->cells已经初始化,需要尝试将数据写入到对应的cell,注意,这里是尝试写入
                  // 2.true->发生竞争了,可能需要重试 或 扩容
      
                  // ture->未竞争 false->发生竞争
                  boolean uncontended = true;
      
                  // 第一次 as 为null
      
                  // 已经新建了数组了
                  // 如果某个calls槽cas失败,就可能进行扩容操作
                  // 如果第一次写发现值为null,初始化了cell,但是没有设置值。需要设置值
                  // 如果不为null,就进行cas操作
                  // 如果不为null,就进行cas操作 且 cas操作失败,然后扩容
      
                  // 条件1:as == null true->说明当前cells未初始化,也就是多线程写base发生竞争了
                  //  				 false->说明当前cells已经始化了,当前线程应该是 找自己的cell 写值
                  // 条件2:(m = as.length - 1) < 0
                  //        true->当前cells数组长度为0
                  //        false->当前cells数组长度大于0
                  //        m = cells数组长度,且长度一定为2的幂,后续会看到或参考HashMap的扩容
                  // 条件3:(a = as[getProbe() & m]) == null 
                  //		  true->当前线程的cells[index]对应的数据为null,说明是第一次写值,需要创建 longAccumulate 支持
                  //        false->说明当前线程对应的cell不为空,说明 下一步想要将x值 添加到cell中
                  //        捞一下as[getProbe() & m]:获取当前线程的hash值 与上 cells长度-1(m,上一个条件计算的),计算cells中的下标
                  // 条件4:!(uncontended = a.cas(v = a.value, v + x)) 
                  //        true->当前 a = cells[index] cas写失败 此时 uncontended = false 发生竞争了
                  //        false->表示cas成功
                  if (as == null || (m = as.length - 1) < 0 ||
                      (a = as[getProbe() & m]) == null ||
                      !(uncontended = a.cas(v = a.value, v + x)))
                      // 都有哪些情况会调用?
                      // 1.true->说明当前cells未初始化,也就是多线程写base发生竞争了[重试|初始化cells]
                      // 2.true->当前cells数组长度为0
                      // 3.true->当前线程的cells[index]对应的数据为null,说明是第一次写值,需要创建 longAccumulate 支持
                      // 4.true->当前 a = cells[index] cas写失败 此时 uncontended = false 发生竞争了[重试|扩容]
                      // 执行累加 调用父类Striped64的longAccumulate方法
                      longAccumulate(x, null, uncontended);
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61

      流程图

      image-20221002205808339

    Striped64中一些操作方法

    • Striped64 提供了数据的 CAS 操作,和线程的 hash 处理,如下

      abstract class Striped64 extends Number {
      
          /**
           * CASes the base field.
           */
          final boolean casBase(long cmp, long val) {
              // cas 进行更新 base值
              // 返回是否更新成功,CAS只写一次
              // 可能写失败,写失败就开始建立Cells
              return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
          }
          
          /**
           * CASes the cellsBusy field from 0 to 1 to acquire lock.
           * 通过 CAS 方式获取锁,操作 cellsBusy 的值 0 => 1
           */
          final boolean casCellsBusy() {
              return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
          }
          
          /**
           * Returns the probe value for the current thread.
           * Duplicated from ThreadLocalRandom because of packaging restrictions.
           * 获取当前线程的hash值
           */
          static final int getProbe() {
              return UNSAFE.getInt(Thread.currentThread(), PROBE);
          }
          
          /**
           * Pseudo-randomly advances and records the given probe value for the
           * given thread.
           * Duplicated from ThreadLocalRandom because of packaging restrictions.
           * 重置当前线程的hash值
           */
          static final int advanceProbe(int probe) {
              probe ^= probe << 13; // xorshift
              probe ^= probe >>> 17;
              probe ^= probe << 5;
              UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
              return probe;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43

    Striped64#longAccumulate方法

    在这个方法中,我们就提到了之前的 LongBinaryOperator 函数,实际在进行计算的时候,是判断是否有这个值来进行处理的。

    但我们通过 LongAdder 传过来的 LongBinaryOperator 这个值为 null,所以这里就不用考虑了。

    • Striped64#longAccumulate:cell 数组创建

      abstract class Striped64 extends Number {
          /**
           * Handles cases of updates involving initialization, resizing,
           * creating new Cells, and/or contention. See above for
           * explanation. This method suffers the usual non-modularity
           * problems of optimistic retry code, relying on rechecked sets of
           * reads.
           *
           * @param x the value 期望的值
           * @param fn the update function, or null for add (this convention
           * avoids the need for an extra field or function in LongAdder).
           * 执行更新的方法
           * @param wasUncontended false if CAS failed before call // Cells 初始化之后,并当前线程CAS修改失败 为false
           *
           * 如何进来这个方法
           * 1.cells未初始化并且casBase失败(多线程写base发生竞争了)[初始化cells] -> 进入CASE2 拿到锁了 则创建一个size为2的cells 没拿到锁 数据累加到base中
           * 2.cells的对应的值为null,第一次写值,需要 longAccumulate 支持 -> 进入CASE1.1 创建一个Cell 赋值到对应的下标中
           * 3.cells初始化并且当前线程cas失败,当前线程对应的cell有竞争[重试|扩容] -> 进入CASE1.2 因为之前发生过竞争了 所以需要将wasUncontended重新设置为true 重置当前hash值 再进入CASE1.1 判断依旧不为空 则进入CASE1.3 重试CAS 成功则退出 失败则进入CASE1.5把扩容意向改为true 再rehash 进行自旋 如果CASE1.3依旧CAS失败 则进行CASE1.6扩容逻辑 所以如果执行到扩容 则至少需要3次CAS失败
           */
          final void longAccumulate(long x, LongBinaryOperator fn,
                                    boolean wasUncontended) {
              // h: 线程hash值
              int h;
              // 还没有给线程分配hash值,这个时候先分配一手hash值
              if ((h = getProbe()) == 0) {
                  // 等于0 强制设置为当前线程,给当前线程分配 hash 值
                  ThreadLocalRandom.current(); // force initialization 强制
                  // 重新获取 probe 值,hash值被设置为一个全新的线程,所以设置了 wasUncontended 为true
                  h = getProbe();
                  // 重新计算了,认为不是热点 也即是说,线程第一次进来,计算的hash值为0,当前线程的hash值0与任何数据位运算都是0,
                  // 那么就会落到第一个cell,这是不合理的,所以需要将wasUncontended设置为true
                  // 也就是说,首次线程访问,先会写cell[0]号位置
                  // 写成功了,就没有下文了,写失败了,说明发生了竞争,那么就不适合再给线程设置为0了,因为已经发生了抢夺
                  // 不把它当做一次真正的竞争
                  wasUncontended = true;
              }
              // collide 表示扩容意向,false 一定不会扩容,true可能扩容
              boolean collide = false;                // True if last slot nonempty
              // 自旋
              for (;;) {
                  // as cells数组引用
                  // a 当前线程的命中的cell
                  // n cells数组长度
                  // v 表示期望值
                  Cell[] as; Cell a; int n; long v;
                  // CASE1:cells 已经初始化了,当前线程应该将数据写入到对应的cell中
                  if ((as = cells) != null && (n = as.length) > 0) {
                      // CASE1.1: 如果计算完成之后,计算当前的cell单元为null,说明这个cell没有被使用,则需要新建 new cell
                      if ((a = as[(n - 1) & h]) == null) {
                          // cellsBusy == 0 锁未被占用
                          // 如果cells数组没有再扩容
                          if (cellsBusy == 0) {       // Try to attach new Cell
                              // 拿当前的x创建一个cell单元
                              Cell r = new Cell(x);   // Optimistically create
                              // 尝试加锁
                              if (cellsBusy == 0 && casCellsBusy()) {
                                  // 是否创建成功的标记
                                  boolean created = false;
                                  // 在有锁的情况下在检查一下之前的判断
                                  try {               // Recheck under lock
                                      // rs 表示当前cells的引用
                                      // m 表示cells长度
                                      // j 表示当前线程命中的下标
                                      Cell[] rs; int m, j;
                                      // 条件1和条件2恒成立,因为进来的时候就已经判断了
                                      // 这里大家可能会有疑惑,为毛没有再判断一次 cells == as
                                      // 因为这个时候,两个线程A、B进来之后,A停在加锁之前,B进来了,那么A、B必然插入的是同一个槽
                                      // 这时会判断 rs[j = (m - 1) & h] == null) 为了防止其它线程初始化过该位置 如果为null才赋值 否则就不管了
                                      if ((rs = cells) != null &&
                                          (m = rs.length) > 0 &&
                                          rs[j = (m - 1) & h] == null) { // 所以不加这个判断可能会产生重赋值 造成丢失数据
                                          rs[j] = r;
                                          created = true;
                                      }
                                  } finally {
                                      // 解锁
                                      cellsBusy = 0;
                                  }
                                  // 创建并且赋值成功,跳出循环
                                  if (created)
                                      break;
                                  // 这个时候目标槽已经不是空了 继续自旋
                                  // LongAdder 必须保证每个值都写进去
                                  continue;           // Slot is now non-empty
                              }
                          }
                          // 扩容意向修改为false
                          // 因为当前线程还没写值,不一定写失败,所以扩容意向为false
                          collide = false;
                      }
                      // CASE1.2:只有一种情况会进来,cells初始化之后,当前线程竞争修改失败,并且已经初始化线程hash值
                      // wasUncontended目前false,这里是重新设置这个值为 true
                      // 紧接着执行 advanceProbe(h) 重置当前线程的hash,继续循环
                      else if (!wasUncontended) 		// CAS already known to fail
                          wasUncontended = true; 		// Continue after rehash
                     // 如果为true。就继续执行当前cell位的cas操作
                     // CASE1.3:当前线程rehash过hash值,新命中的cell不为空,就来到这里,然后进行cas写
                     // 如果写成功了,跳出循环
                     // 如果写失败了,表示rehash之后,又写失败了,进入下一个条件CASE1.4
                      else if (a.cas(v = a.value, ((fn == null) ? v + x :
                              fn.applyAsLong(v, x))))
                          break;
                      // CASE1.4:
                      // 条件1:n 是数组长度 大于CPU核心数,扩容意向collide设置为false,表示不再扩容了
                      // 条件2:cells != as 其他线程已经扩容过了,当前线程rehash之后重试即可
                      else if (n >= NCPU || cells != as)
                          // 如果n大于等于 NCPU 不可扩容
                          collide = false;            // At max size or stale
                      // CASE1.5:
                      // 设置扩容意向为true,但是不一定扩容成功
                      // 如果扩容意向为false,重新设置为true 然后继续执行循环
                      // 设置为true之后,再次rehash然后再次循环,如果又失败了,这里且不满足 (n >= NCPU || cells != as)
                      // 此时collide为true 执行 CASE1.6
                      else if (!collide)
                          collide = true;
                      // CASE1.6:扩容操作并加锁 注意:只是扩容,并没有进行赋值
                      // 条件1:cellsBusy == 0 表示当前没有占用锁,当前线程可以抢锁
                      // 条件2:casCellsBusy() 抢锁
                      else if (cellsBusy == 0 && casCellsBusy()) {
                          try {
                              // 和上面一样,需要判断,防止重复扩容
                              if (cells == as) { 		// Expand table unless stale
                                  // 扩容2倍
                                  Cell[] rs = new Cell[n << 1];
                                  for (int i = 0; i < n; ++i)
                                      rs[i] = as[i];
                                  // 扩容赋值
                                  cells = rs;
                              }
                          } finally {
                              // 释放锁
                              cellsBusy = 0;
                          }
                          // 扩容意向设置为false
                          collide = false;
                          continue;           		// Retry with expanded table
                      }
                      // 重新设置线程hash值 h
                      h = advanceProbe(h);
                  }
                  // CASE2: cells 没加锁并且没有初始化(cells为null),则尝试进行加锁并且开始初始化操作
                  // 条件1:cellsBusy == 0 没有加锁
                  // 条件2:cells == as 为什么又对比一次?因为多线程场景下,A线程过来,发现是null,然后进入了,然后B线程过来,不一定是null
                  // 可能其他线程会修改了cells
                  // 条件3:true->表示cas获取锁成功,将cellsBusy设置为1,false->表示其他线程正在持有锁
                  else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                      // 初始化操作
                      boolean init = false;
                      try { 						// Initialize table
                          // 初始化的时候,进行创建cells数组
                          // casBase 失败,新建大小为2的cells数组
                          // 为毛又对比一下?因为是这样的,有可能在执行 cellsBusy == 0 && cells == as && casCellsBusy() 的时候
                          // 执行到 cellsBusy == 0 && cells == as 线程让出CPU 然后另外一个线程执行为true,那么其可以继续操作cells
                          // 此时在切换过来,这个cells就不是null了
                          // 防止其它线程已经初始化了,当前线程再次初始化 导致丢失数据
                          // 类似双重检查锁 - 这里是安全的,因为多线程场景下只有一个线程能够拿到锁,使用casCellsBusy()保证
                          if (cells == as) {
                              // 新建大小为2的cells数组
                              Cell[] rs = new Cell[2];
                              // 设置值
                              rs[h & 1] = new Cell(x);
                              // 赋值cells
                              cells = rs;
                              // 初始化成功
                              init = true;
                          }
                      } finally {
                          cellsBusy = 0; // 最后要释放锁
                      }
                      if (init)
                          break; // 初始化成功则退出自旋
                  }
                  // CASE3:cells 正在初始化 则尝试在base上进行累加
                  // 1.当前cellsBusy被别的线程持有(上锁),表示其它线程正在初始化cells,这个时候需要进行兜底操作,将当前线程值累加到base
                  // 2.cells被其他线程初始化后,需当前线程需要将数据要累加到base
                  else if (casBase(v = base, ((fn == null) ? v + x :
                          fn.applyAsLong(v, x))))
                      break;              // Fall back on using base
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
      • 144
      • 145
      • 146
      • 147
      • 148
      • 149
      • 150
      • 151
      • 152
      • 153
      • 154
      • 155
      • 156
      • 157
      • 158
      • 159
      • 160
      • 161
      • 162
      • 163
      • 164
      • 165
      • 166
      • 167
      • 168
      • 169
      • 170
      • 171
      • 172
      • 173
      • 174
      • 175
      • 176
      • 177
      • 178
      • 179
      • 180
      • 181

      流程图

      image-20221002205847559

      image-20221002205905807

      每个线程刚进入 longAccumulate 时,会尝试对应一个 cell 对象(找到一个坑位)

      image-20221002205927822

    LongAdder其他方法

    • 当没有 base 的更新没有线程竞争的时候,会直接写到 base 里面去,而不会操作 Cell 数组,当 base 的写出现了竞争的时候,就会创建 Cell 数组,由不同的线程写不同的下标。当最后求和的时候,通过上述的公式,sum = base + 所有槽的值。获取最终结果通过 sum 整合。

      s u m = b a s e + ∑ i = 0 n C e l l [ i ] sum=base+\sum_{i=0}^{n} Cell[i] sum=base+i=0nCell[i]

      保证最终一致性,不保证强一致性。

      public class LongAdder extends Striped64 implements Serializable {
          
          // 求和方法,只保证最终一致性
          // 计算方式如上
          public long sum() {
              Cell[] as = cells; Cell a;
              long sum = base;
              if (as != null) {
                  for (int i = 0; i < as.length; ++i) {
                      if ((a = as[i]) != null)
                          sum += a.value;
                  }
              }
              // 不保证返回精确值,他是最终一致性的
              return sum;
          }
      
          // 数值归零方法
          public void reset() {
              Cell[] as = cells; Cell a;
              base = 0L;
              if (as != null) {
                  for (int i = 0; i < as.length; ++i) {
                      if ((a = as[i]) != null)
                          a.value = 0L;
                  }
              }
          }
      
          // 数值归零并返回sum值
          public long sumThenReset() {
              Cell[] as = cells; Cell a;
              long sum = base;
              base = 0L;
              if (as != null) {
                  for (int i = 0; i < as.length; ++i) {
                      if ((a = as[i]) != null) {
                          sum += a.value;
                          a.value = 0L;
                      }
                  }
              }
              return sum;
          } 
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45

    总结

    AtomicLong可以弃用了吗

    • 看上去 LongAdder 的性能全面超越了 AtomicLong,而且阿里巴巴开发手册也提及到 推荐使用 LongAdder 对象,比 AtomicLong 性能更好(减少乐观锁的重试次数),但是我们真的就可以舍弃掉 LongAdder 了吗?
    • 当然不是,我们需要看场景来使用,如果是并发不太高的系统,使用 AtomicLong 可能会更好一些,而且内存需求也会小一些。
    • 我们看过 sum() 方法后可以知道 LongAdder 在统计的时候如果有并发更新,可能导致统计的数据有误差 (保证最终一致性,不保证强一致性)
    • 而在高并发统计计数的场景下,才更适合使用 LongAdder。

    收获

    • LongAdder 中最核心的思想就是利用空间来换时间,将热点 value 分散成一个 Cell 列表来承接并发的 CAS,以此来提升性能。
    • LongAdder 的原理及实现都很简单,但其设计的思想值得我们品味和学习。



    参考

  • 相关阅读:
    Viewport的作用
    nginx源码层面探究request_time、upstream_response_time、upstream_connect_time与upstream_header_time指标具体含义
    数据库基础知识详解五:MySQL中的索引和其两种引擎、主从复制以及关系型/非关系型数据库
    Selenium基础 — Selenium中的expected_conditions模块(一)
    【EXCEL拦路虎】解决一些常遇到的excel问题
    vue部分入门知识点代码示例
    Git-Lab私域共享代码
    SpringBoot+Vue项目疫苗预约管理系统
    全国市政公用事业和邮政、电信业发展数据,shp/excel格式
    Java代码实现不同应用系统中数据同步程序
  • 原文地址:https://blog.csdn.net/weixin_53407527/article/details/127702138