• CAS 机制的实现原理分析


            在 synchronized 中很多地方都用到了CAS机制,它的叫法有很多,比如CompareAndSwap、CompareAndExchange、CompareAndSet,它是一个能够进行比较和替换的方法,这个方法能够在多线程环境下保证对一个共享变量进行修改时的原子性不变。
            为了更好地理解CAS 机制,我们来看下面这个例子,下面这个例子演示了一个对成员变量i进行累加的过程。

    1. public class CasExample {
    2. public volatile int i;
    3. public synchronized void incr() {
    4. i++;
    5. }
    6. }

            在不增加synchronized同步锁的情况下,incr()方法一定不是线程安全的,也就是说它无法保证原子性,但是增加锁又会导致性能问题,有没有更好的方式呢?
            这个时候我们想到了一种乐观锁机制:在线程调用i++之前,先判断i的值和之前读取的i的预期值是否相等。如果相等,则说明i的值没有被其他线程修改过,这个时候可以正常修改;否则,表示修改过,就要重新读取最新的i的值进行累加。
            按照乐观锁的思想修改后,大概就变成了下面这种结构,每次调用incr()方法时,都传递一个之前读取的i的预期值expect,如果相等就进行i++操作。

    1. public class CasExample {
    2. public volatile int i;
    3. public void incr(int expect){
    4. if(i==expect) {
    5. i++;
    6. }
    7. }

            但是这里存在一个问题,if语句的判断和i++指令并不是原子的,也就是说当多个线程同时执行到i==expect 这个判断条件时,初始加载的expect都是0,这会导致多个线程同时满足条件,最终还是会导致原子性问题。
            CAS就是解决这个问题的方法,如图所示,该图表示通过CAS对变量V进行原子更新操作。CAS方法中会传递三个参数,第一个参数V表示要更新的变量,第二个参数E表示期望值,第三个参数U表示更新后的值。更新的方式是,如果V=E,表示预期值和实际值相等,则将修改成U并返回true,否则修改失败返回 false


            在Java 中的 Unsafe类中提供了 CAS方法,针对int类型变量的CAS方法定义如下。

    public final native boolean compareAndSwapInt(0bject o, long offset, int expect, int update);

            从方法定义中可以看到,它有四个参数:

    • o,表示当前的实例对象。
    • offset,表示实例变量的内存地址偏移量。
    • expect,表示预期值。
    • update,表示要更新的值。

            expect 和 update 比较好理解,offset表示目标变量X在实例对象0中内存地址的偏移量。简单来说,在预期值expect要和目标变量X进行比较是否相等的判断中,目标变量X的值就是通过该偏移量从内存中获得的。
     

    CAS 在 AtomicInterger 中的应用

    为了更好地理解CAS,我们以AtomicInteger为例来进行说明,AtomicInteger是一个能够保证原子性的Integer对象,也就是说,对于计+类的操作,可以使用AtomicInteger来保证原子性,使用方法如下。

    1. public AtomicInteger atomicInteger = new AtomicInteger(e);
    2. public void add(){
    3. atomicInteger.getAndIncrement();
    4. }

            getAndIncrement()是用来实现原子累加的方法,每调用一次会在原来值的基础上+1,这个过程采用了 CAS机制来保证原子性。
            下面来看一下getAndIncrement()方法的定义。

    1. public final int getAndIncrement() {
    2. return unsafe.getAndAddInt(this, value0ffset,1);
    3. }

            其中,valueOffset表示AtomicInteger中的成员变量value在内存中的偏移量,后续会用它直接从内存中读取value属性当前的值,valueOffset的初始化方法如下。

    1. private static final Unsafe unsafe = Unsafe.getUnsafe();
    2. private static final long valueOffset;
    3. static {
    4. try{
    5. valueOffset = unsafe.objectField0ffset
    6. (AtomicInteger.class.getDeclaredField("value"));
    7. } catch (Exception ex) {
    8. throw new Error(ex);
    9. }
    10. }
    11. private volatile int value;


    valueOffset 用到了unsafe.objectFieldOffset()方法,获取 value 字段在AtomicInteger.class 中的偏移量。
    结合这段代码的分析,对前面提到的o和 offset这两个字段的含义就不难理解了。在CAS 中,我们需要通过expect去和某个字段的值进行比较,而expect比较的目标值就是通过 offset找到某个字段在内存中的实际值(在AtomicInteger中是指value字段),如果相等,就修改成update 并返回true,否则返回false
    下面来看一下unsafe.getAndAddInt的定义代码。

    1. public final int getAndAddInt(Object o, long offset, int n) {
    2. int v;
    3. do{
    4. v = this.getIntVolatile(o, offset);
    5. } while(!this.compareAndSwapInt(o,offset,v,v+n));
    6. return var5;
    7. }

    代码实现逻辑分析如下:

    • “v = this.getlntVolatile(o, offset);” 表示根据value在对象o的偏移量来获得当前的值v。
    • 使用compareAndSwapInt()方法实现比较和替换,如果value当前的值和v相等,说明数据没有被其他线程修改过,则把value修改成 v+n。
    • 这里采用了循环来实现,原因想必大家能猜测到。如果compareAndSwapInt()方法执行失败,则说明存在线程竞争,但是当前的方法是进行原子累加,所以必须要保证成功,为了达到这个目的,就只能不断地循环重试,直到修改成功后返回。


            整体来说,CAS 就是一种基于乐观锁机制来保证多线程环境下共享变量修改的原子性的解决方案。前面分析的案例虽然是在Java中的应用场景,但是它本质上和synchronized同步锁中用到的 CAS 是相同的,我们来看一下Unsafe类中CAS 的定义。

    public final native boolean compareAndswapInt(0bject o, long offset, int expect, int update);
    

            compareAndSwapInt()是一个native方法,该方法是在JVM中定义和实现的。

    CAS 实现自旋锁

            在本blog中很多地方都会提到自旋锁,那么什么是自旋锁呢?
            我们知道,在synchronized同步锁中,没有竞争到锁的线程必须要等待,直到获得锁资源的线程释放锁,才会唤醒处于锁等待的线程,而这个过程会涉及从用户态到内核态的切换带来的性能开销。在存在竞争的情况下,我们能否通过固定次数的重试,在线程进入锁等待状态之前占用锁资源呢?基于这个原因就产生了自旋锁。
            所谓自旋锁就是当一个线程在抢占锁资源时,如果锁已经被其他线程获取,那么该线程将会循环不断地判断及尝试抢占锁资源,在这个过程中该线程一直保持运行状态,不会造成上下文切换带来的性能损耗。但是自旋锁也有缺点,如果获得锁资源的线程一直没有释放,那么当前线程就会一直重试从而造成CPU资源的浪费。因此,在synchronized中会用到固定次数的自旋和自适应自旋。
            实现自旋锁的方式比较简单,需要满足如下两个条件。

    • 通过for(;;)循环不断循环重试。
    • 通过一个线程安全的操作去尝试抢占资源,而CAS 就是很好的方法,CAS是一个满足原子操作的方法,它的返回值true/false可以很好地判断当前线程竞争的结果。

            AtomicInteger中的getAndAddInt()方法其实就是一种自旋,通过一个do...while循环不断对 value 进行累加,直到累加成功便返回。

    1. public final int getAndAddInt(0bject o, long offset, int n){
    2. int v;
    3. do {
    4. v = this,getIntVolatile(o, offset);
    5. } while(!this.compareAndSwapInt(o, offset,v,v+n));
    6. return var5;
    7. }

            在JVM的 synchronized的重量级锁实现中,它的白旋实现采用的是for(;;)循环,然后在该循环中通过 Atomic::cmpxchg_ptr进行CAS来抢占锁资源。

    1. int ObjectMonitor::TryLock (Thread * Self) {
    2. for(;;) {
    3. void * own = _owner;
    4. if (own != NULL) return 0;
    5. if (Atomic::cmpxchg_ptr (Self,&_owner, NULL) == NULL) {
    6. assert (_recursions ==0, "invariant");
    7. assert (_owner == Self, "invariant");
    8. return 1 ;
    9. }
    10. if (true) return -1;
    11. }
    12. }

    CAS 在 JVM 中的实现原理分析

    读者应该对CAS如何解决原子性的问题还存在比较多的疑惑。

            举个例子,如果多个线程调用CAS,并且多个线程都去执行预期值与实际值的判断,那么应该还存在原子性问题才对。除非当线程在执行offset偏移量的值和expect进行比较时加锁,保证在同一时刻只允许一个线程来判断。
            带着这个疑惑,我们从源码层面做一个分析,由于源码是JVM层面的C++代码实现,所以笔者会对核心逻辑做一个说明,以帮助读者理解。
    基于comparcAndSwapInt(方法,在JVM源码中的unsafe.cpp文件中找到该方法的定义如下。

    1. //UNSAFE_ENTRY 表示一个宏定义
    2. //obj/offset/e/x 分别对应Java中定义的compareAndSwapInt()方法的入参,这里不做复述
    3. UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
    4. UnsafeWrapper("Unsafe_CompareAndSwapInt");
    5. oop p=JNIHandles::resolve(obj);
    6. jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
    7. return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
    8. UNSAFE_END


    代码解读如下。

    • “oopp = JNIHandles::resolve(obj);”,这个方法是把Java对象引用转化为JVM中的对象实例。
    • “ jint* addr = (jint *) index_oop_from_ field offset_long(p, offset);”,根据偏移量 offiset 计算 value 的地址。
    • “(Atomic::cmpxchg(x, addr,e))”,比较addr和c是否相等,如果相等就把x赋值到目标字段,该方法会返回修改之前的目标字段的值。

            Atomic::cmpxchg()方法的定义在atomic.epp文件中,代码如下。

    1. unsigned Atomic::cmpxchg(unsigned int exchange_value,
    2. volatile unsigned int* dest, unsigned int compare_value) {
    3. assert(sizeof(unsigned int) == sizeof(jint), "more work to do" );
    4. return (unsigned int)Atomic::cmpxchg((jint)exchange_value,(volatile jint*)dest,
    5. (jint)compare_value);
    6. }


            该方法并没有定义具体的实现。其实,对于CAS操作,不同的操作系统和CPU架构,其保证原子性的方法可能会不一样,而JVM本身是跨平台的语言,它需要在任何平台和CPU架构下都保证一致性。因此,Atomic::cmpxchgO方法会根据不同的操作系统类型和CPU架构,在预编译阶段确定调用哪个平台下的重载,图展示的是JVM源码中定义的多个平台的重载。


            以 Linux系统为例,当定位到atomic_linux_x86.inline.hpp文件时,Atomic:.cmpxchg的具体实现方法如下。

    1. inline jint    Atomic::cmpxchg    (jint    exchange_value,volatile jint*    dest,    
    2. jint compare_value) {
    3. int mp = os::is_MP();
    4. __asm__volatile (LOCK_IF_MP(%4) "cmpxchg1 %1, (%3)"
    5. :"=a"(exchange_value)
    6. : "r" (exchange_value),"a" (compare_value),"r" (dest),"r"(mp)
    7. :"cc","memory");
    8. return exchange_value;
    9. }


            代码说明如下。

    • mp(multi-processor),os::is_MPO用于判断是否是多核CPU。
    • __asm_表示内嵌汇编代码。
    • volatile用于通知编译器对访问该变量的代码不再进行优化。
    • LOCK_IF_MP(%4)表示如果CPU是多核的,则需要为compxchgl指令增加一条Lock指令。
    • 具体的执行过程是,先判断寄存器中的compare_value变量值是否和dest地址所存储的值相等,如果相等,就把exchange_value的值写入dest 指向的地址。

            总的来说,上面代码的功能是基于汇编指令cmpxchg1从主内存中执行比较及替换的操作来实现数据的变更。但是,在多核心CPU的情况下,这种方式仍然不是原子的,所以为了保证多核 CPU下执行该指令时的原子性,会增加一个Lock指令。Lock翻译成中文就是锁的意思,按照前面的猜想,CAS底层必然用到了锁的机制,否则无法实现原子性,因此这个猜想被证实是对的。
            Lock 的作用有两个:

    • 保证指令执行的原子性。
    • 禁止该指令与其前后的读和写指令重排序。
  • 相关阅读:
    【URI和URL】的区别比较与理解
    OrangePIPC2---镜像制作
    Java 解决long类型数据在前后端传递失真问题
    为什么互联网大厂一边疯狂裁员,一边不停招聘?
    设计原则之【单一职责原则】
    初探flask debug生成pin码
    C语言变量的定义和赋值
    《PyTorch深度学习实践》第十二课(循环神经网络RNN)
    Dockerfile指令详解
    centos使用lftp备份文件
  • 原文地址:https://blog.csdn.net/wm31898/article/details/124436326