// TryLock makes a single, non-blocking attempt to acquire m and reports
// whether the lock was obtained.
func (m *Mutex) TryLock() bool {
old := m.state
// Give up immediately if the mutex is already locked or is in starvation mode.
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
// Try to set the locked bit. A failed compare-and-swap means the state changed
// after it was read above, so the attempt is abandoned rather than retried.
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
// Report the acquisition to the race detector when it is enabled.
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return true
}
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"unsafe"
)
// Bit layout used to decode the mutex state word in GetGoroutineNumber.
const (
mutexLocked = 1 << iota // mutex is locked (1 << 0 == 1)
// mutexWoken is 1 << 1 == 2; it is not used by the code in this file.
mutexWoken
// mutexStarving is 1 << 2 == 4; it is not used by the code in this file.
mutexStarving
// mutexWaiterShift is 3 (iota at this position); GetGoroutineNumber shifts the
// state right by this amount to obtain the waiter count.
mutexWaiterShift = iota
)
// Mutex embeds sync.Mutex, so Lock and Unlock are promoted from the embedded
// value while additional inspection methods can be defined on this type.
type Mutex struct {
sync.Mutex
}
// GetGoroutineNumber returns the locked bit (0 or 1) plus the waiter count
// decoded from the mutex state word, i.e. the number of goroutines that hold
// or are waiting for the lock at the moment the state is sampled.
//
// NOTE(review): this relies on the state word sitting at offset 0 of
// sync.Mutex, which is an unexported implementation detail — confirm against
// the Go version in use.
func (m *Mutex) GetGoroutineNumber() int {
	// The address of the embedded sync.Mutex is reinterpreted as the address
	// of its 32-bit state word.
	statePtr := (*uint32)(unsafe.Pointer(&m.Mutex))
	state := atomic.LoadUint32(statePtr)

	holder := state & mutexLocked
	waiters := state >> mutexWaiterShift
	return int(holder + waiters)
}
// m is the mutex shared by every goroutine started in main.
var m Mutex

