• CAS详解和学透面试必问并发安全问题


    CAS&Atomic 原子操作详解

            什么是原子操作?如何实现原子操作?

            什么是原子性?相信很多同学在工作中经常使用事务,事务的一大特性就是原子性(事务具有 ACID 四大特性),一个事务包含多个操作,这些操作要么全部执行,要么全都不执行。

            并发里的原子性和原子操作是一样的内涵和概念,假定有两个操作 A B 都包含多个步骤,如果从执行 A 的线程来看,当另一个线程执行 B 时,要么将 B 全部执行完,要么完全不执行 B,执行 B 的线程看 A 的操作也是一样的,那么 A 和 B 对彼此来说是原子的。

            实现原子操作可以使用锁,锁机制,满足基本的需求是没有问题的了,但是有的时候我们的需求并非这么简单,我们需要更有效,更加灵活的机制,synchronized 关键字是基于阻塞的锁机制,也就是说当一个线程拥有锁的时候,访问同一资源的其它线程需要等待,直到该线程释放锁。

            这里会有些问题:首先,如果被阻塞的线程优先级很高很重要怎么办?其次, 如果获得锁的线程一直不释放锁怎么办?同时,还有可能出现一些例如死锁之类的情况,最后,其实锁机制是一种比较粗糙,粒度比较大的机制,相对于像计数器这样的需求有点儿过于笨重。为了解决这个问题,Java 提供了 Atomic 系列的原子操作类。

            这些原子操作类其实是使用当前的处理器基本都支持 CAS 的指令,比如 Intel 的汇编指令 cmpxchg,每个厂家所实现的具体算法并不一样,但是原理基本一样。每一个 CAS 操作过程都包含三个运算符:一个内存地址 V,一个期望的值 A 和一 个新值 B,操作的时候如果这个地址上存放的值等于这个期望的值 A,则将地址上的值赋为新值 B,否则不做任何操作。

            CAS 的基本思路就是,如果这个地址上的值和期望的值相等,则给其赋予新值,否则不做任何事儿,但是要返回原值是多少。自然 CAS 操作执行完成时,在业务上不一定完成了,这个时候我们就会对 CAS 操作进行反复重试,于是就有了循环 CAS。很明显,循环 CAS 就是在一个循环里不断的做 cas 操作,直到成功为止。Java 中的 Atomic 系列的原子操作类的实现则是利用了循环 CAS 来实现。

            CAS 实现原子操作的三大问题

            ABA 问题

            因为 CAS 需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。

            ABA 问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加 1,那么 ABA 就会变成 1A2B3A。举个通俗点的例子,你倒了一杯水放桌子上,干了点别的事,然后同事把你水喝了又给你重新倒了一杯水,你回来看水还在,拿起来就喝,如果你不管水中间被人喝过,只关心水还在,这就是 ABA 问题。

            如果你是一个讲卫生讲文明的小伙子,不但关心水在不在,还要在你离开的时候水被人动过没有,因为你是程序员,所以就想起了放了张纸在旁边,写上初始值 0,别人喝水前麻烦先做个累加才能喝水。

            循环时间长开销大。

            自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的执行开销。

            只能保证一个共享变量的原子操作。

            当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操 作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁。
            
            还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比 如,有两个共享变量 i 2 j=a ,合并一下 ij=2a ,然后用 CAS 来操作 ij 。从 Java 1.5开始,JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行 CAS 操作。

            Jdk 中相关原子操作类的使用

            AtomicInteger

            •int addAndGetint delta:以原子方式将输入的数值与实例中的值(AtomicInteger 里的 value)相加,并返回结果。

            •boolean compareAndSet int expectint update :如果输入的数值等于预期值,则以原子方式将该值设置为输入的值。
            •int getAndIncrement() :以原子方式将当前值加 1 ,注意,这里返回的是自增前的值。
            •int getAndSet int newValue :以原子方式设置为 newValue 的值,并返回旧值。

            AtomicIntegerArray

            主要是提供原子的方式更新数组里的整型,其常用方法如下。

            •int addAndGet int iint delta :以原子方式将输入值与数组中索引 i 的元素相加。
            •boolean compareAndSet int iint expectint update :如果当前值等于预期值,则以原子方式将数组位置 i 的元素设置成 update 值。需要注意的是,数组 value 通过构造方法传递进去,然后 AtomicIntegerArray会将当前数组复制一份,所以当 AtomicIntegerArray 对内部的数组元素进行修改时,不会影响传入的数组。

            更新引用类型

            原子更新基本类型的 AtomicInteger,只能更新一个变量,如果要原子更新多个变量,就需要使用这个原子更新引用类型提供的类。Atomic 包提供了以下 3 个类。

            AtomicReference

            原子更新引用类型。

            AtomicStampedReference

            利用版本戳的形式记录了每次改变以后的版本号,这样的话就不会存在 ABA 问题了。这就是 AtomicStampedReference 的解决方案。AtomicMarkableReference 跟 AtomicStampedReference 差不多,AtomicStampedReference 是使用 pair int stamp 作为计数器使用,AtomicMarkableReference pair 使用的是 boolean mark。 还是那个水的例子,AtomicStampedReference 可能关心的是动过几次, AtomicMarkableReference 关心的是有没有被人动过,方法都比较简单。

            AtomicMarkableReference

            原子更新带有标记位的引用类型。可以原子更新一个布尔类型的标记位和引用类型。构造方法是 AtomicMarkableReferenceV initialRefbooleaninitialMark)。

            LongAdder

            LongAdder 的基本思路就是分散热点,将 value 值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行 CAS 操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的 long 值,只要将各个槽中的变量值累加返回。

    面试必问并发安全问题

            线程安全性

            什么是线程安全性?我们可以这么理解,我们所写的代码在并发情况下使用时,总是能表现出正确的行为;反之,未实现线程安全的代码,表现的行为是不可预知的,有可能正确,而绝大多数的情况下是错误的。

            当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在调用代码中不需要任何额外的同步或者协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

            如何实现 

            1.线程封闭

            实现好的并发是一件困难的事情,所以很多时候我们都想躲避并发。避免并发最简单的方法就是线程封闭。什么是线程封闭呢?

            就是把对象封装到一个线程里,只有这一个线程能看到此对象。那么这个对象就算不是线程安全的也不会出现任何安全问题。

            栈封闭

            栈封闭是我们编程当中遇到的最多的线程封闭。什么是栈封闭呢?简单的说就是局部变量。多个线程访问一个方法,此方法中的局部变量都会被拷贝一份到线程栈中。所以局部变量是不被多个线程所共享的,也就不会出现并发问题。所以能用局部变量就别用全局的变量,全局变量容易引起并发问题。

            TheadLocal

            ThreadLocal 是实现线程封闭的最好方法。ThreadLocal 内部维护了一个 Map,Map 的 key 是每个线程的名称,而 Map 的值就是我们要封闭的对象。每个线程中的对象都对应着 Map 中一个值,也就是 ThreadLocal 利用 Map 实现了对象的线程封闭。

            2.无状态的类

            没有任何成员变量的类,就叫无状态的类,这种类一定是线程安全的。

            3.让类不可变

            让状态不可变,加 final 关键字,对于一个类,所有的成员变量应该是私有的,同样的只要有可能,所有的成员变量应该加上 final 关键字,但是加上 final,要注意如果成员变量又是一个对象时,这个对象所对应的类也要是不可变,才能保证整个类是不可变的。

            但是要注意,一旦类的成员变量中有对象,上述的 final 关键字保证不可变并不能保证类的安全性,为何?因为在多线程下,虽然对象的引用不可变,但是对象在堆上的实例是有可能被多个线程同时修改的,没有正确处理的情况下,对象实例在堆中的数据是不可预知的。

            4.加锁和 CAS

            我们最常使用的保证线程安全的手段,使用 synchronized 关键字,使用显式锁,使用各种原子变量,修改数据时使用 CAS 机制等等。

            死锁  

            是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁。

            所以总结一下:

            1、死锁是必然发生在多操作者(M>=2 个)争夺多个资源(N>=2 个,且 N<=M)
    才会发生这种情况。很明显,单线程自然不会有死锁,只有 B 一个去,不要 2 个,打十个都没问题;单资源呢?只有 13,A 和 B 也只会产生激烈竞争,打得不可开交,谁抢到就是谁的,但不会产生死锁。
            2、争夺资源的顺序不对,如果争夺资源的顺序是一样的,也不会产生死锁;
            3、争夺者对拿到的资源不放手。
    避免死锁常见的算法有有序资源分配法、银行家算法。
    1. package cn.tulingxueyuan.safe.dl;
    2. /**
    3. *@author
    4. *
    5. *类说明:演示死锁的产生
    6. */
    7. public class NormalDeadLock {
    8. private static Object No13 = new Object();//第一个锁
    9. private static Object No14 = new Object();//第二个锁
    10. //第一个拿锁的方法
    11. private static void zhouYuDo() throws InterruptedException {
    12. String threadName = Thread.currentThread().getName();
    13. synchronized (No13){
    14. System.out.println(threadName + " get No13");
    15. Thread.sleep(100);
    16. synchronized (No14){
    17. System.out.println(threadName + " get No14");
    18. }
    19. }
    20. }
    21. //第二个拿锁的方法
    22. private static void monkeyDo() throws InterruptedException {
    23. String threadName = Thread.currentThread().getName();
    24. synchronized (No13){
    25. System.out.println(threadName + " get No13");
    26. Thread.sleep(100);
    27. synchronized (No14){
    28. System.out.println(threadName + " get No14");
    29. }
    30. }
    31. }
    32. //子线程,代表周瑜老师
    33. private static class ZhouYu extends Thread{
    34. private String name;
    35. public ZhouYu(String name) {
    36. this.name = name;
    37. }
    38. @Override
    39. public void run() {
    40. Thread.currentThread().setName(name);
    41. try {
    42. zhouYuDo();
    43. } catch (Exception e) {
    44. e.printStackTrace();
    45. }
    46. }
    47. }
    48. public static void main(String[] args) throws InterruptedException {
    49. //主线程,代表Monkey老师
    50. Thread.currentThread().setName("Monkey");
    51. ZhouYu zhouYu = new ZhouYu("ZhouYu");
    52. //System.out.println(ManagementFactory.getRuntimeMXBean().getName());
    53. zhouYu.start();
    54. monkeyDo();
    55. }
    56. }

             如何解决

            定位

            要解决死锁,当然要先找到死锁,怎么找?

            通过 jps 查询应用的 id,再通过 jstack id 查看应用的锁的持有情况

            两种解决方式
                    1、内部通过顺序比较,确定拿锁的顺序;
                    2、采用尝试拿锁的机制。
            以下代码采用 显式锁实现
    1. package cn.tulingxueyuan.safe.dl;
    2. import java.util.Random;
    3. import java.util.concurrent.locks.Lock;
    4. import java.util.concurrent.locks.ReentrantLock;
    5. /**
    6. *@author Mark老师
    7. *类说明:演示普通账户的死锁和解决
    8. */
    9. public class TryLock {
    10. private static Lock No13 = new ReentrantLock();//第一个锁
    11. private static Lock No14 = new ReentrantLock();//第二个锁
    12. //先尝试拿No13 锁,再尝试拿No14锁,No14锁没拿到,连同No13 锁一起释放掉
    13. private static void zhouYuDo() throws InterruptedException {
    14. String threadName = Thread.currentThread().getName();
    15. Random r = new Random();
    16. while(true){
    17. if(No13.tryLock()){
    18. System.out.println(threadName +" get 13");
    19. try{
    20. if(No14.tryLock()){
    21. try{
    22. System.out.println(threadName +" get 14");
    23. System.out.println("zhouYuDo do work------------");
    24. break;
    25. }finally{
    26. No14.unlock();
    27. }
    28. }
    29. }finally {
    30. No13.unlock();
    31. }
    32. }
    33. //Thread.sleep(r.nextInt(3));
    34. }
    35. }
    36. //先尝试拿No14锁,再尝试拿No13锁,No13锁没拿到,连同No14锁一起释放掉
    37. private static void monkeyDo() throws InterruptedException {
    38. String threadName = Thread.currentThread().getName();
    39. Random r = new Random();
    40. while(true){
    41. if(No14.tryLock()){
    42. System.out.println(threadName +" get 14");
    43. try{
    44. if(No13.tryLock()){
    45. try{
    46. System.out.println(threadName +" get 13");
    47. System.out.println("monkeyDo do work------------");
    48. break;
    49. }finally{
    50. No13.unlock();
    51. }
    52. }
    53. }finally {
    54. No14.unlock();
    55. }
    56. }
    57. //Thread.sleep(r.nextInt(3));
    58. }
    59. }
    60. private static class ZhouYu extends Thread{
    61. private String name;
    62. public ZhouYu(String name) {
    63. this.name = name;
    64. }
    65. public void run(){
    66. Thread.currentThread().setName(name);
    67. try {
    68. zhouYuDo();
    69. } catch (InterruptedException e) {
    70. e.printStackTrace();
    71. }
    72. }
    73. }
    74. public static void main(String[] args) {
    75. Thread.currentThread().setName("Monkey");
    76. ZhouYu zhouYu = new ZhouYu("ZhouYu");
    77. zhouYu.start();
    78. try {
    79. monkeyDo();
    80. } catch (InterruptedException e) {
    81. e.printStackTrace();
    82. }
    83. }
    84. }

            其他安全问题 

            活锁

            两个线程在尝试拿锁的机制中,发生多个线程之间互相谦让,不断发生同一个线程总是拿到同一把锁,在尝试拿另一把锁时因为拿不到,而将本来已经持有的锁释放的过程。

            解决办法:每个线程休眠随机数,错开拿锁的时间。

            线程饥饿

            低优先级的线程,总是拿不到执行时间

  • 相关阅读:
    Windows下自动云备份思源笔记到Gitee
    Vue(二)——基本操作
    ES|使用Postman更新ES内所有文档的指定字段
    Postgresql源码(88)column definition list语义解析流程分析
    力扣(LeetCode)1732. 找到最高海拔(C++)
    IOS OpenGL ES GPUImage 图像镜像 GPUImageTransformFilter
    电力系统IEEE14节点系统同步模型(Simulink)
    栈和队列知识点+例题
    寻找更好的分类模型loss
    Linux C++ 使用动态链接库
  • 原文地址:https://blog.csdn.net/qq_67801847/article/details/133711413