主要内容
- 从网站计数器实现中一步步引出 CAS 操作
- 介绍 Java 中的 CAS 及 CAS 可能存在的问题
内存位置(V)
、期望值(A)
、更新值(B)
,也即是说内存位置的值和期望值是一致的,就是将值更新为更新值。锁
的范畴,我们称它为“无锁”。但是因为自旋(死循环)的存在,会在循环内部进行处理。但是因为死循环,可能会导致 CPU 上升。需求
+1
,如何实现?100
个人同时访问,并且每个人对咱们的网站发起10
次请求,最后总访问次数应该是1000
次。实现
我们现在用有问题的方式来处理
public class RequestDemo {
// 总人数
private static final int peopleCount = 100;
// 每人访问次数
private static final int requestTime = 10;
// 总计访问次数
private static int count = 0;
public static void main(String[] args) throws Exception {
CountDownLatch downLatch = new CountDownLatch(peopleCount); // 100个门栓
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
for (int j = 0; j < requestTime; j++) {
request();
}
} finally {
downLatch.countDown(); // 执行完1个人 门栓个数-1
}
}).start();
}
downLatch.await(); // 等待所有线程都执行完 主线程才执行
System.out.println("耗时:" + (endTime - startTime) + ", count = " + count);
}
/**
* 请求
*/
private static void request() {
// 模拟网络延迟
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
}
}
某一次的输出结果
耗时:67, count = 987
为什么不是1000呢?
/**
* Q:分析一下问题出在哪呢?
* A:count ++ 操作实际上是由3步来完成!(jvm执行引擎)
* 1.获取count的值,记做A : A=count
* 2.将A值+1,得到B :B=A+1
* 3.将B值赋值给count
*
* 如果有A.B两个线程同时执行count++,他们通知执行到上面步骤的第一步,得到的
* count是一样的,3步操作结束后,count只加1,导致count结果不正确!
* Q:怎么解决结果不正确问题?
* A:对count++操作的时候,我们让多个线程排队处理,多个线程同时到达request()方法的时候,
* 只能允许一个线程可以进去操作,其它的线程在外面等着,等里面的处理完毕出来之后,外面等着的
* 再进去一个,这样操作的count++就是排队进行的,结果一定是正确的。
*
* Q:怎么实现排队效果??
* A:java中synchronized关键字和ReentrantLock都可以实现对资源枷锁,保证并发正确性,
* 多线程的情况下可以保证被锁住的资源被“串行”访问。
*/
count++;
那么我们将代码进行稍加处理一下
/**
* 请求
*/
// 在方法体上加上 synchronized,锁住类对象(因为是静态方法)
private synchronized static void request() {
// 模拟网络延迟
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
}
输出
耗时:5596, count = 1000
上面的代码线程安全,使用 synchronized
关键字对方法进行加锁,但是性能很差,非常耗时。
分析:
/**
* Q:耗时太长的原因是什么呢?
* A:程序中的request方法使用synchronized关键字修饰,保证了并发情况下,request方法同一时刻
* 只允许一个线程进入,request加锁相当于串行执行了,count的结果和我们预期的一致,只是耗时太长了..
*
* Q:如何解决耗时长的问题?
* A:count ++ 操作实际上是由3步来完成!(jvm执行引擎)
* 1.获取count的值,记做A : A=count
* 2.将A值+1,得到B :B=A+1
* 3.将B值赋值给count
* 升级第3步的实现:
* 1.获取锁
* 2.获取一下count最新的值,记做LV
* 3.判断LV是否等于A,如果相等,则将B的值赋值给count,并返回true,否则返回false
* 4.释放锁
*/
private synchronized static void request() {
}
再次进行优化
在使用 CAS 之前,我们可以将锁的范围缩小:
/**
* 请求
*/
private static void request() {
// 模拟网络延迟
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 只需锁住需要原子性的代码块
synchronized (RequestDemo3.class) {
count++;
}
}
输出
耗时:67, count = 1000
使用加锁代码块处理,即可大幅度提升性能。
再次优化
使用 CAS
/**
* 请求
*/
private static void request() {
// 模拟网络延迟
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
int expectCount; // 期望值
// 不断自旋尝试 CAS
while (!compareAndSwap(expectCount = getCount(), expectCount + 1)) {}
}
/**
* synchronized 版本CAS
* @param expectCount 期望的Count
* @param newCount 新的Count
* @return 比较的结果 成功 true | 失败 false
*/
public static synchronized boolean compareAndSwap(int expectCount, int newCount) {
// 判断当前值是否和期望值一致
if (getCount() == expectCount) {
count = newCount; // 更新值
return true;
}
return false;
}
private static int getCount() {
return count;
}
输出
耗时:65, count = 1000
经过 4 次的优化,其实我们已经达到了目的,更快,且更准确。上面最后一个案例,你可能会有疑问为什么 count
不使用 volatile
修饰,因为 compareAndSwap
已经加锁了,这只是一个 Demo。我是想引出 CAS 这个内容。
UnSafe
类就提供了 CAS 方法,如下public final class Unsafe {
// 参数1 表示要操作的对象
// 参数2 表示要操作对象中属性地址的偏移量
// 参数3 预期值
// 参数4 需要更新的值
public final native boolean compareAndSwapObject(Object o, long offset,
Object expected,
Object x);
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
public final native boolean compareAndSwapLong(Object o, long offset,
long expected,
long x);
java.util.concurrent.atomic
包中,包中的实现都是通过 CAS 来实现的。使用 JDK CAS 完成统计
现在我们使用 JDK 的 AtomicInteger 完成上述的案例
public class RequestDemo5 {
// 总人数
private static final int peopleCount = 100;
// 每人访问次数
private static final int requestTime = 10;
// 总计访问次数
private static AtomicInteger count = new AtomicInteger(0); // 原子整数类
public static void main(String[] args) throws Exception {
CountDownLatch downLatch = new CountDownLatch(peopleCount);
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
for (int j = 0; j < requestTime; j++) {
request();
}
} finally {
downLatch.countDown();
}
}).start();
}
downLatch.await();
System.out.println(count.get()); // 1000
}
/**
* 请求
*/
private static void request() {
// 模拟网络延迟
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
count.incrementAndGet(); // 原子的自增操作 方法线程安全 底层使用了CAS
}
}
cmpxchg
",这是一个原子指令,实现并比较替换的操作。cmpxchg
如何保证多核心下的线程安全:系统底层进行 CAS 操作的时候,会判断当前操作系统是否是多核心,如果是,就给“CAS 会存在 ABA 问题,也就是 CAS 在操作的时候会检查当前的值和期望的值是否是一样的,如果没有变化则更新,但是如果一个值原来是 A,在 CAS 方法执行之前,被其他线程修改为了 B,然后又修改成了 A,那么这个时候看起来没有发生变化,CAS 也是可以执行成功的,但是实际上这个值已经做了改变。
如何解决 ABA 问题?
AtomicStampedReference
和 AtomicMarkableReference
前者关注版本,后者只关注是否发生改变。public class AtomicStampedReference<V> {
// 从这个类就可以体现出 只有数据引用 和 版本号都一致 才可以进行 CAS 操作
private static class Pair<T> {
final T reference; // 数据引用
final int stamp; // 版本戳(号)
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair; // 使用 volatile 修饰 确保可见性及有序性
// expectedReference 期望的引用
// newReference 新值引用
// expectedStamp 期望的版本号
// newStamp 新值版本号
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference && // 期望引用与当前引用一致
expectedStamp == current.stamp && // 期望版本与当前版本一致
((newReference == current.reference &&
newStamp == current.stamp) || // 如果新值引用和新值版本号都与当前一样 则说明就是当前引用对象 不需要新建Pair
casPair(current, Pair.of(newReference, newStamp))); // 新建一个Pair类 进行CAS更新操作
}
// cas Pair
// CAS 更新
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
}
现在使用 AtomicInteger
来演示 ABA 问题
public class CasABADemo {
// AtomicInteger原子整数类中的所有方法都是原子的
public static AtomicInteger a = new AtomicInteger(1);
public static void main(String[] args) {
Thread main = new Thread(() -> {
System.out.println("操作线程" + Thread.currentThread().getName() + ", 初始值:" + a.get());
try {
int expectNum = a.get(); // 拿到的期望值
int newNum = expectNum + 1; // 需要更新的值
Thread.sleep(1000); // 主线程休眠一秒钟,让出cpu
boolean isCASSccuess = a.compareAndSet(expectNum, newNum); // CAS
System.out.println("操作线程" + Thread.currentThread().getName() + ",CAS操作:" + isCASSccuess);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "主线程");
Thread other = new Thread(() -> {
try {
Thread.sleep(20); // 确保Thread-main线程优先执行
a.incrementAndGet(); // a + 1, a=2
System.out.println("操作线程" + Thread.currentThread().getName() + ",【increment】,值=" +a.get());
a.decrementAndGet(); // a - 1, a=1
System.out.println("操作线程" + Thread.currentThread().getName() + ",【decrement】,值=" +a.get());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "干扰线程");
main.start();
other.start();
}
}
输出
操作线程主线程, 初始值:1
操作线程干扰线程,【increment】,值=2
操作线程干扰线程,【decrement】,值=1
操作线程主线程,CAS操作:true
实际上我们会发现,上述代码并没有发现这个值被改变,现在添加版本号进行处理,如下
public class CasABADemo02 {
public static AtomicStampedReference<Integer> a = new AtomicStampedReference(1, 1); // 初始值为1 版本戳起始设置为1
public static void main(String[] args) {
Thread main = new Thread(() -> {
System.out.println("操作线程" + Thread.currentThread().getName() + ", 初始值:" + a.getReference());
try {
Integer expectReference = a.getReference(); // 拿到的期望值
Integer newReference = expectReference + 1; // 需要更新的值
Integer expectStamp = a.getStamp(); // 获取当前戳
Integer newStamp = expectStamp + 1; // 版本号+1
Thread.sleep(1000); // 主线程休眠一秒钟,让出cpu
boolean isCASSccuess = a.compareAndSet(expectReference, newReference, expectStamp, newStamp);
System.out.println("操作线程" + Thread.currentThread().getName() + ",CAS操作:" + isCASSccuess);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "主线程");
Thread other = new Thread(() -> {
try {
Thread.sleep(20); // 确保Thread-main线程优先执行
a.compareAndSet(a.getReference(), (a.getReference() + 1), a.getStamp(), (a.getStamp() + 1));
System.out.println("操作线程" + Thread.currentThread().getName() + ",【increment】,值=" +a.getReference());
a.compareAndSet(a.getReference(), (a.getReference() - 1), a.getStamp(), (a.getStamp() + 1));
System.out.println("操作线程" + Thread.currentThread().getName() + ",【decrement】,值=" +a.getReference());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "干扰线程");
main.start();
other.start();
}
}
输出
操作线程主线程, 初始值:1
操作线程干扰线程,【increment】,值=2
操作线程干扰线程,【decrement】,值=1
操作线程主线程,CAS操作:false
因为加了版本号,所以上述 t1 的处理是失败的,版本号发生了作用。
现在看一下另外一个类,AtomicMarkableReference
第二个参数为 boolean 类型,只关注是否被修改,而不关注结果。
实现图片功能:
垃圾袋类
class GarbageBag {
String desc;
public GarbageBag(String desc) {
this.desc = desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return super.toString() + " " + desc;
}
}
AtomicMarkableReference
测试类
@Slf4j
public class TestABAAtomicMarkableReference {
public static void main(String[] args) throws InterruptedException {
GarbageBag bag = new GarbageBag("装满了垃圾");
// 参数2 mark 可以看作一个标记,true表示垃圾袋满了
AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true); // 设置初始状态为true
log.debug("主线程 start...");
GarbageBag prev = ref.getReference();
log.debug(prev.toString());
new Thread(() -> {
log.debug("打扫卫生的线程 start...");
bag.setDesc("空垃圾袋");
// 没有new对象 对象引用地址没有改变 bag -> bag 将状态从 true -> false
while (!ref.compareAndSet(bag, bag, true, false)) {}
log.debug(bag.toString());
}, "保洁阿姨").start();
Thread.sleep(1000);
log.debug("主线程想换一只新垃圾袋?");
// expectedMark:true 说明垃圾袋是满的 需要更换新垃圾袋,将newMark更新为false
// 如果expectedMark已经被人修改过 为false 则本次CAS失败
boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
log.debug("换了么?" + success);
log.debug(ref.getReference().toString());
}
}
输出
17:14:02.583 c.Test38 [main] - 主线程 start...
17:14:02.585 c.Test38 [main] - cn.itcast.test.GarbageBag@6aceb1a5 装满了垃圾
17:14:02.616 c.Test38 [保洁阿姨] - 打扫卫生的线程 start...
17:14:02.616 c.Test38 [保洁阿姨] - cn.itcast.test.GarbageBag@6aceb1a5 空垃圾袋
17:14:03.628 c.Test38 [main] - 主线程想换一只新垃圾袋?
17:14:03.628 c.Test38 [main] - 换了么?false
17:14:03.628 c.Test38 [main] - cn.itcast.test.GarbageBag@6aceb1a5 空垃圾袋
可以注释掉打扫卫生线程代码,再观察输出
17:14:28.695 c.Test38 [main] - 主线程 start...
17:14:28.698 c.Test38 [main] - cn.itcast.test.GarbageBag@6aceb1a5 装满了垃圾
17:14:29.708 c.Test38 [main] - 主线程想换一只新垃圾袋?
17:14:29.708 c.Test38 [main] - 换了么?true
17:14:29.708 c.Test38 [main] - cn.itcast.test.GarbageBag@2d6d8735 空垃圾袋
UnSafe
类参考