源码地址 weekly.2009-11-06 src/pkg/sync/mutex.go
初代版本锁比较简单,通过 cas 来 对key 进行加减来控制状态,原理就是一个 FIFO 先进先出的队列
这个版本有一个明显的问题,虽然 FIFO 没什么不好,但是有个唤醒动作,如果这时候有新进来的 goroutine,在这个版本中是要排队的,再去唤醒队列中的 goroutine,如果能让新来的直接运行,那么能够节省一个唤醒操作
// the zero value for a Mutex is an unlocked mutex.
type Mutex struct {
key uint32;
sema uint32;
}
func xadd(val *uint32, delta int32) (new uint32) {
for {
v := *val;
nv := v+uint32(delta);
if cas(val, v, nv) {
return nv;
}
}
panic("unreached");
}
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
if xadd(&m.key, 1) == 1 {
// changed from 0 to 1; we hold lock
return;
}
runtime.Semacquire(&m.sema);
}
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if xadd(&m.key, -1) == 0 {
// changed from 1 to 0; no contention
return;
}
runtime.Semrelease(&m.sema);
}
1101
等待的goroutine数量 = (2^3 + 2^2) / 4 = 3
所以 1101 代表 当前锁被持有,但是无醒着的goroutine 有三个等待着的 goroutine
这个版本和初代版本的相比,给新进来的goroutine一个机会去竞争,而不是让新人去排队,且如果在唤醒FIFO队列中的goroutine之前,就获取到锁,那么就能省一个唤醒开销
Lock:
type Mutex struct {
state int32
sema uint32
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// 幸运的拿到了锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
awoke := false
for {
old := m.state
new := old | mutexLocked
// 锁被持有 || 有 goroutine被唤醒 || 有 goroutine 在等待
if old&mutexLocked != 0 {
// 增加等待的数量,+4 因为 state 第一位代表锁是否被持有,第二位代表是否有goroutine被唤醒
new = old + 1<<mutexWaiterShift
}
if awoke {
// 清除唤醒标记
new &^= mutexWoken
}
// 变更state到新的状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//新人拿锁,不用睡了
if old&mutexLocked == 0 {
break
}
//拿不到锁,等待唤醒
runtime.Semacquire(&m.sema)
awoke = true
}
}
}
Unlock:
func (m *Mutex) Unlock() {
// 先变更状态(因为lock是 CompareAndSwapInt32),再判断之前的状态是否是已经解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
panic("sync: unlock of unlocked mutex")
}
old := new
for {
//如果没有等待的goroutine,
//已经有goroutine获取到锁
//已经有goroutine被唤醒
//上面三种不需要再去唤醒其它goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
//唤醒状态
new = (old - 1<<mutexWaiterShift) | mutexWoken
//标记唤醒状态,走唤醒流程
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime.Semrelease(&m.sema)
return
}
//新人比较猛,获取成功了,走到这里
old = m.state
}
}
type Mutex struct {
state int32
sema uint32
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// 幸运的拿到了锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
awoke := false
iter := 0
for {
old := m.state
new := old | mutexLocked
// 锁被持有 || 有 goroutine被唤醒 || 有 goroutine 在等待
if old&mutexLocked != 0 {
//还能自旋
if runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
//进入自旋流程
runtime_doSpin()
iter++
continue
}
// 增加等待的数量,+4 因为 state 第一位代表锁是否被持有,第二位代表是否有goroutine被唤醒
new = old + 1<<mutexWaiterShift
}
if awoke {
//当重复解锁是有可能触发
if new&mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
// 清除唤醒标记
new &^= mutexWoken
}
// 变更state到新的状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//新人拿锁,不用睡了
if old&mutexLocked == 0 {
break
}
//拿不到锁,等待唤醒
runtime.Semacquire(&m.sema)
awoke = true
//已经进入等待队列了,下次唤醒重置自旋次数
iter = 0
}
}
}
源码地址
新人是有机会了,但是有可能每次都让新人获取了,那么老人怎么办,所以要解决饥饿问题
总的来说,就是让超过一定时间的goroutine打上饥饿标识,来获取优先权去执行,来解决饥饿问
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
const (
mutexLocked = 1 << iota //锁占用标识
mutexWoken //唤醒标识
mutexStarving //第三位做为饥饿模式标识
mutexWaiterShift = iota //第三位被占用,后移一位作为goroutine等待数(2+1)
// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
starvationThresholdNs = 1e6
)
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// 幸运获取锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
//静态检查可以忽略
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
var waitStartTime int64 //超过多长时间进入饥饿模式
starving := false //饥饿标识
awoke := false //唤醒标识
iter := 0 //自旋次数
old := m.state
for {
//非饥饿模式下的锁占用&& 可自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 自己未被唤醒 && 其他人未被唤醒 && 且有等待者 && 成功设置为唤醒状态
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
//设置自己为唤醒状态
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
//非解饿模式,增加占锁状态
if old&mutexStarving == 0 {
new |= mutexLocked
}
//是加锁状态 | 被别人设置为饥饿状态(这里主要是如果被其它的goroutine设置为饥饿,让出优先权,乖乖去排队)
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
//可以开启饥饿,且锁被占用
if starving && old&mutexLocked != 0 {
//设置饥饿标识
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
//消除唤醒标识
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//没有被锁,也不是饥饿模式,不用去排队
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
//设置下次唤醒竞争多久进入饥饿模式
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo)
//设置饥饿开关(要么原来就是饥饿模式,要么到点进入饥饿模式)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
//饥饿模式
if old&mutexStarving != 0 {
//重复unlock是有可能触发到的
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
panic("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
//已经进入饥饿逻辑,但是不是自己开启的,或者只有自己一个人待获取锁
if !starving || old>>mutexWaiterShift == 1 {
//消除饥饿标识
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
//静态检查,可以忽略
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
func (m *Mutex) Unlock() {
//竞态检查
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
panic("sync: unlock of unlocked mutex")
}
//非饥饿模式走正常逻辑
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
//直接唤醒饥饿中等待的 对应 runtime_SemacquireMutex(&m.sema, true)
runtime_Semrelease(&m.sema, true)
}
}