// main launches ten goroutines that each hold m for two seconds, prints the
// value of m.GetGoroutineNumber once per second from a monitoring goroutine,
// and exits after thirty seconds.
func main() {
	const workers = 10
	for n := 0; n < workers; n++ {
		go func() {
			m.Lock()
			defer m.Unlock()
			time.Sleep(2 * time.Second)
		}()
	}

	// Monitor: runs until the process exits.
	go func() {
		tick := time.NewTicker(time.Second)
		defer tick.Stop()
		for range tick.C {
			fmt.Println(m.GetGoroutineNumber())
		}
	}()

	time.Sleep(30 * time.Second)
}
// RWMutex is a reader/writer lock built from a Mutex that serializes writers,
// two semaphores, and two counters tracking readers.
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers; Lock subtracts rwmutexMaxReaders, so it is negative while a writer is pending
readerWait int32 // number of departing readers the pending writer still has to wait for
}
// RLock acquires rw for reading. It blocks while a writer is pending or
// holds the lock.
func (rw *RWMutex) RLock() {
// Race-detector hook: reads rw.w.state (value discarded; presumably a nil
// check on rw — confirm) and disables race detection for the code below.
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// Register this reader. A negative result means Lock has subtracted
// rwmutexMaxReaders, i.e. a writer is pending, so the reader blocks on
// readerSem until Unlock wakes it.
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
// Race-detector hook: re-enable detection and record the acquisition.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
// RUnlock releases one read lock on rw. When a writer is pending, the slow
// path in rUnlockSlow handles waking it.
func (rw *RWMutex) RUnlock() {
// Race-detector hook: reads rw.w.state (value discarded; presumably a nil
// check on rw — confirm), records the release, and disables race detection
// for the code below.
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// The count is negative: either a writer is pending or this RUnlock has no
// matching RLock. rUnlockSlow distinguishes the two cases.
rw.rUnlockSlow(r)
}
// Race-detector hook: re-enable detection.
if race.Enabled {
race.Enable()
}
}
// rUnlockSlow is the slow path of RUnlock. r is the value of readerCount
// after RUnlock decremented it, and is negative when this is called.
func (rw *RWMutex) rUnlockSlow(r int32) {
// Unlock without a matching lock: before the decrement the count was either
// 0 (no readers, no writer) or -rwmutexMaxReaders (a writer pending but no
// readers), so there was no read lock to release.
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// This was the last reader the pending writer was waiting for, so wake the
// writer blocked on writerSem.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
// Lock acquires rw for writing. It blocks until no other writer holds the
// lock and every reader that was active when Lock was called has released it.
func (rw *RWMutex) Lock() {
// Race-detector hook: reads rw.w.state (value discarded; presumably a nil
// check on rw — confirm) and disables race detection for the code below.
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// Writers are serialized by the embedded mutex w.
rw.w.Lock()
// Make readerCount negative to announce a pending writer; adding the constant
// back yields r, the number of readers active at this moment.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Readers still hold the lock: record in readerWait how many must depart and,
// unless they have all left already, block on writerSem until the last one
// releases it in rUnlockSlow.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
// Race-detector hook: re-enable detection and record the acquisition.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
// Unlock releases the write lock on rw, wakes the readers that blocked while
// the writer was pending, and then lets other writers proceed.
func (rw *RWMutex) Unlock() {
// Race-detector hook: reads rw.w.state (value discarded; presumably a nil
// check on rw — confirm), records the release, and disables race detection
// for the code below.
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Undo the offset applied by Lock; r is then the number of readers that
// arrived while the writer was pending. If r reaches rwmutexMaxReaders the
// offset was never applied, i.e. the lock was not write-locked.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Wake every reader that blocked on readerSem while the writer was pending.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
// Race-detector hook: re-enable detection.
if race.Enabled {
race.Enable()
}
}
读写锁的写锁基于 sync.Mutex 实现,所以像下面这样在同一个 goroutine 中重复加写锁会死锁:
package main
import (
"sync"
)
// s is the read-write lock used to demonstrate a double write lock.
var s sync.RWMutex
// main deliberately deadlocks: it takes the write lock twice without
// releasing it in between.
func main() {
s.Lock()
// The write lock is not reentrant: this second call blocks forever because
// the lock is still held by the call above and nothing will ever release it.
s.Lock()
}
在 RLock 之后调用的 Lock 会阻塞等待 RUnlock,而 RUnlock 又因为 Lock 阻塞而无法执行,因此产生死锁:
s.RLock() // a read lock is now held
s.Lock() // blocks: the writer waits for the read lock above to be released
s.RUnlock() // never reached, because execution is stuck in Lock
s.Unlock() // never reached either
上面就是下面这段代码的死锁流程(第二次 RLock 会因为已有写锁在等待而被阻塞,所以 RUnlock 永远执行不到):
package main
import (
"fmt"
"sync"
"time"
)
// s is the read-write lock the two goroutines in main contend for.
var s sync.RWMutex
// w is the WaitGroup main waits on; it expects three Done calls.
var w sync.WaitGroup
// main demonstrates a deadlock caused by taking a read lock recursively while
// a writer is waiting. The sleeps are there to order the calls as
// RLock (reader), Lock (writer), RLock (reader again).
func main() {
w.Add(3)
// Reader goroutine: takes the read lock, and two seconds later tries to take
// it a second time without having released the first.
go func() {
s.RLock()
time.Sleep(2 * time.Second)
// By now the writer below is expected to be pending, so this second RLock
// blocks behind it; the RUnlock calls below are then never reached.
s.RLock()
w.Done()
s.RUnlock()
w.Done()
s.RUnlock()
}()
// Writer goroutine: after one second requests the write lock, which blocks
// until the reader's first read lock is released.
go func() {
time.Sleep(1 * time.Second)
s.Lock()
w.Done()
s.Unlock()
}()
// With both goroutines blocked, none of the three Done calls happens, so
// Wait never returns and the line below is never printed.
w.Wait()
fmt.Println("凉凉")
}