• 基础 | 并发编程 - [LongAdder & Striped64]


    §1 重要属性

    /** Number of CPUS, to place bound on table size */
    // 运行环境的实际 CPU 核数
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    
    /**
     * Table of cells. When non-null, size is a power of 2.
     */
     //单元格数组,非空状态时,长度是 2 的幂
    transient volatile Cell[] cells;
    
    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    // 基础值,通过 cas 更新
    // 主要在没有线程竞争的场景下使用
    // 也用于 cells 初始化时,作为降级使用
    transient volatile long base;
    
    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     */
    // 扩容、创建单元格时的自旋锁
    transient volatile int cellsBusy;
    
    // Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of packaging restrictions.
    // 
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    内部类 cell

    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }
    
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    §2 Striped64 原理粗讲

    • 原子类在 cas 过程中,因为自旋导致浪费了大量的 CPU 性能
      这是因为多线程时,靶值(被修改的值)只有一个
      因此无论几条线程自旋,最终都只能有一条线程修改一次靶值
    • Striped64 内部维护了一个 Cell[],这相当于把靶值拆碎,即这个数组的和才是最终值
      这意味着现在靶值同时存在数组长度个
      相当于其他线程的每次自旋,都会有数组长度个线程完成修改靶值的动作

    §2 LongAdder 源码

    add()
    用于实现 value + x
    下面是源码

    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    下面是整体思路和部分细节

    • 当线程压力不大时,cells 里没有值,此时的自增是 对 base 进行 CAS 操作
      if ((as = cells) != null || !casBase(b = base, b + x))
      如果操作成功,就不需要进入内层逻辑

    • 当线程开始争抢对 base 的修改权时,对 base 的 CAS 操作会失败
      这就进入了上面的 if 内部

    • LongAdder 开始使用 cells 后,base 就约等于弃用了
      说约等于是因为最后求和时还要计算 base 的值,同时作为初始化和扩容时抢占 cellBusy 失败的降级方案

    • cells 数组相当于是线程槽,线程会通过计算对对应的槽位进行操作
      线程通过 getProbe() 方法获取线程探针 hash 值,然后将此值与 (数组长度 - 1) 进行按位与运算 as[getProbe() & m]
      达到线程 hash 对cells 的 size 取余的效果,并用结果作为此线程在 cells 数组上的落点,道理类似于 hashMap.get()

    • 当线程需要对 cells 进行操作的时候,会涉及到是否争抢问题

      • 不争抢,说明目前 cells 的 size 足够
      • 争抢,说明需要对 cells 进行扩容
      • 无争抢标记 boolean uncontended = true; ,刚刚声明出来时认为 无争抢
    • 不同的线程会对 对应槽位的 Cell 做 CAS 操作
      通过 a.cas(v = a.value, v + x) 实现对 cells 上某个 Cell+ x
      如果操作成功,认为无争抢,否则认为是争抢,此时需要 Striped64.longAccumulate() 处理
    • 启用 cells 后,下面的情况都需要 Striped64.longAccumulate() 处理
      • 还没有初始化过 cellscells 为 null,或长度 < 1
        as == null(m = as.length - 1) < 0
      • 线程在 cells 上计算的落点上没有数据,即没有初始化对应的 Cell
        a = as[getProbe() & m]) == null
        线程的 hash 按 cells 的 size 取余计算在 cells 上的落点,类似 hashMap.get()
      • 对线程落点上的数据进行 CAS 操作失败

    §2 Striped64 源码

    参数
    Striped64.longAccumulate() 方法的入参如下

    • long x 增加值
    • LongBinaryOperator fn 操作函数,用于实现非 原值 + x 的功能
    • boolean wasUncontended 意味着当前线程 CAS 操作时发生了争抢

    整体脉络
    在这里插入图片描述
    如上图,此方法整体由两个部分组成

    • 主体是一个由 for(;;) 组织的自旋(1-3)
    • 确保方法可以获取线程的 probe (0)
      因自旋中普遍使用这个值作为线程的 hash

    自旋内部使用 if-else 区分了三类场景

    • calls 已经存在且不为空,即 calls 已完成初始化的场景
    • calls 不存在但成功的给 cellsBusy 上自旋锁
      calls 进行初始化
    • 不满足上述场景时的降级处理
      其实就是上面两种情况不满足时,还希望快速完成线程本身带着的 + x 的计算任务
      因此就试试能不能直接加在 base 上,如果能加上,线程就不用自旋了

    保证线程 hash 的获取
    在这里插入图片描述

    calls 已经初始化
    在这里插入图片描述
    如果 cells 数组已经初始化完成,则线程进入此方法可能是需要

    • 初始化 Cell
    • calls 进行扩容
    • 再次尝试对某个 Cell 进行 VAS

    关于 h = advanceProbe(h)
    如果 cells 数组已经初始化完成,则进入此方法的线程每次自旋都需要通过此方法进行 rehash
    这是因为线程的 Probe,即 hash 只是用于计算线程在 calls 上的落点
    并且,只要线程的 + x 计算任务可以执行,那么无论执行在哪个 Cell 上都是无所谓的
    rehash 可以使线程更加均匀的使用 cells
    也可是使线程与另一线程在某 Cell 上发生碰撞而导致自旋的下一轮,避开这个碰撞的槽位


    已初始化 cells 的场景下,各个 case 按判断优先级排列如下

    • 初始化 Cellcalls 初始化了,但可能其中具体槽位还没初始化
      如果是 Cell 没有初始化导致的进入此方法,优先尝试初始化对应的 Cell
      但不保证一定能完成初始化,因为这个初始化任务也需要抢占 cellBusy 标记
      同下面calls未初始化场景一样,初始化 Cell 需要 双重检查

    • 重置刚刚进入此方法的,外面 CAS 某 Cell 失败的线程的争抢标记位 wasUncontended
      如果是因为争抢 Call 失败进入的此方法,在这里将标记清除
      这是因为刚刚就是当前线程对当前落点争抢失败进来的,马上再重试着争抢一次意义不大
      而清除标记后,可以快速跳过次轮自旋并进行一次 rehash,并在下一轮自旋中重新计算新落点并尝试争抢

    • 对已经初始化的 Cell 进行 CAS 操作,以完成 + x 计算任务
      因为在之前清理了争抢标记位 wasUncontended,同时每轮自旋都会 rehash
      所以此时再尝试对某 Cell 操作是有必要的,毕竟很可能换了一个 Cell
      如果 + x 计算任务完成了就跳出,不需要继续自旋了

    • 标记 cells扩容意向禁用扩容 优先于 需要扩容

      • 若对 rehash 的线程再次争抢冲突,理论上是需要扩容的,因此在本轮自旋中标记它为 需要扩容
      • 标记 cells 为 需要扩容 (collide = true) 后,会在下一轮自旋中完成扩容
      • 但若 cells 的长度已经超过 JVM 的可用核数就 禁用扩容
        这是因为无论线程有几条,同时被处理的线程不可能多于 JVM 可用核数,即最多只能对这么多个 Cell 同时操作
        禁用扩容其实不是通过对标记位赋值后再通过什么处理实现的,而是基于 else if 结构,即走了此分支就不会执行扩容逻辑分支了
        需要扩容 与 禁用扩容 也分别在两个分支中,因此一旦禁用扩容,后面的分支都会短路
    • 执行 cells 的 扩容

      • 需要抢占 cellBusy 标记
      • 每次扩容都是 2 倍的关系,Cell[] rs = new Cell[n << 1]
        2 倍的原因同 HashMap 扩容
      • 扩容时,先创建一个 2 倍于原数组长度的新数组,随后按索引复制
        不需要像 HashMap 扩容时一样 rehash,因为只要线程的 + x 任务完成了,在哪个 Cell 上完成都无所谓
      • 同下面calls未初始化场景一样,扩容时需要 双重检查

    calls未初始化
    在这里插入图片描述
    流程如下

    • 只有 CAS 抢占 cellBusy 标记才能开始初始化
    • 定义一个初始化标记 init
      此标记不为 true 就说明没有完成初始化,继续自旋,否则就跳出自旋
    • 双重检查并加锁
      开始初始化前需要进行两次检查 cells == as,能走到此处逻辑意味着 as == null
      第一次检查,检查到 cells == ascalls == null 时对 cellBusy 上锁
      但自旋过程中可能有其他线程同步完成校验,进而一前一后完成 CAS 导致对 cells 进行两次初始化
      这会导致第一次初始化的对应 cell 槽上的值丢失
    • 初始化 cells
      • 创建一个初始长度是 2 的 Cell[]
      • 将对应当前线程落点设置为当前加值
        依然是 as[getProbe() & m]) 的思路,但因为刚初始化长度固定是 2,因此一定是 & 1
        因为是刚初始化,所以直接赋值相当于完成了 + x 操作
      • cells 指向当前新建的 Cell[]
      • 设置初始化标记
    • 还原 cellBusy 标记

    降级使用 base 计算
    在这里插入图片描述

    • Striped64.longAccumulate() 的主要作用是尝试初始化或扩容 cells ,这个过程是自旋的
    • cells 的初始化或扩容过程中的需要对 cellBusy 上锁
    • 若上锁失败,不用直接进行下一轮自旋,可以尝试将当前操作在 base 上完成
      相当于是个降级方案,用于尽可能的提高效率,以免 cells 启用后 base 成为一个摆设
  • 相关阅读:
    用Postman发送xml数据
    华为设备链路聚合基础
    通讯网关软件002——利用CommGate X2HTTP-U实现HTTP访问OPC UA Server
    软件测试工程师面试汇总功能测试篇
    【计算机网络】第一章——概述
    从0开始只要三步手写一个MyBatis
    GraphQL & Go,graphql基本知识,go-graphql使用
    SpringBoot整合Swagger
    【算法合集】学习算法第一天(链表篇)
    分屏bug 小记
  • 原文地址:https://blog.csdn.net/ZEUS00456/article/details/126840632