• LongAdder(高性能原子累加器)源码分析


    LongAdder源码分析

    1. AtomicLong分析

    AtomicLong是Java1.5时的一个基于 CAS 的原子类,通过CAS算法提供了非阻塞的原子性操作,但是在 超高并发 下AtomicLong的性能就会非常低下。

    先观察AtomicLong中的一个方法 incrementAndGet() 的实现。

    /**
     * Atomically increments by one the current value.
     * 将当前值原子地加1。
     * @return the updated value
     */
    public final long incrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    底层实际上是调用了Unsafe类中的方法,AtomicLong中维护了一个Unsafe类的对象:

    public class AtomicInteger extends Number implements java.io.Serializable {
        private static final long serialVersionUID = 6214790243416807050L;
    
        // setup to use Unsafe.compareAndSwapInt for updates
        // 设置使用Unsafe.compareAndSwapInt进行更新
        private static final Unsafe unsafe = Unsafe.getUnsafe();
       	// 偏移量
        private static final long valueOffset;
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    Unsafe类中的getAndAddInt()方法实现:

    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
            // 不断的去尝试cas,直到成功为止
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
        return var5;
    }
    
    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
    
    public native int getIntVolatile(Object var1, long var2);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    可以看到AtomicLong的底层是使用do while语句去尝试CAS操作,如果是海量线程一起发生竞争,但是只会有一个线程的cas操作成功,其余的线程都会失败,然后进入循环尝试下一次(自旋状态)。这样会导致CPU资源严重浪费,从而造成低效的结果。

    所以解决问题的关键在于如何使大量线程避免不断自旋尝试。

    2. LongAdder原理分析

    LongAdder是JDK8中新增的API,目的在于解决AtomicLong低效的情况。所以对于一些技术统计的需求建议使用LongAdder进行处理。

    2.1 AtomicLong与LongAdder性能对比

    测试如下代码:

    public class AtomicLongVsLongAdder {
        /**
         * Atomic和LongAdder耗时测试
         */
        public static void main(String[] args) throws Exception {
            testAtomicLongAdder(1, 10000000);
            testAtomicLongAdder(10, 10000000);
            testAtomicLongAdder(100, 10000000);
        }
    
        static void testAtomicLongAdder(int threadCount, int times) throws Exception {
            System.out.println("threadCount: " + threadCount + ", times: " + times);
            long start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            System.out.println("LongAdder 耗时:" + (System.currentTimeMillis() - start) + "ms");
            System.out.println("threadCount: " + threadCount + ", times: " + times);
            long atomicStart = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            System.out.println("AtomicLong 耗时:" + (System.currentTimeMillis() - atomicStart) + "ms");
            System.out.println("----------------------------------------");
        }
    
        static void testAtomicLong(int threadCount, int times) throws Exception {
            AtomicLong atomicLong = new AtomicLong();
            List<Thread> list = new ArrayList();
            for (int i = 0; i < threadCount; i++) {
                list.add(new Thread(() -> {
                    for (int j = 0; j < times; j++) {
                        atomicLong.incrementAndGet();
                    }
                }));
            }
    
            for (Thread thread : list) {
                thread.start();
            }
    
            for (Thread thread : list) {
                thread.join();
            }
    
            System.out.println("AtomicLong value is : " + atomicLong.get());
        }
    
        static void testLongAdder(int threadCount, int times) throws Exception {
            LongAdder longAdder = new LongAdder();
            List<Thread> list = new ArrayList();
            for (int i = 0; i < threadCount; i++) {
                list.add(new Thread(() -> {
                    for (int j = 0; j < times; j++) {
                        longAdder.increment();
                    }
                }));
            }
            for (Thread thread : list) {
                thread.start();
            }
            for (Thread thread : list) {
                thread.join();
            }
            System.out.println("LongAdder value is : " + longAdder.longValue());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    控制台打印结果:

    threadCount: 1, times: 10000000
    LongAdder value is : 10000000
    LongAdder 耗时:123ms
    threadCount: 1, times: 10000000
    AtomicLong value is : 10000000
    AtomicLong 耗时:46ms
    ----------------------------------------
    threadCount: 10, times: 10000000
    LongAdder value is : 100000000
    LongAdder 耗时:126ms
    threadCount: 10, times: 10000000
    AtomicLong value is : 100000000
    AtomicLong 耗时:1845ms
    ----------------------------------------
    threadCount: 100, times: 10000000
    LongAdder value is : 1000000000
    LongAdder 耗时:1039ms
    threadCount: 100, times: 10000000
    AtomicLong value is : 1000000000
    AtomicLong 耗时:18074ms
    ----------------------------------------
    
    Process finished with exit code 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    结果分析:

    • 在单线程的情况下AtomicLong要优于LongAdder
    • 在多线程发生竞争的情况下,LongAdder的性能相较于AtomicLong有大幅提升

    2.2 LongAdder性能更优的原因

    原理图:

    在这里插入图片描述

    LongAdder的设计思想是采用 分段 的方式提高CAS操作的成功率。

    AtomicLong中的成员变量value用来保存值,而在高并发的情况下value就变成了一个热点数据,也就是众多线程竞争一个 value

    LongAdder则是将 value 值的新增操作分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个 value 值进行 CAS 操作,这样热点就被分散了,冲突的概率就小很多。

    同时LongAdder中还有一个base变量,当并发不高的情况下都是通过CAS来直接操作base值,如果CAS失败,则针对LongAdder中的Cell[]数组中的Cell进行 CAS 操作,减少失败的概率。

    3. 源码分析

    3.1 父类 Striped64

    LongAdder继承了Striped64,可以说是核心属性和方法都在Striped64中。

    内部类Cell :

    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }
    
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        // 值的偏移量
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    
    /** 当前计算机的CPU数量,用于控制cells数组长度的一个关键条件。 */
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    
    // 当非空时,size是2的幂。
    transient volatile Cell[] cells;
    
    // 基本值,主要在没有竞争时使用,也可以在表初始化竞争期间作为备用。通过CAS更新
    transient volatile long base;
    
    // 自旋锁(通过CAS锁定)在调整大小和/或创建cell时使用。
    transient volatile int cellsBusy;
    
    // 通过CAS的方式更新cell中的数据
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
    
    // 通过CAS的方式获取锁(同步状态)
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }
    
    // 返回当前线程的探测值。
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    3.2 add(long x) 方法

    add方法用于添加给定的值,因为一开始没有线程竞争的时候,是使用CAS操作像base变量中写入数据,所以需要判断cell数组是否被创建并且写base操作是否成功。但是如果写base发生竞争或者cell数组不为空,就需要执行核心的longAccumulate()方法。

    /**
     * 添加给定的值
     *
     * @param x 要增加的值
     */
    public void add(long x) {
        /*
          as: Cell数组的引用
          b: base值
          v: 期望值
          m: Cell数组的长度 - 1
          a: 当前线程命中的cell
         */
        Cell[] as; long b, v; int m; Cell a;
        // 条件1 (as = cells) != null:
        //      true->表示cells数组已经被创建
        //      false->表示cells数组没有被创建,要执行casBase方法
        // 条件2 !casBase(b = base, b + x) 在条件1位false的情况下执行,也就是cells数组没有创建的情况
        // casBase方法是将更新的值通过CAS的方式设置到base变量,
        //      true->cas设置失败,说明在cas操作base时发生了竞争,需要执行if语句中的逻辑
        //      false->cas设置成功,不需要执行之后的逻辑
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            // 进入if语句的条件:
            // 1. 当前cells数组已经初始化,当前线程需要将数据写到对应的cell中
            // 2. CAS写base失败,可能需要重试或者初始化cells数组
            // 总的来说就是当前线程写数据操作失败就会进入if语句
    
            //true表示未竞争  false发生竞争 (uncontended:无竞争)
            boolean uncontended = true;
    
            // 以下if是判断是否执行longAccumulate()
            // 条件1 as == null || (m = as.length - 1) < 0
            //      true->表示cells数组没有被创建
            //      false->cells数组已经被创建,无需执行longAccumulate()方法
            // 条件2 (a = as[getProbe() & m]) == null
            // 执行条件2时说明cells数组已经被创建并且 m 已经被赋值为cells数组长度-1
            // getProbe()方法实际上直接调用了Unsafe类中的getInt()方法
            // getInt()方法是一个本地方法,用来获取一个整型数值,
            // 同时与m(cells.length - 1)做与运算,类似于HashMap的寻址方式
            // 也就是找到当前线程所对应cells数组中的位置
            //      true->cells数组中对应位置的cell对象没有创建,需要执行longAccumulate()方法
            //      false->cells数组中对应位置的cell对象已经创建,需要进一步判断是否发生竞争
            // 条件3 !(uncontended = a.cas(v = a.value, v + x))
            // 条件3能执行说明已经执行过条件2中的逻辑,a被赋值为数组中对应位置的cell对象并且不为null
            // 这时要对cell对象进行cas操作
            //      true->说明CAS失败,对cell的写操作发生竞争
            //      false->CAS成功,无需执行if语句中的逻辑
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                // 那些情况会调用longAccumulate()?
                // 1. cell数组没有初始化, 也就是多线程写base发生竞争
                // 2. cells数组已经初始化,但是当前线程对应的cell为空
                // 3. cells数组已经初始化,并且当前线程对应的cell不为空,但是对这个cell进行CAS写时失败了(可能需要扩容)
                longAccumulate(x, null, uncontended);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    3.3 longAccumulate()方法

    先来总结一下longAccumulate()会执行的情况:

    • Cell数组未初始化
    • Cell数组已经初始化了,但是当前线程对应数组中的Cell对象为null
    • Cell数组初始化了,且当前线程对应的Cell不为NULL,但是发现有多个线程去写Cell,即发生了竞争。
    /**
     * Cell数组未初始化
     * Cell数组已经初始化了,但是当前线程对应数组中的Cell对象为null
     * Cell数组初始化了,且当前线程对应的Cell不为NULL,但是发现有多个线程去写Cell,即发生了竞争。
     * @param x 要加的值
     * @param fn 函数式接口LongBinaryOperator的实现,在add方法中传入的是null
     * @param wasUncontended 只有第三种情况即发生了竞争wasUncontended才会变为false。
     */
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
    
        // 表示当前线程的hash值
        int h;
        // 当h==0时,说明还没有分配哈希值
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe(); // 为h赋值
    
            // wasUncontended直译过来: 是否无竞争
            //      true->发生竞争
            //      false->没有发生竞争
            // 注意,进入这个if语句并执行赋值操作说明开始h的值是0,也就是没有分配hash值
            // 0 与任何数做与运算都是0,也就是说当前线程会被分配到cells数组下标为0的位置上
            // 并且会进入if语句说明发生了竞争,所以wasUncontended赋值为true
            wasUncontended = true;
        }
    
        // collide表示扩容意向,false一定不会扩容,true可能会扩容。
        boolean collide = false;                // True if last slot nonempty
    
        // 自旋开始
        for (;;) {
            // as 表示cells数组
            // a 线程对应cells数组上的cell对象
            // n cells数组的长度
            // v 期望值
            Cell[] as; Cell a; int n; long v;
    
            // CASE1: 进入条件->cells已经创建
            if ((as = cells) != null && (n = as.length) > 0) {
                // CASE1.1: cells已经初始化但是线程对应索引的cell未初始化
                if ((a = as[(n - 1) & h]) == null) {
                    // true(cellsBusy = 0)表示当前锁未被占用, false表示被占用
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        // 创建新的Cell对象,初值就是x
                        Cell r = new Cell(x);   // Optimistically create
                        //  锁标志为0,并且cas设置锁标记变量为1才能进入if语句中
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // 再次核对锁
                                // rs cells数组引用
                                // m cells数组长度
                                // j 命中的cells数组的下标
                                Cell[] rs; int m, j;
                                // 前两个条件恒成立
                                // 第三个条件判断是为了防止多线程下cell覆盖的问题
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r; //将创建好的r赋值给当前位置
                                    created = true; //修改标志位 表示已经创建
                                }
                            } finally {
                                // 释放锁
                                cellsBusy = 0;
                            }
                            if (created)
                                // 如果cell数组已经创建了,直接退出。
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                // CASE1.2:
                // 只有一种情况wasUncontended为false,即当前cell不为null,
                // 并且发生了竞争, 后续重置hash值
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                // CASE1.3:
                // 当前线程重新计算过hash值,并且对应cell对象不为空,
                // 就通过cas的方式设置cell中的数据,成功则终止循环,失败则再次自旋
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                // CASE1.4: n(cells数组长度)大于NCPU,或者已经被其他线程扩容过了,
                else if (n >= NCPU || cells != as)
                    // 将扩容意向设置为false
                    collide = false;            // At max size or stale
                // CASE1.5:
                // 设置扩容意向为true。但是不一定发生扩容
                else if (!collide)
                    collide = true;
                // CASE1.6:
                // 真正扩容的逻辑
                //      条件1: cellsBusy == 0 表示无锁状态,当前线程可以去竞争锁
                //      条件2: casCellsBusy()CAS获取锁的逻辑,获取锁成功可以执行扩容逻辑,
                //      false表示当前时刻有其他线程在做扩容相关的操作。
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        // 再次判断 cells == as
                        // 防止多线程下重复扩容
                        if (cells == as) {      // Expand table unless stale
                            // 扩容为原来的2倍
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        // 释放锁
                        cellsBusy = 0;
                    }
                    // 扩容意向改为false
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                // 重新计算当前线程对应的hash值
                h = advanceProbe(h);
            }
            // CASE2: 当cellsBusy为0也就是cells数组未加锁的情况
            // 进入CASE2的前置条件 : cells数组还未初始化 (as = cells) == null
            // 三个判断条件:
            //      1. cellsBusy == 0 -> 表示当前未加锁
            //      2. cells == as -> 在单线程的情况下没有必要判断,但是在多线程情况下
            //      如果有其它线程将cells初始化过了,那么此时as == cells结果就位false了
            //      3.casCellsBusy() -> 将锁标志变量cellsBusy通过cas方式是设置为1
            //          true->设置成功,进入分支;false->设置失败,判断下一分支
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            // CASE3:
            // 会执行CASE3的条件:
            //      1. 当前cellsBusy加锁状态,表示其他线程正在初始化cells,所以当前线程将值累加到base中
            //      2. cells被其他线程初始化后,当前线程需要将数据累加到base
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152

    3.4 sum()方法

    public long sum() {
        Cell[] as = cells; Cell a;
        // 先获取base值
        long sum = base;
        // 如果cell数组不为null,就累加cell数组中的值
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    4. 总结

    LongAdder的核心思想就是通过空间换时间,将热点value分散成一个Cell数组来承接并发的CAS,从而提升性能。

    但是AtomicLong可以彻底被取代了吗?

    虽然看上去LongAdder性能远超了AtomicLong,但是也要分场景使用,如果是并发不太高的系统,使用AtomicLong可能会更好一些,而且内存需求也会小一些。

    同时,sum()方法后可以知道 LongAdder 在统计的时候如果有并发更新,可能导致统计的数据有误差,LongAdder保证最终一致性,但不保证强一致性,所以在对统计数据的一致性要求严格的情况建议使用AtomicLong。而在高并发计数的场景下更适合使用LongAdder。

    参考文章:

    面试官问我LongAdder,我惊了…

  • 相关阅读:
    Asp .Net Core 系列:基于 Castle DynamicProxy + Autofac 实践 AOP 以及实现事务、用户填充功能
    Servlet
    AI图书推荐:用ChatGPT按需DIY定制来赚钱
    图的结构模板及遍历
    [go 面试] 构建高效微服务通信:选择合适的通信方式
    Bert基础(二)--多头注意力
    【面试八股总结】C++内存管理:内存分区、内存泄漏、new和delete、malloc和free
    方案:数智化视频AI技术为智慧防汛筑基,构建防汛“数字堤坝”
    MATLB|基于复杂网络的配电系统微电网优化配置
    Nginx可以通过配置文件实现三种常见的负载均衡方式
  • 原文地址:https://blog.csdn.net/qq_51628741/article/details/127432272