CAS(Compare and Swap)比较并交换,顾名思义:比较两个值,如果它们两者相等就把它们交换。
- // 原子地比较原子对象与非原子参数的值,若相等则进行交换,否则进行加载
- bool compare_exchange_strong(T& expected, T val,
- memory_order sync = memory_order_seq_cst) volatile noexcept;
当前原子变量与 expected 相比较,
注意:在进行判等操作时,它执行的是物理上的比较,即直接比较内存值,而不是使用 T 的 == 操作符进行比较。compare_exchange_strong 代码示例:
- #include
- #include
-
- std::atomic<int> ai;
-
- int tst_val= 4;
- int new_val= 5;
- bool exchanged= false;
-
- void valsout()
- {
- std::cout << "ai = " << ai
- << " tst_val = " << tst_val
- << " new_val = " << new_val
- << " exchanged = " << std::boolalpha << exchanged
- << "\\n";
- }
-
- int main()
- {
- ai= 3;
- valsout();
-
- // tst_val != ai ==> tst_val 被修改
- exchanged= ai.compare_exchange_strong( tst_val, new_val );
- valsout();
-
- // tst_val == ai ==> ai 被修改
- exchanged= ai.compare_exchange_strong( tst_val, new_val );
- valsout();
- }
-
- 输出:
- ai = 3 tst_val = 4 new_val = 5 exchanged = false
- ai = 3 tst_val = 3 new_val = 5 exchanged = false
- ai = 5 tst_val = 3 new_val = 5 exchanged = true
可以用在代码层面上实现 fetch and add,exchange。
- int fetch_and_add(std::atomic<int> &i) {
- int old_value(0);
- int new_value(0);
-
- do {
- old_value = i;
- new_value = old_value + 1;
- }
- while (!i.compare_exchange_strong(old_value, new_value));
-
- return new_value;
- }
-
- int exchange(std::atomic<int>& i, int j) {
- int old_value(0);
- int new_value(0);
-
- do {
- old_value = i;
- new_value = j;
- }
- while (!i.compare_exchange_strong(old_value, new_value));
-
- return new_value;
- }
CAS 如何保证数据的正确性?注意上面的 old_value,它保存着 i 的副本,在 i 执行 CAS 时,如果 old_value 中的值与 i 相同,即代表 i 没有被其他线程修改,保持原值,随后 CAS 操作成功,i 替换为 new_value 。如果 old_value 与 i 不相等,即代表其他线程修改了 i,随后 CAS 操作失败,重新刷新 old_value,new_value 并继续循环。
甚至可以实现反向比较并交换,即如果两个数值不相等就交换它们。
- bool unequal_and_swap(std::atomic<int>& i, int compare, int swap) {
- int old_value(0);
- do {
- old_value = i;
- if (old_value == compare)
- {
- return false;
- }
- }
- while (!i.compare_exchange_strong(old_value, swap));
-
- return true;
- }
这两个函数的区别在于,weak 在有的平台上(注意,是有的平台,这里不包括 x86)会存在失败的可能性。即,当*ptr == *expected依然有可能什么都不做而返回 false。
所以,在 x86 平台来说,这两者可以说是没什么区别。只是如果想要代码可移值性好,那么采用compare_exchange_weak并且使用循环来判断,那么是一种比较好的办法。
weak 的运行比 strong 快。若本来就需要在循环里执行 compare-and-swap,用 weak 的效率高;反之,只执行一次的话,用 strong 可以省去不必要的循环。拿不准的时候,用 strong 会好点。
结论就是:
compare_exchange_weak和循环来处理。compare_exchange_strong。compare_exchange_weak。基于链表的非阻塞栈
- #include
- #include
- #include
-
- template<typename T>
- struct node {
- T data;
- node* next;
- node(const T& data) : data(data), next(nullptr) {}
- };
-
- template<typename T>
- class stack {
- private:
- std::atomic
*> head; - public:
- void push(const T& data) {
- node
* new_node = new node(data); -
- // 放 head 的当前值到 new_node->next 中
- new_node->next = head.load(std::memory_order_relaxed);
-
- // 现在令 new_node 为新的 head ,但若 head 不再是
- // 存储于 new_node->next 的值(某些其他线程必须在刚才插入结点)
- // 则放新的 head 到 new_node->next 中并再尝试
- while(!head.compare_exchange_weak(new_node->next, new_node,
- std::memory_order_release,
- std::memory_order_relaxed));
-
- // 注意:上述使用至少在这些版本不是线程安全的
- // 先于 4.8.3 的 GCC (漏洞 60272 ),先于 2014-05-05 的 clang (漏洞 18899 )
- // 先于 2014-03-17 的 MSVC (漏洞 819819 )。下面是变通方法:
- // node
* old_head = head.load(std::memory_order_relaxed); - // do {
- // new_node->next = old_head;
- // } while(!head.compare_exchange_weak(old_head, new_node,
- // std::memory_order_release,
- // std::memory_order_relaxed));
- }
-
- T pop() {
- while (true) {
- node
* result = head.load(std::memory_order_relaxed); - node
* top = head.load(std::memory_order_relaxed); - if (result == nullptr) {
- throw std::string("Cannot pop from empty stack");
- }
- if (top && head.compare_exchange_strong(result, result->next)) { // CAS
- T temp = result->data;
- delete top;
- return temp;
- } else {
- std::cout << "cas false!" << std::endl;
- }
- }
- }
-
- ~stack() {
- node
* index = nullptr; - node
* top = head.load(std::memory_order_relaxed); - while (top != nullptr) {
- index = top->next;
- delete top;
- std::cout << "clear data!" << std::endl;
- top = index;
- }
- }
- };
-
- int main() {
- stack<int> s;
- s.push(1);
- s.push(2);
- s.push(3);
- std::cout << "Pop: " << s.pop() << std::endl;
- std::cout << "Pop: " << s.pop() << std::endl;
- }
压栈时:
单线程时,创建了一个新节点,它的 next 指针指向堆栈的顶部。接下来,调用 CAS 内置函数,把新的节点复制到 top 位置。
多线程时,可能有两个或更多线程同时尝试把数据压入堆栈。
(1) 假设线程 A 试图把 20 压入堆栈,线程 B 试图压入 30,而线程 A 先获得了时间片。 (2) 但是,在 n->next = top 指令结束之后,调度程序暂停了线程 A。 (3) 现在,线程 B 获得了时间片,它能够完成 CAS,把 30 压入堆栈后结束。 (4) 接下来,线程 A 恢复执行,显然对于这个线程 *top 和 n->next 不匹配,因为线程 B 修改了 top 位置的内容。 (5) 因此,代码回到循环的开头,指向正确的 top 指针(线程 B 修改后的),调用 CAS,把 20 压入堆栈后结束。 (6) 整个过程没有使用任何锁。
弹栈时,即使线程 B 在线程 A 试图弹出数据的同时修改了堆栈顶,也可以确保不会跳过堆栈中的元素。
内存序,使用默认的std::memory_order_seq_cst即可。这是因为,在 x86 平台上,微调这两个参数生成的代码都是一样的,没有什么收益。其他弱内存序的 cpu 上面,可能每种 cpu 得到的最合适的这两个参数都会有所不同。为了避免去适配各个 cpu,那么一种偷懒的办法就是直接使用std::memory_order_seq_cst。一句话就是,直接使用var.compare_exchange_weak(a,b); 这种形式最省事。
ABA 问题描述: (1) 进程 P1 在共享变量中读到值为 A (2) P1 被抢占了,进程 P2 执行 (3) P2 把共享变量里的值从 A 改成了 B,再改回到 A,此时被 P1 抢占。 (4) P1 回来看到共享变量里的值没有被改变,于是继续执行。
例 1:
比如上述的 DeQueue() 函数,因为我们要让 head 和 tail 分开,所以我们引入了一个 dummy 指针给 head,当我们做 CAS 的之前,如果 head 的那块内存被回收并被重用了,而重用的内存又被 EnQueue() 进来了,这会有很大的问题。内存管理中重用内存基本上是一种很常见的行为,线程的切换有保存和恢复上下⽂的过程,即使切换前后对象地址没有变化,也不能确保它是没有变化的。
例 2:
(1) 假设一个提款机的例子。假设有一个遵循 CAS 原理的提款机,小灰有 100 元存款,要用这个提款机来提款 50 元。 (2) 由于提款机硬件出了点问题,小灰的提款操作被同时提交了两次,开启了两个线程,两个线程都是获取当前值 100 元,要更新成 50 元。 (3) 理想情况下,应该一个线程更新成功,一个线程更新失败,小灰的存款值被扣一次。 (4) 线程1 首先执行成功,把余额从 100 改成 50。线程2 因为某种原因阻塞。这时,小灰的妈妈刚好给小灰汇款 50 元。 (5) 线程2 仍然是阻塞状态,线程3 执行成功,把余额从 50 改成了 100。 (6) 线程2 恢复运行,由于阻塞之前获得了“当前值” 100,并且经过 compare 检测,此时存款实际值也是 100,所以会成功把变量值 100 更新成 50。 (7) 原本线程2 应当提交失败,小灰的正确余额应该保持 100 元,结果由于 ABA 问题提交成功了。
真正要做到严谨的 CAS 机制,在 compare 阶段不仅要比较期望值 A 和地址 V 中的实际值,还要比较变量的版本号是否一致。
举个例子:
(1) 假设地址 V 中存储着变量值 A,当前版本号是 01。线程1 获取了当前值 A 和版本号 01,想要更新为 B,但是被阻塞了。 (2) 这时候,内存地址 V 中变量发生了多次改变,版本号提升为 03,但是变量值仍然是 A。 (3) 随后线程1 恢复运行,进行 compare 操作。经过比较,线程1 所获得的值和地址的实际值都是 A,但是版本号不相等,所以这一次更新失败。
解决思路:增加版本号,每次变量更新时把版本号 +1,A-B-A 就变成了 1A-2B-3A。只有值和版本号全部相等,才会以原子方式将该引用、该标志的值设置为更新值。
存在的问题:自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的执行开销。
参考:cppreference 和知乎