AtomicLong是Java1.5时的一个基于 CAS 的原子类,通过CAS算法提供了非阻塞的原子性操作,但是在 超高并发 下AtomicLong的性能就会非常低下。
先观察AtomicLong中的一个方法 incrementAndGet() 的实现。
/**
* Atomically increments by one the current value.
* 将当前值原子地加1。
* @return the updated value
*/
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
底层实际上是调用了Unsafe类中的方法,AtomicLong中维护了一个Unsafe类的对象:
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
// 设置使用Unsafe.compareAndSwapInt进行更新
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 偏移量
private static final long valueOffset;
}
Unsafe类中的getAndAddInt()方法实现:
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
// 不断的去尝试cas,直到成功为止
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public native int getIntVolatile(Object var1, long var2);
可以看到AtomicLong的底层是使用do while语句去尝试CAS操作,如果是海量线程一起发生竞争,但是只会有一个线程的cas操作成功,其余的线程都会失败,然后进入循环尝试下一次(自旋状态)。这样会导致CPU资源严重浪费,从而造成低效的结果。
所以解决问题的关键在于如何使大量线程避免不断自旋尝试。
LongAdder是JDK8中新增的API,目的在于解决AtomicLong低效的情况。所以对于一些技术统计的需求建议使用LongAdder进行处理。
测试如下代码:
public class AtomicLongVsLongAdder {
/**
* Atomic和LongAdder耗时测试
*/
public static void main(String[] args) throws Exception {
testAtomicLongAdder(1, 10000000);
testAtomicLongAdder(10, 10000000);
testAtomicLongAdder(100, 10000000);
}
static void testAtomicLongAdder(int threadCount, int times) throws Exception {
System.out.println("threadCount: " + threadCount + ", times: " + times);
long start = System.currentTimeMillis();
testLongAdder(threadCount, times);
System.out.println("LongAdder 耗时:" + (System.currentTimeMillis() - start) + "ms");
System.out.println("threadCount: " + threadCount + ", times: " + times);
long atomicStart = System.currentTimeMillis();
testAtomicLong(threadCount, times);
System.out.println("AtomicLong 耗时:" + (System.currentTimeMillis() - atomicStart) + "ms");
System.out.println("----------------------------------------");
}
static void testAtomicLong(int threadCount, int times) throws Exception {
AtomicLong atomicLong = new AtomicLong();
List<Thread> list = new ArrayList();
for (int i = 0; i < threadCount; i++) {
list.add(new Thread(() -> {
for (int j = 0; j < times; j++) {
atomicLong.incrementAndGet();
}
}));
}
for (Thread thread : list) {
thread.start();
}
for (Thread thread : list) {
thread.join();
}
System.out.println("AtomicLong value is : " + atomicLong.get());
}
static void testLongAdder(int threadCount, int times) throws Exception {
LongAdder longAdder = new LongAdder();
List<Thread> list = new ArrayList();
for (int i = 0; i < threadCount; i++) {
list.add(new Thread(() -> {
for (int j = 0; j < times; j++) {
longAdder.increment();
}
}));
}
for (Thread thread : list) {
thread.start();
}
for (Thread thread : list) {
thread.join();
}
System.out.println("LongAdder value is : " + longAdder.longValue());
}
}
控制台打印结果:
threadCount: 1, times: 10000000
LongAdder value is : 10000000
LongAdder 耗时:123ms
threadCount: 1, times: 10000000
AtomicLong value is : 10000000
AtomicLong 耗时:46ms
----------------------------------------
threadCount: 10, times: 10000000
LongAdder value is : 100000000
LongAdder 耗时:126ms
threadCount: 10, times: 10000000
AtomicLong value is : 100000000
AtomicLong 耗时:1845ms
----------------------------------------
threadCount: 100, times: 10000000
LongAdder value is : 1000000000
LongAdder 耗时:1039ms
threadCount: 100, times: 10000000
AtomicLong value is : 1000000000
AtomicLong 耗时:18074ms
----------------------------------------
Process finished with exit code 0
结果分析:
原理图:

