• golang---锁


    Mutex

    结构体

    type Mutex struct {
    	state int32 //表示互斥锁的状态,比如是否被锁定等。
    	sema  uint32//表示互斥锁的状态,比如是否被锁定等。
    }
    
    • 1
    • 2
    • 3
    • 4
    const (
       mutexLocked = 1 << iota // 表示互斥锁的锁定状态
       mutexWoken // 表示从正常模式被从唤醒
       mutexStarving // 当前的互斥锁进入饥饿状态
       mutexWaiterShift = iota // 当前互斥锁上等待者的数量
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Mutex.state是32位的整型变量,内部实现时把该变量分成四份,用于记录Mutex的四种状态。
    在这里插入图片描述

    • Locked: 表示该Mutex是否已被锁定,0:没有锁定 1:已被锁定。
    • Woken: 表示是否有协程已被唤醒,0:没有协程唤醒1:已有协程唤醒,正在加锁过程中。
    • Starving:表示该Mutex是否处于饥饿状态,0:没有饥饿1:饥饿状态,说明有协程阻塞了超过1ms。
    • Waiter: 表示阻塞等待锁的协程个数,协程解锁时根据此值来判断是否需要释放信号量。

    协程之间抢锁实际上是抢给Locked位赋值的权利,能给Locked位置1,就说明抢锁成功。抢不到的话就阻塞等待Mutex.sema信号量,一旦持有锁的协程解锁,等待的协程会依次被唤醒。

    Mutex方法

    // A Locker represents an object that can be locked and unlocked.
    type Locker interface {
    	Lock() //加锁方法
    	Unlock()//解锁方法
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    简单加锁

    在这里插入图片描述
    加锁过程会去判断Locked标志位是否为0,如果是0则把Locked位置1,代表加锁成功。从上图可见,加锁成功后,只是Locked位置1,其他状态位没发生变化。

    sync_runtime_canSpin(i int)

    自旋条件如下:

    • 自旋的次数要在4次以内
    • CPU必须为多核
    • GOMAXPROCS>1
    • 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
    const active_spin     = 4
    func sync_runtime_canSpin(i int) bool {
     if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
      return false
     }
     if p := getg().m.p.ptr(); !runqempty(p) {
      return false
     }
     return true
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    runtime_doSpin

    循环次数被设置为30次,自旋操作就是执行30次PAUSE指令,通过该指令占用CPU并消费CPU时间,进行忙等待;

    这就是整个自旋操作的逻辑,这个就是为了优化 等待阻塞->唤醒->参与抢占锁这个过程不高效,所以使用自旋进行优化,在期望在这个过程中锁被释放。

    const active_spin_cnt = 30
    func sync_runtime_doSpin() {
     procyield(active_spin_cnt)
    }
    // asm_amd64.s
    TEXT runtime·procyield(SB),NOSPLIT,$0-0
     MOVL cycles+0(FP), AX
    again:
     PAUSE
     SUBL $1, AX
     JNZ again
     RET
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    lockSlow

    func (m *Mutex) lockSlow() {
    	var waitStartTime int64 //来计算waiter的等待时间
    	starving := false //是饥饿模式标志,如果等待时长超过1ms,starving置为true,后续操作会把Mutex也标记为饥饿状态。
    	awoke := false//表示协程是否唤醒
    	iter := 0//用于记录协程的自旋次数
    	old := m.state//记录当前锁的状态
    	for {
    		//判断是否允许进入自旋 两个条件:
    		//1.old&(mutexLocked|mutexStarving) == mutexLocked ==》当前锁不能处于饥饿状态(用来判断锁是否处于正常模式且加锁)
    		//mutexLocked 二进制表示为 0001
    		//mutexStarving 二进制表示为 0100
    		//mutexLocked|mutexStarving 二进制为 0101. 使用0101在当前状态做 &操作,如果当前处于饥饿模式,低三位一定会是1,如果当前处于加锁模式,低1位一定会是1,所以使用该方法就可以判断出当前锁是否处于正常模式且加锁;
    		//2.runtime_canSpin(iter)==》其逻辑是在多核CPU运行,自旋的次数小于4
    		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
    // !awoke 判断当前goroutine不是在唤醒状态
    // old&mutexWoken == 0 表示没有其他正在唤醒的goroutine
    // old>>mutexWaiterShift != 0 表示等待队列中有正在等待的goroutine
    // atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 尝试将当前锁的低2位的Woken状态位设置为1,表示已被唤醒, 这是为了通知在解锁Unlock()中不要再唤醒其他的waiter了
    			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    				awoke = true
    			}
    			//判断当前goroutine可以进自旋后,调用runtime_doSpin方法进行自旋:
    			runtime_doSpin()
    			iter++
    			old = m.state
    			continue
    		}
    //--------------------抢锁准备期望状态------------------------
    //自旋逻辑处理好后开始根据上下文计算当前互斥锁最新的状态,根据不同的条件来计算mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:
    //首先计算mutexLocked的值:
    // 基于old状态声明到一个新状态
    		new := old
    // 新状态处于非饥饿的条件下才可以加锁
    		if old&mutexStarving == 0 {
    			new |= mutexLocked
    		}
    //计算mutexWaiterShift的值:
    //如果old已经处于加锁或者饥饿状态,则等待者按照FIFO的顺序排队
    		if old&(mutexLocked|mutexStarving) != 0 {
    			new += 1 << mutexWaiterShift
    		}
    //计算mutexStarving的值:
     如果当前锁处于饥饿模式,并且已被加锁,则将低3位的Starving状态位设置为1,表示饥饿
    		if starving && old&mutexLocked != 0 {
    			new |= mutexStarving
    		}
    //计算mutexWoken的值:	
    // 当前goroutine的waiter被唤醒,则重置flag	
    		if awoke {
       // 唤醒状态不一致,直接抛出异常
    			if new&mutexWoken == 0 {
    				throw("sync: inconsistent mutex state")
    			}
    // 新状态清除唤醒标记,因为后面的goroutine只会阻塞或者抢锁成功
    // 如果是挂起状态,那就需要等待其他释放锁的goroutine来唤醒。
    // 假如其他goroutine在unlock的时候发现Woken的位置不是0,则就不会去唤醒,那该goroutine就无法在被唤醒后加锁
    			new &^= mutexWoken
    		}
    //-----------------------通过CAS操作更新期望状态------------------------------------------
    //这块的逻辑很复杂,通过CAS来判断是否获取到锁,没有通过 CAS 获得锁,会调用 runtime.sync_runtime_SemacquireMutex通过信号量保证资源不会被
    //两个 goroutine 获取,runtime.sync_runtime_SemacquireMutex会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 goroutine 可
    //以获取信号量,它就会立刻返回,如果是新来的goroutine,就需要放在队尾;如果是被唤醒的等待锁的goroutine,就放在队头,整个过程还需要啃代码来加深理解。
    //上面我们已经得到了锁的期望状态,接下来通过CAS将锁的状态进行更新:
    		if atomic.CompareAndSwapInt32(&m.state, old, new) {
    		  // 如果原来锁的状态是没有加锁的并且不处于饥饿状态,则表示当前goroutine已经获取到锁了,直接推出即可
    			if old&(mutexLocked|mutexStarving) == 0 {
    				break // locked the mutex with CAS
    			}
       // 到这里就表示goroutine还没有获取到锁,waitStartTime是goroutine开始等待的时间,waitStartTime != 0就表示当前goroutine已经等待过了,则需要将其放置在等待队列队头,否则就排到队列队尾
       			queueLifo := waitStartTime != 0
    			if waitStartTime == 0 {
    				waitStartTime = runtime_nanotime()
    			}
    			// 阻塞等待
    			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    	 // 被信号量唤醒后检查当前goroutine是否应该表示为饥饿
         // 1. 当前goroutine已经饥饿
         // 2. goroutine已经等待了1ms以上
    			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    			 // 再次获取当前锁的状态
    			old = m.state
    			 // 如果当前处于饥饿模式,
    			if old&mutexStarving != 0 {
    // 如果当前锁既不是被获取也不是被唤醒状态,或者等待队列为空 这代表锁状态产生了不一致的问题
    				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
    					throw("sync: inconsistent mutex state")
    				}
    				// 当前goroutine已经获取了锁,等待队列-1
    				delta := int32(mutexLocked - 1<<mutexWaiterShift)
    			// 当前goroutine非饥饿状态 或者 等待队列只剩下一个waiter,则退出饥饿模式(清除饥饿标识位)              
    				if !starving || old>>mutexWaiterShift == 1 {
    					delta -= mutexStarving
    				}
    				// 更新状态值并中止for循环,拿到锁退出
    				atomic.AddInt32(&m.state, delta)
    				break
    			}
    			// 设置当前goroutine为唤醒状态,且重置自璇次数
    			awoke = true
    			iter = 0
    		} else {
    		 // 锁被其他goroutine占用了,还原状态继续for循环
    			old = m.state
    		}
    	}
    
    	if race.Enabled {
    		race.Acquire(unsafe.Pointer(m))
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111

    加锁被阻塞

    假定加锁时,锁已被其他协程占用了,此时加锁过程如下图所示:
    在这里插入图片描述
    从上图可看到,当协程B对一个已被占用的锁再次加锁时,Waiter计数器增加了1,此时协程B将被阻塞,直到Locked值变为0后才会被唤醒。

    简单解锁

    假定解锁时,没有其他协程阻塞,此时解锁过程如下图所示:
    在这里插入图片描述
    由于没有其他协程阻塞等待加锁,所以此时解锁时只需要把Locked位置为0即可,不需要释放信号量。

    解锁并唤醒协程

    假定解锁时,有1个或多个协程阻塞,此时解锁过程如下图所示:

    在这里插入图片描述

    Unlock

    func (m *Mutex) Unlock() {
    	if race.Enabled {
    		_ = m.state
    		race.Release(unsafe.Pointer(m))
    	}
    
    //使用AddInt32方法快速进行解锁,将m.state的低1位置为0,
    	new := atomic.AddInt32(&m.state, -mutexLocked)
    	//然后判断新的m.state值,如果值为0,则代表当前锁已经完全空闲了,结束解锁,不等于0说明当前锁没有被占用
    	if new != 0 {
    //会有等待的goroutine还未被唤醒,需要进行一系列唤醒操作,这部分逻辑就在unlockSlow方法内:
    		m.unlockSlow(new)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    TryLock 非阻塞加锁

    TryLock的实现就比较简单了,主要就是两个判断逻辑:

    • 判断当前锁的状态,如果锁处于加锁状态或饥饿状态直接获取锁失败
    • 尝试获取锁,获取失败直接获取锁失败
      TryLock并不被鼓励使用,至少我还没想到有什么场景可以使用到它。
    func (m *Mutex) TryLock() bool {
    // // 记录当前状态
    	old := m.state
    	  处于加锁状态/饥饿状态直接获取锁失败
    
    	if old&(mutexLocked|mutexStarving) != 0 {
    		return false
    	}
    	 // 尝试获取锁,获取失败直接获取失败
    	if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
    		return false
    	}
    
    	if race.Enabled {
    		race.Acquire(unsafe.Pointer(m))
    	}
    	return true
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    unlockSlow

    我们在唤醒goroutine时正常模式/饥饿模式都调用func runtime_Semrelease(s *uint32, handoff bool, skipframes int),这两种模式在第二个参数的传参上不同,如果handoff is true, pass count directly to the first waiter.。

    func (m *Mutex) unlockSlow(new int32) {
    /// 这里表示解锁了一个没有上锁的锁,则直接发生panic
    	if (new+mutexLocked)&mutexLocked == 0 {
    		throw("sync: unlock of unlocked mutex")
    	}
    	 正常模式的释放锁逻辑
    	if new&mutexStarving == 0 {
    		old := new
    		for {
    	// 如果没有等待者则直接返回即可
          // 如果锁处于加锁的状态,表示已经有goroutine获取到了锁,可以返回
          // 如果锁处于唤醒状态,这表明有等待的goroutine被唤醒了,不用尝试获取其他goroutine了
          // 如果锁处于饥饿模式,锁之后会直接给等待队头goroutine
    			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    				return
    			}
    // 抢占唤醒标志位,这里是想要把锁的状态设置为被唤醒,然后waiter队列-1
    			new = (old - 1<<mutexWaiterShift) | mutexWoken
    			if atomic.CompareAndSwapInt32(&m.state, old, new) {
    			// 抢占成功唤醒一个goroutine
    				runtime_Semrelease(&m.sema, false, 1)
    				return
    			}
    			 // 执行抢占不成功时重新更新一下状态信息,下次for循环继续处理
    			old = m.state
    		}
    	} else {
    // 饥饿模式释放锁逻辑,直接唤醒等待队列goroutine
    		runtime_Semrelease(&m.sema, true, 1)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    协程A解锁过程分为两个步骤,一是把Locked位置0,二是查看到Waiter>0,所以释放一个信号量,唤醒一个阻塞的协程,被唤醒的协程B把Locked位置1,于是协程B获得锁。

    自旋

    什么是自旋

    自旋对应于CPU的”PAUSE”指令,CPU对该指令什么都不做,相当于CPU空转,对程序而言相当于sleep了一小段时间,时间非常短,当前实现是30个时钟周期。
    自旋过程中会持续探测Locked是否变为0,连续两次探测间隔就是执行这些PAUSE指令,它不同于sleep,不需要将协程转为睡眠状态。

    自旋条件

    加锁时程序会自动判断是否可以自旋,无限制的自旋将会给CPU带来巨大压力,所以判断是否可以自旋就很重要了。
    自旋必须满足以下所有条件:

    • 自旋次数要足够小,通常为4,即自旋最多4次
    • CPU核数要大于1,否则自旋没有意义,因为此时不可能有其他协程释放锁
    • 协程调度机制中的Process数量要大于1,比如使用GOMAXPROCS()将处理器设置为1就不能启用自旋
    • 协程调度机制中的可运行队列必须为空,否则会延迟协程调度

    可见,自旋的条件是很苛刻的,总而言之就是不忙的时候才会启用自旋。

    自旋的优势

    自旋的优势是更充分的利用CPU,尽量避免协程切换。因为当前申请加锁的协程拥有CPU,如果经过短时间的自旋可以获得锁,当前协程可以继续运行,不必进入阻塞状态。

    自旋的问题

    如果自旋过程中获得锁,那么之前被阻塞的协程将无法获得锁,如果加锁的协程特别多,每次都通过自旋获得锁,那么之前被阻塞的进程将很难获得锁,从而进入饥饿状态。
    为了避免协程长时间无法获取锁,自1.8版本以来增加了一个状态,即Mutex的Starving状态。这个状态下不会自旋,一旦有协程释放锁,那么一定会唤醒一个协程并成功加锁。

    Starving位

    每个Mutex都有两个模式,称为Normal和Starving。

    Normal

    默认情况下,Mutex的模式为normal。

    该模式下,协程如果加锁不成功不会立即转入阻塞排队,而是判断是否满足自旋的条件,如果满足则会启动自旋过程,尝试抢锁。

    starvation模式

    自旋过程中能抢到锁,一定意味着同一时刻有协程释放了锁,我们知道释放锁时如果发现有阻塞等待的协程,还会释放一个信号量来唤醒一个等待协程,被唤醒的协程得到CPU后开始运行,此时发现锁已被抢占了,自己只好再次阻塞,不过阻塞前会判断自上次阻塞到本次阻塞经过了多长时间,如果超过1ms的话,会将Mutex标记为”饥饿”模式,然后再阻塞。
    但是这个时候的starvation模式,要在下一次相同情况时(有别的协程在自旋过程中刚好锁被释放),才会体现starvation模式的作用。即别的协程就算在自旋,也不能获得锁,只有去阻塞,因为当前处于Mutex饥饿状态。

    处于饥饿模式下,不会启动自旋过程,也即一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取锁,同时也会把等待计数减1。

    为什么重复解锁要panic

    可能你会想,为什么Go不能实现得更健壮些,多次执行Unlock()也不要panic?

    仔细想想Unlock的逻辑就可以理解,这实际上很难做到。Unlock过程分为将Locked置为0,然后判断Waiter值,如果值>0,则释放信号量。

    如果多次Unlock(),那么可能每次都释放一个信号量,这样会唤醒多个协程,多个协程唤醒后会继续在Lock()的逻辑里抢锁,势必会增加Lock()实现的复杂度,也会引起不必要的协程切换。

    总结

    • 互斥锁有两种模式:正常模式、饥饿模式,饥饿模式的出现是为了优化正常模式下刚被唤起的goroutine与新创建的goroutine竞争时长时间获取不到锁,在Go1.9时引入饥饿模式,如果一个goroutine获取锁失败超过1ms,则会将Mutex切换为饥饿模式,如果一个goroutine获得了锁,并且他在等待队列队尾 或者 他等待小于1ms,则会将Mutex的模式切换回正常模式
    • 加锁的过程:
    1. 锁处于完全空闲状态,通过CAS直接加锁
    2. 当锁处于正常模式、加锁状态下,并且符合自旋条件,则会尝试最多4次的自旋
    3. 若当前goroutine不满足自旋条件时,计算当前goroutine的锁期望状态
    4. 尝试使用CAS更新锁状态,若更新锁状态成功判断当前goroutine是否可以获取到锁,获取到锁直接退出即可,若不同获取到锁子则陷入睡眠,等待被唤醒
    5. goroutine被唤醒后,如果锁处于饥饿模式,则直接拿到锁,否则重置自旋次数、标志唤醒位,重新走for循环自旋、获取锁逻辑;
    • 解锁的过程
    1. 原子操作mutexLocked,如果锁为完全空闲状态,直接解锁成功
    2. 如果锁不是完全空闲状态,,那么进入unlockedslow逻辑
    3. 如果解锁一个未上锁的锁直接panic,因为没加锁mutexLocked的值为0,解锁时进行mutexLocked - 1操作,这个操作会让整个互斥锁魂村,所以需要有这个判断
    4. 如果锁处于饥饿模式直接唤醒等待队列队头的waiter
    5. 如果锁处于正常模式下,没有等待的goroutine可以直接退出,如果锁已经处于锁定状态、唤醒状态、饥饿模式则可以直接退出,因为已经有被唤醒的 goroutine 获得了锁.
    • 使用互斥锁时切记拷贝Mutex,因为拷贝Mutex时会连带状态一起拷贝,因为Lock时只有锁在完全空闲时才会获取锁成功,拷贝时连带状态一起拷贝后,会造成死锁
    • TryLock的实现逻辑很简单,主要判断当前锁处于加锁状态、饥饿模式就会直接获取锁失败,尝试获取锁失败直接返回;

    参考资料

    使用defer避免死锁

    加锁和解锁应该成对出现

    RWMutex

    type RWMutex struct {
    	w           Mutex  // held if there are pending writers
    	writerSem   uint32 // semaphore for writers to wait for completing readers
    	readerSem   uint32 // semaphore for readers to wait for completing writers
    	readerCount int32  // number of pending readers
    	readerWait  int32  // number of departing readers
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 相关阅读:
    百度网盘的音乐怎么分享到qq音乐里?
    7天入门python系列之爬取热门小说项目实战,互联网的东西怎么算白嫖呢
    keras fit_generator 增加并行度问题 use_multiprocessing
    有限元学习笔记-传热及流体力学问题
    解密堆排序与TopK问题
    iic驱动oled屏幕显示温湿度基于FreeRTOS实现多任务
    浅析MVC、MVP、MVVM 框架实现
    Ubuntu22无法自动进入lightdm图像界面
    【服务器搭建】教程四:域名怎样进行备案?快来看~
    刷题 DFS : 受伤的皇后 (python, java)
  • 原文地址:https://blog.csdn.net/newbieJ/article/details/125554752