原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序不能被打乱,也不可以被切割而只执行其中的一部分(不可中断性)。将整个操作视为一个整体,资源在该次操作中保持一致,这是原子性的核心特征。
CAS属于硬件同步原语,处理器提供了基本内存操作的原子性保证。
CAS操作需要输入两个数值,一个旧值A(期望操作前的值)和一个新值B,在操作期间先对旧值进行比较,若没有发生变化,才交换成新值,发生了变化则不交换。
Java中的sum.misc.Unsafe类,提供了compareAndSwapInt()和compareAndSwapLong()等几个方法实现CAS。
public class CustomAtomicCounter {
private volatile int i = 0;
private static Unsafe unsafe = null;
private static long valueOffset;
static {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
valueOffset = unsafe.objectFieldOffset(CustomAtomicCounter.class.getDeclaredField("i"));
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}
public void add() {
int current = 0;
do {
current = unsafe.getIntVolatile(this, valueOffset);
} while (!unsafe.compareAndSwapInt(this, valueOffset, current, current + 1));
}
public int getValue() {
return i;
}
public static void main(String[] args) throws InterruptedException {
CustomAtomicCounter counter = new CustomAtomicCounter();
for (int count = 0; count < 10000; count++) {
new Thread(() -> {
counter.add();
}).start();
}
Thread.sleep(3000L);
System.out.println(counter.getValue());
}
}
备注:可以看到控制台输出为
10000
。
CAS若使用不当,会引发如下三个小问题:
先看下面这幅图:
上图中thread1经过CAS操作之后把i的值从0改为1,最终又改回了1,而thread2在等待thread1执行完操作后,接着又通过CAS操作将值改为了1。
备注:其实
i
的值实际上已经被修改过了,这个时候的i
已经不是之前的了,一句话,已经被用过了。
这里我们以数据结构栈(先进后出)
为示例,展现ABA问题,不安全的栈代码示例如下:
public class Node {
public final String value;
public Node next;
public Node(String value) {
this.value = value;
}
@Override
public String toString() {
return "Node[" + "value='" + value + '\'' + ']';
}
}
public class UnsafeStack {
private AtomicReference<Node> top = new AtomicReference<>();
/**
* 入栈
* @param node
*/
public void push(Node node) {
Node oldTop = null;
do {
oldTop = top.get();
node.next = oldTop;
} while (!top.compareAndSet(oldTop, node)); // CAS操作替换栈顶
}
public Node pop(int timeInMillis) {
Node newTop = null;
Node oldTop = null;
do {
oldTop = top.get();
if (oldTop == null) {
return null;
}
newTop = oldTop.next;
// 模拟延时
if (timeInMillis > 0) {
LockSupport.parkNanos(1000 * 1000 * timeInMillis);
}
} while (!top.compareAndSet(oldTop, newTop)); // CAS操作替换栈顶
return oldTop;
}
public Node peek() {
return top.get();
}
}
这里我们用两个线程模拟入栈和出栈的动作,操作步骤如下:
B入栈
→ A入栈
,栈顶指向A。A出栈
→ B出栈
→ D入栈
→ C入栈
→ A再次入栈
,栈顶仍然指向A。A出栈
。public class AbaProblemExample {
public static void main(String[] args) {
UnsafeStack stack = new UnsafeStack();
stack.push(new Node("B")); // B入栈
stack.push(new Node("A")); // A入栈
Thread thread1 = new Thread(() -> {
// A出栈
Node node = stack.pop(500);
System.out.println(Thread.currentThread().getName() + " " + node);
});
thread1.start();
Thread thread2 = new Thread(() -> {
LockSupport.parkNanos(1000 * 1000 * 300);
// A出栈
Node nodeA = stack.pop(0);
System.out.println(Thread.currentThread().getName() + " " + nodeA);
// B出栈
Node nodeB = stack.pop(0);
System.out.println(Thread.currentThread().getName() + " " + nodeB);
stack.push(new Node("D"));
stack.push(new Node("C"));
// A再次入栈,因此栈顶依然是A
stack.push(nodeA);
});
thread2.start();
// 休眠两秒
LockSupport.parkNanos(1000 * 1000 * 1000 * 2);
System.out.println("开始遍历UnsafeStack:");
Node node = null;
while ((node = stack.pop(0)) != null) {
System.out.println(node);
}
}
}
运行测试用例,控制台输出如下:
Thread-1 Node[value='A']
Thread-1 Node[value='B']
Thread-0 Node[value='A']
开始遍历UnsafeStack:
Node[value='B']
备注:其实栈中的内容已经被thread2修改过了,栈中的内容从栈顶到栈底应该为 C → D,理论上遍历栈的输出应该为:
Node[value='C']
,Node[value='D']
。
其实ABA问题也容易解决,带上版本号就行了。虽然值还是原来的值,被用过了,但是我们可以记录被用过的次数。
基于版本号实现的线程安全的栈如下:
public class ConcurrentStack {
private AtomicStampedReference<Node> top = new AtomicStampedReference<>(null, 0);
/**
* 入栈
* @param node
*/
public void push(Node node) {
Node oldTop = null;
int version = 0;
do {
version = top.getStamp();
oldTop = top.getReference();
node.next = oldTop;
} while (!top.compareAndSet(oldTop, node, version, version + 1)); // CAS操作替换栈顶
}
public Node pop(int timeInMillis) {
Node newTop = null;
Node oldTop = null;
int version = 0;
do {
oldTop = top.getReference();
version = top.getStamp();
if (oldTop == null) {
return null;
}
newTop = oldTop.next;
// 模拟延时
if (timeInMillis > 0) {
LockSupport.parkNanos(1000 * 1000 * timeInMillis);
}
} while (!top.compareAndSet(oldTop, newTop, version, version + 1)); // CAS操作替换栈顶
return oldTop;
}
public Node peek() {
return top.getReference();
}
}
再次运行AbaProblemExample
测试用例,将其中的UnsafeStack
替换为ConcurrentStack
即可,控制台输出如下:
Thread-1 Node[value='A']
Thread-1 Node[value='B']
Thread-0 Node[value='A']
开始遍历UnsafeStack:
Node[value='C']
Node[value='D']
备注:可以看到加上版本号后,再次模拟出入栈,我们的遍历结果是正确的。