LongAdder的设计思想是采用 分段 的方式提高CAS操作的成功率。
AtomicLong中的成员变量value用来保存值,而在高并发的情况下value就变成了一个热点数据,也就是众多线程竞争一个 value 。
LongAdder则是将 value 值的新增操作分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个 value 值进行 CAS 操作,这样热点就被分散了,冲突的概率就小很多。
同时LongAdder中还有一个base变量,当并发不高的情况下都是通过CAS来直接操作base值,如果CAS失败,则针对LongAdder中的Cell[]数组中的Cell进行 CAS 操作,减少失败的概率。
LongAdder继承了Striped64,可以说是核心属性和方法都在Striped64中。
内部类Cell :
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
// 值的偏移量
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/** 当前计算机的CPU数量,用于控制cells数组长度的一个关键条件。 */
static final int NCPU = Runtime.getRuntime().availableProcessors();
// 当非空时,size是2的幂。
transient volatile Cell[] cells;
// 基本值,主要在没有竞争时使用,也可以在表初始化竞争期间作为备用。通过CAS更新
transient volatile long base;
// 自旋锁(通过CAS锁定)在调整大小和/或创建cell时使用。
transient volatile int cellsBusy;
// 通过CAS的方式更新cell中的数据
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
// 通过CAS的方式获取锁(同步状态)
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
// 返回当前线程的探测值。
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
add方法用于添加给定的值,因为一开始没有线程竞争的时候,是使用CAS操作像base变量中写入数据,所以需要判断cell数组是否被创建并且写base操作是否成功。但是如果写base发生竞争或者cell数组不为空,就需要执行核心的longAccumulate()方法。
/**
* 添加给定的值
*
* @param x 要增加的值
*/
public void add(long x) {
/*
as: Cell数组的引用
b: base值
v: 期望值
m: Cell数组的长度 - 1
a: 当前线程命中的cell
*/
Cell[] as; long b, v; int m; Cell a;
// 条件1 (as = cells) != null:
// true->表示cells数组已经被创建
// false->表示cells数组没有被创建,要执行casBase方法
// 条件2 !casBase(b = base, b + x) 在条件1位false的情况下执行,也就是cells数组没有创建的情况
// casBase方法是将更新的值通过CAS的方式设置到base变量,
// true->cas设置失败,说明在cas操作base时发生了竞争,需要执行if语句中的逻辑
// false->cas设置成功,不需要执行之后的逻辑
if ((as = cells) != null || !casBase(b = base, b + x)) {
// 进入if语句的条件:
// 1. 当前cells数组已经初始化,当前线程需要将数据写到对应的cell中
// 2. CAS写base失败,可能需要重试或者初始化cells数组
// 总的来说就是当前线程写数据操作失败就会进入if语句
//true表示未竞争 false发生竞争 (uncontended:无竞争)
boolean uncontended = true;
// 以下if是判断是否执行longAccumulate()
// 条件1 as == null || (m = as.length - 1) < 0
// true->表示cells数组没有被创建
// false->cells数组已经被创建,无需执行longAccumulate()方法
// 条件2 (a = as[getProbe() & m]) == null
// 执行条件2时说明cells数组已经被创建并且 m 已经被赋值为cells数组长度-1
// getProbe()方法实际上直接调用了Unsafe类中的getInt()方法
// getInt()方法是一个本地方法,用来获取一个整型数值,
// 同时与m(cells.length - 1)做与运算,类似于HashMap的寻址方式
// 也就是找到当前线程所对应cells数组中的位置
// true->cells数组中对应位置的cell对象没有创建,需要执行longAccumulate()方法
// false->cells数组中对应位置的cell对象已经创建,需要进一步判断是否发生竞争
// 条件3 !(uncontended = a.cas(v = a.value, v + x))
// 条件3能执行说明已经执行过条件2中的逻辑,a被赋值为数组中对应位置的cell对象并且不为null
// 这时要对cell对象进行cas操作
// true->说明CAS失败,对cell的写操作发生竞争
// false->CAS成功,无需执行if语句中的逻辑
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 那些情况会调用longAccumulate()?
// 1. cell数组没有初始化, 也就是多线程写base发生竞争
// 2. cells数组已经初始化,但是当前线程对应的cell为空
// 3. cells数组已经初始化,并且当前线程对应的cell不为空,但是对这个cell进行CAS写时失败了(可能需要扩容)
longAccumulate(x, null, uncontended);
}
}
先来总结一下longAccumulate()会执行的情况:
/**
* Cell数组未初始化
* Cell数组已经初始化了,但是当前线程对应数组中的Cell对象为null
* Cell数组初始化了,且当前线程对应的Cell不为NULL,但是发现有多个线程去写Cell,即发生了竞争。
* @param x 要加的值
* @param fn 函数式接口LongBinaryOperator的实现,在add方法中传入的是null
* @param wasUncontended 只有第三种情况即发生了竞争wasUncontended才会变为false。
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 表示当前线程的hash值
int h;
// 当h==0时,说明还没有分配哈希值
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe(); // 为h赋值
// wasUncontended直译过来: 是否无竞争
// true->发生竞争
// false->没有发生竞争
// 注意,进入这个if语句并执行赋值操作说明开始h的值是0,也就是没有分配hash值
// 0 与任何数做与运算都是0,也就是说当前线程会被分配到cells数组下标为0的位置上
// 并且会进入if语句说明发生了竞争,所以wasUncontended赋值为true
wasUncontended = true;
}
// collide表示扩容意向,false一定不会扩容,true可能会扩容。
boolean collide = false; // True if last slot nonempty
// 自旋开始
for (;;) {
// as 表示cells数组
// a 线程对应cells数组上的cell对象
// n cells数组的长度
// v 期望值
Cell[] as; Cell a; int n; long v;
// CASE1: 进入条件->cells已经创建
if ((as = cells) != null && (n = as.length) > 0) {
// CASE1.1: cells已经初始化但是线程对应索引的cell未初始化
if ((a = as[(n - 1) & h]) == null) {
// true(cellsBusy = 0)表示当前锁未被占用, false表示被占用
if (cellsBusy == 0) { // Try to attach new Cell
// 创建新的Cell对象,初值就是x
Cell r = new Cell(x); // Optimistically create
// 锁标志为0,并且cas设置锁标记变量为1才能进入if语句中
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // 再次核对锁
// rs cells数组引用
// m cells数组长度
// j 命中的cells数组的下标
Cell[] rs; int m, j;
// 前两个条件恒成立
// 第三个条件判断是为了防止多线程下cell覆盖的问题
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r; //将创建好的r赋值给当前位置
created = true; //修改标志位 表示已经创建
}
} finally {
// 释放锁
cellsBusy = 0;
}
if (created)
// 如果cell数组已经创建了,直接退出。
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// CASE1.2:
// 只有一种情况wasUncontended为false,即当前cell不为null,
// 并且发生了竞争, 后续重置hash值
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// CASE1.3:
// 当前线程重新计算过hash值,并且对应cell对象不为空,
// 就通过cas的方式设置cell中的数据,成功则终止循环,失败则再次自旋
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// CASE1.4: n(cells数组长度)大于NCPU,或者已经被其他线程扩容过了,
else if (n >= NCPU || cells != as)
// 将扩容意向设置为false
collide = false; // At max size or stale
// CASE1.5:
// 设置扩容意向为true。但是不一定发生扩容
else if (!collide)
collide = true;
// CASE1.6:
// 真正扩容的逻辑
// 条件1: cellsBusy == 0 表示无锁状态,当前线程可以去竞争锁
// 条件2: casCellsBusy()CAS获取锁的逻辑,获取锁成功可以执行扩容逻辑,
// false表示当前时刻有其他线程在做扩容相关的操作。
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 再次判断 cells == as
// 防止多线程下重复扩容
if (cells == as) { // Expand table unless stale
// 扩容为原来的2倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 扩容意向改为false
collide = false;
continue; // Retry with expanded table
}
// 重新计算当前线程对应的hash值
h = advanceProbe(h);
}
// CASE2: 当cellsBusy为0也就是cells数组未加锁的情况
// 进入CASE2的前置条件 : cells数组还未初始化 (as = cells) == null
// 三个判断条件:
// 1. cellsBusy == 0 -> 表示当前未加锁
// 2. cells == as -> 在单线程的情况下没有必要判断,但是在多线程情况下
// 如果有其它线程将cells初始化过了,那么此时as == cells结果就位false了
// 3.casCellsBusy() -> 将锁标志变量cellsBusy通过cas方式是设置为1
// true->设置成功,进入分支;false->设置失败,判断下一分支
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// CASE3:
// 会执行CASE3的条件:
// 1. 当前cellsBusy加锁状态,表示其他线程正在初始化cells,所以当前线程将值累加到base中
// 2. cells被其他线程初始化后,当前线程需要将数据累加到base
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
public long sum() {
Cell[] as = cells; Cell a;
// 先获取base值
long sum = base;
// 如果cell数组不为null,就累加cell数组中的值
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
LongAdder的核心思想就是通过空间换时间,将热点value分散成一个Cell数组来承接并发的CAS,从而提升性能。
但是AtomicLong可以彻底被取代了吗?
虽然看上去LongAdder性能远超了AtomicLong,但是也要分场景使用,如果是并发不太高的系统,使用AtomicLong可能会更好一些,而且内存需求也会小一些。
同时,sum()方法后可以知道 LongAdder 在统计的时候如果有并发更新,可能导致统计的数据有误差,LongAdder保证最终一致性,但不保证强一致性,所以在对统计数据的一致性要求严格的情况建议使用AtomicLong。而在高并发计数的场景下更适合使用LongAdder。
参考文章: