• LongAdder原理浅析


    1.什么是LongAdder

             LongAdder是JDK1.8由Doug Lea大神新增的原子操作类,位于java.util.concurrent.atomic包下,LongAdder在高并发的场景下会比AtomicLong 具有更好的性能,代价是消耗更多的内存空间。

    (上图来自LongAdder类注释) 

    2.为什么需要LongAdder

            在LongAdder之前,当我们在进行计数统计的时,通常会使用AtomicLong来实现。AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。

            看一下AtomicLong的增加计数方法源码:

            可以看到在高并发情况下,当有大量线程同时去更新一个变量,任意一个时间点只有一个线程能够成功,绝大部分的线程在尝试更新失败后,会通过自旋的方式再次进行尝试,这样严重占用了CPU的时间片,进而导致系统性能问题。

    3.工作原理

            LongAdder设计思想上,采用分段的方式降低并发冲突的概率。通过维护一个基准值base和 Cell 数组:

    • 当没有出现多线程竞争的情况,线程会直接对base里面的value进行修改。
    • 当多线程的时候,那么LongAdder会初始化一个cell数组,然后对每个线程获取对应的hash值,之后通过hash & (size -1)[size为cell数组的长度]将每个线程定位到对应的cell单元格,之后这个线程将值写入对应的cell单元格中的value,之后将所有cell单元格的value和base中的value进行累加求和得到最终的值。并且每个线程竞争的Cell的下标不是固定的,如果CAS失败,会重新获取新的下标去更新,从而极大地减少了CAS失败的概率。

       

    4.使用样例

    1. public class LongAdderTestDemo {
    2. public static void main(String[] args) throws InterruptedException {
    3. LongAdder adder = new LongAdder();
    4. int [] num = new int[1];
    5. Thread[] threads = new Thread[10];
    6. for (int i = 0; i < 10; i++) {
    7. threads[i] = new Thread(() -> {
    8. for (int j = 0; j < 10000; j++) {
    9. adder.add(1);
    10. num[0] += 1;
    11. }
    12. });
    13. threads[i].start();
    14. }
    15. for (int i = 0; i < 10; i++) {
    16. threads[i].join();
    17. }
    18. // 通过对比发现,使用LongAdder能保证最后的值是期望的值,不存在并发写错误
    19. System.out.println("adder:" + adder);
    20. System.out.println("num:" + num[0]);
    21. }
    22. }

    5.源码学习

    5.1 准备工作

            LongAdder类继承Striped64类,本身只提供了有一个无参构造方法:

            对外暴露increment、decrement、sum等方法,其中increment和decrement内部调用的也是add方法(后面会详细分析add方法源码,先了解个轮廓)。

            再看父类Striped64都包含什么成员变量(代码太长,不适合截图,直接按需贴代码段):

    1. abstract class Striped64 extends Number {
    2. @jdk.internal.vm.annotation.Contended static final class Cell {
    3. // 代码省略...
    4. }
    5. /**
    6. * 当然CPU的数量,在Cell数组扩容时会被使用到
    7. */
    8. static final int NCPU = Runtime.getRuntime().availableProcessors();
    9. /**
    10. * cell数组,长度是2的幂次方
    11. */
    12. transient volatile Cell[] cells;
    13. /**
    14. * Base value, used mainly when there is no contention, but also as
    15. * a fallback during table initialization races. Updated via CAS.
    16. 通常如下两种场景使用:
    17. * 1.通常在无并发时,线程直接CAS更新该值
    18. 2.初始化数组时,必须要保证数组只被初始化一次,其他竞争失败的线程会将数值累加到base上
    19. */
    20. transient volatile long base;
    21. /**
    22. * cell数组对应的锁,初始化cells或者扩容cells需要获取锁。
    23. * 0表示当前cell数组没有线程使用,1表示当前数组已经有线程占用
    24. */
    25. transient volatile int cellsBusy;
    26. // VarHandle 是JDK9之后提供的用于替代之前Unsafe类的,可直接操作内存中的变量
    27. private static final VarHandle BASE;
    28. private static final VarHandle CELLSBUSY;
    29. private static final VarHandle THREAD_PROBE;
    30. // 其他方法省略...
    31. }

    对VarHandler的一点说明:

    Unsafe 是不建议开发者直接使用的,因为 Unsafe 所操作的并不属于Java标准,会容易带来一些安全性的问题。JDK9 之后,官方推荐使用 java.lang.invoke.Varhandle 来替代 Unsafe 大部分功能,对比 Unsafe ,VarHandle 有着相似的功能,但会更加安全,并且,在并发方面也提高了不少性能。

            上面对LongAdder和Striped64的主要成员变量大概有个表面的认识,下面开始对关键的代码进行具体的分析。

    5.2 Striped64 关键代码分析

    5.2.1 static代码块

     代码如下截图:

             静态代码块主要是通过MethodHandles和VarHandle来获取Striped64类的变量base、cellsBusy和Thread类的threadLocalRandomProbe变量。

    5.2.2 Cell类

     代码如下截图:

            Cell是一个静态内部类,且被@sun.misc.Contended修饰(避免伪共享问题),内部包含一个被volatile修饰的value变量,内部静态代码块同样使用MethodHandles实现VALUE变量的初始化。此外提供了一个cas接口,底层通过VarHandler的compareAndSet方法实现value值的CAS更新。

    5.2.3 longAccumulate方法(核心方法)

      看代码前先看下入参:    

    • long x : 需要增加的值,一般默认都是1

    • LongBinaryOperator fn : 默认传递的是null

    • wasUncontended:竞争标识,如果是false则代表有竞争。只有cells初始化之后,并且当前线程CAS竞争修改失败,才会是false

    1. final void longAccumulate(long x, LongBinaryOperator fn,
    2. boolean wasUncontended) {
    3. int h;
    4. // 通过getProbe方法获取为线程生成一个非0的hash值,底层是通过VarHandle实现
    5. if ((h = getProbe()) == 0) {
    6. /*
    7. 如果当前线程的hash值为0,0同数组长度取余后依旧是0,会固定到数组第一个位置。所以
    8. 这里通过ThreadLocalRandom产生一个随机数,重新计算一个hash值,但认为此次不算是
    9. 一次竞争,所以将wasUncontended设置为true.
    10. */
    11. ThreadLocalRandom.current(); // force initialization
    12. h = getProbe();
    13. wasUncontended = true;
    14. }
    15. // 扩容意向,当为false表示不扩容
    16. boolean collide = false; // True if last slot nonempty
    17. done: for (;;) {
    18. Cell[] cs; Cell c; int n; long v;
    19. // 情况一:cells数组不为空,且长度大于0
    20. if ((cs = cells) != null && (n = cs.length) > 0) {
    21. if ((c = cs[(n - 1) & h]) == null) {
    22. if (cellsBusy == 0) { // Try to attach new Cell
    23. Cell r = new Cell(x); // Optimistically create
    24. if (cellsBusy == 0 && casCellsBusy()) {
    25. try { // Recheck under lock
    26. Cell[] rs; int m, j;
    27. if ((rs = cells) != null &&
    28. (m = rs.length) > 0 &&
    29. rs[j = (m - 1) & h] == null) {
    30. rs[j] = r;
    31. break done;
    32. }
    33. } finally {
    34. cellsBusy = 0;
    35. }
    36. continue; // Slot is now non-empty
    37. }
    38. }
    39. collide = false;
    40. }
    41. else if (!wasUncontended) // CAS already known to fail
    42. wasUncontended = true; // Continue after rehash
    43. else if (c.cas(v = c.value,
    44. (fn == null) ? v + x : fn.applyAsLong(v, x)))
    45. break;
    46. else if (n >= NCPU || cells != cs)
    47. collide = false; // At max size or stale
    48. else if (!collide)
    49. collide = true;
    50. else if (cellsBusy == 0 && casCellsBusy()) {
    51. try {
    52. if (cells == cs) // Expand table unless stale
    53. cells = Arrays.copyOf(cs, n << 1);
    54. } finally {
    55. cellsBusy = 0;
    56. }
    57. collide = false;
    58. continue; // Retry with expanded table
    59. }
    60. h = advanceProbe(h);
    61. }
    62. // 情况二:cells数组还未初始化,最后调用casCellsBusy()通过CAS操作去获取锁
    63. else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
    64. try { // Initialize table
    65. if (cells == cs) {
    66. Cell[] rs = new Cell[2];
    67. // 建一个新的Cell元素,value是x值,默认为1
    68. rs[h & 1] = new Cell(x);
    69. cells = rs;
    70. break done;
    71. }
    72. } finally {
    73. cellsBusy = 0;
    74. }
    75. }
    76. // 情况三:有别的线程正在初始化数组,则尝试累加在base变量上
    77. else if (casBase(v = base,
    78. (fn == null) ? v + x : fn.applyAsLong(v, x)))
    79. break done;
    80. }
    81. }

    5.3  LongAdder关键代码分析 

    5.3.1 add 方法(核心入口方法)

    1. public void add(long x) {
    2. /*
    3. 对下面几个变量的说明:
    4. 1)as表示cells数组的引用
    5. 2)b表示获取的base值
    6. 3)v表示期望值
    7. 4)m表示cells数组的长度
    8. 5)a表示当前线程命中的cell单元格
    9. */
    10. Cell[] as; long b, v; int m; Cell a;
    11. /*
    12. 如果cells数组不为空,或者当前线程竞争base失败.
    13. 由于初始时cells为空,第一次调用add()方法的话,(as = cells) != null不成立,
    14. 所以会执行 !casBase(b = base, b + x)尝试对base值进行CAS修改
    15. */
    16. if ((as = cells) != null || !casBase(b = base, b + x)) {
    17. //
    18. boolean uncontended = true;
    19. if (as == null || (m = as.length - 1) < 0 ||
    20. (a = as[getProbe() & m]) == null ||
    21. !(uncontended = a.cas(v = a.value, v + x)))
    22. longAccumulate(x, null, uncontended);
    23. }
    24. }

    这里着重对内层嵌套的这部分if判断条件进行拆分分析:

     

    这里分为三步:

    • as == null || (m = as.length - 1) < 0
    • (a = as[getProbe() & m]) == null
    • (a = as[getProbe() & m]) == null

    其中如果前一个表达式成立,则不会在执行后一个表达式。

    第一步:as == null || (m = as.length - 1) < 0

            此条件成立说明cells数组还未初始化。如果不成立则说明cells数组已经完成初始化,对应的线程需要找到Cell数组中的元素去写值,接着执行下面步骤2。

    第二步:(a = as[getProbe() & m]) == null

            通过getProbe()获取当前线程的hash值,并同m做与运算(m=cells长度-1),当条件成立时说明当前线程通过hash计算出来数组位置处的cell为空,则去执行longAccumulate()方法。如果不成立则说明对应的cell不为空,则接着执行下面步骤3。

    第三步:!(uncontended = a.cas(v = a.value, v + x)

            通过a.cas(v = a.value, v + x)尝试进行一次CAS更改value操作,如果成功则退出if条件,失败则继续往下执行longAccumulate()方法。

    5.3.2 sum方法

    1. /**
    2. * 返回累加的和,也就是"当前时刻"的计数值
    3. * 注意: 高并发时,此返回值可能不是绝对准确的,因为调用这个方法时,没有对Cell数组进行加锁
    4. 可能会有其他线程对Cell中的值进行了修改,也有可能对数组进行了扩容,所以sum返回的值并不是非常精确的,其返回值并不是一个调用sum方法时的原子快照值
    5. */
    6. public long sum() {
    7. Cell[] as = cells; Cell a;
    8. long sum = base;
    9. if (as != null) {
    10. for (int i = 0; i < as.length; ++i) {
    11. if ((a = as[i]) != null)
    12. sum += a.value;
    13. }
    14. }
    15. return sum;
    16. }

  • 相关阅读:
    Apache APISIX Dashboard 未经认证访问导致 RCE(CVE-2021-45232)漏洞复现
    CentOS7安装Flink1.17伪分布式
    M4Singer CUDA error: no kernel image is available for execution on the device
    查询硬盘序列号、物理地址及对应批处理命令
    RHCSA 05 - 管理Selinux
    获取个人免费版Ubuntu Pro
    springmvc@RequestBody原理及扩展
    JSX基本使用
    【Hadoop】在spark读取clickhouse中数据
    Abdroid - 开机动画修改
  • 原文地址:https://blog.csdn.net/liulianglin/article/details/126361550