• golang同步原语——sync.Mutex


    本文是golang同步原语sync.Mutex的较为详细的使用指南,涵盖发展历程、错误使用场景以及基于标准库sync.Mutex的一些扩展用法的实现细节。

    📡 在正式学习之前先来了解一下什么是同步原语
    同步原语指互斥锁 Mutex、读写锁 RWMutex、并发编排WaitGroup、条件变量 Cond、Channel 等。应用场景如下:
    ● 共享资源:并发地读写共享资源,会出现数据竞争(data race)的问题,所以需要Mutex、RWMutex 这样的并发原语来保护。
    ● 任务编排:需要 goroutine 按照一定的规律执行,而 goroutine 之间有相互等待或者依赖的顺序关系,我们常常使用 WaitGroup 或者 Channel 来实现。
    ● 消息传递:信息交流以及不同的 goroutine 之间的线程安全的数据交流,常常使用 Channel 来实现。

    💌 sync.Mutex

    sync.Locker接口是一个最基础的接口,sync.Mutex是该接口的一种实现,其实现如下:

    // A Locker represents an object that can be locked and unlocked.
    type Locker interface {
    	Lock()
    	Unlock()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在正式开始了解其具体实现的时候,需要知道以下两个基本概念:
    CAS 指令:将给定的值和一个内存地址中的值进行比较,如果它们是同一个值,就使用新值替换内存地址中的值,这个操作是原子性的。
    原子性:原子性保证这个指令总是基于最新的值进行计算,如果同时有其它线程已经修改了这个值,那么,CAS 会返回失败。

    1. 发展历程

    初版的 Mutex 使用一个 flag 来表示锁是否被持有,实现比较简单;后来照顾到新来的 goroutine,所以会让新的 goroutine 也尽可能地先获取到锁,这是第二个阶段,我把它叫作给新人机会;那么,接下来就是第三阶段多给些机会,照顾新来的和被唤醒的 goroutine;但是这样会带来饥饿问题,所以目前又加入了饥饿的解决方案,也就是第四阶段解决饥饿。下面是较为详细的解释:

    1.1 初版mutex

    用 CAS 原子操作,对 key 标志量进行设置。key 不仅仅标识了锁是否被 goroutine 所持有,还记录了当前持有和等待获取锁的 goroutine 的数量。
    注意Unlock 方法可以被任意的 goroutine 调用释放锁,即使是没持有这个互斥锁的goroutine,也可以进行这个操作。这是因为,Mutex 本身并没有包含持有这把锁的goroutine 的信息,所以,Unlock 也不会对此进行检查。Mutex 的这个设计一直保持至今。所以在使用 Mutex 的时候,一定要遵循“谁申请,谁释放”的原则。

    1.2 给新人机会

    相对于初版的设计,这次的主要改动是新来的 goroutine 也有机会先获取到锁,甚至一个 goroutine 可能连续获取到锁,打破了先来先得的逻辑;此外还采取 atomic 包的同步原语执行原子操作替换原cas语句等。

    1.3 多给些机会

    在 2015 年 2 月的改动中,如果新来的 goroutine 或者是被唤醒的 goroutine 首次获取不到锁,它们就会通过自旋的方式,尝试检查锁是否被释放。在尝试一定的自旋次数后,再执行原来的逻辑。

    1.4 解决饥饿

    新来的 goroutine 和等待中的 goroutine 同时竞争 mutex,有可能每次都会被新来的 goroutine 抢到获取锁的机会,在极端情况下,等待中的 goroutine 可能会一直获取不到锁,这就是饥饿问题
    当前版本golang sync.Mutex相关源码:sync.Mutex.Locksync.Mutex.Unlock .
    跟之前的实现相比,当前的 Mutex 最重要的变化,就是增加饥饿模式。将饥饿模式的最大等待时间阈值设置成了 1 毫秒,这就意味着,一旦等待者等待的时间超过了这个阈值,Mutex 的处理就有可能进入饥饿模式,优先让等待者先获取到锁,新来的同学主动谦让一下,给老同志一些机会。通过加入饥饿模式,可以避免把机会全都留给新来的 goroutine,保证了请求锁的goroutine 获取锁的公平性,对于我们使用锁的业务代码来说,不会有业务一直等待锁不被处理。
    饥饿模式和正常模式详解
    在高并发情况下,被唤醒的 waiter 可能比较悲剧地获取不到锁,这时,它会被插入到队列的前面。如果 waiter 获取不到锁的时间超过阈值 1 毫秒,那么这个 Mutex 就进入到了饥饿模式。
    在饥饿模式下,Mutex 的拥有者将直接把锁交给队列最前面的 waiter。新来的 goroutine不会尝试获取锁,即使看起来锁没有被持有,它也不会去抢,也不会 spin,它会乖乖地加入到等待队列的尾部。如果拥有 Mutex 的 waiter 发现下面两种情况的其中之一,它就会把这个 Mutex 转换成正常模式:
    ● 此 waiter 已经是队列中的最后一个 waiter 了,没有其它的等待锁的 goroutine 了;
    ● 此 waiter 的等待时间小于 1 毫秒。
    饥饿模式是对公平性和性能的一种平衡,它避免了某些 goroutine 长时间的等待锁。在饥饿模式下,优先对待的是那些一直在等待的 waiter。

    2. 常见的4种错误使用场景

    2.1 Lock/Unlock未成对出现

    此类错误较为常见,在诸多有名的开源项目中都有出现,如Docker、Kubernetes。

    2.2 拷贝已使用的Mutex

    因为sync.Mutex是有状态的锁,所以禁止拷贝使用,可以在持续集成中使用go vet工具检查是否存在此类缺陷。

    2.3 当作重入锁使用

    Mutex 不是可重入的锁,一旦误用 Mutex 的重入,就会导致报错。
    可重入锁指的是当一个线程获取锁时,如果没有其它线程拥有这个锁,那么,这个线程就成功获取到这个锁。之后,如果其它线程再请求这个锁,就会处于阻塞等待的状态。但是,如果拥有这把锁的线程再请求这把锁的话,不会阻塞,而是成功返回,把这类锁叫可重入锁(有时候也叫做递归锁)。
    实现可重入锁的方案:
    方案一:通过 hacker 的方式获取到 goroutine id,记录下获取锁的 goroutine id,它可以实现 Locker 接口。
    方案二:调用 Lock/Unlock 方法时,由 goroutine 提供一个 token,用来标识它自己,而不是我们通过 hacker 的方式获取到 goroutine id,但是,这样一来就不满足Locker 接口了。

    2.4 造成死锁

    想避免死锁,只要破坏造成死锁的四个必要条件中的一个或者几个,就可以了:
    ● 互斥:至少一个资源是被排他性独享的,其他线程必须处于等待状态,直到资源被释放。
    ● 持有和等待:goroutine 持有一个资源,并且还在请求其它 goroutine 持有的资源,也就是咱们常说的“吃着碗里,看着锅里”的意思。
    ● 不可剥夺:资源只能由持有它的 goroutine 来释放。
    ● 环路等待:一般来说,存在一组等待进程,P={P1,P2,…,PN},P1 等待 P2 持有的资源,P2 等待 P3 持有的资源,依此类推,最后是 PN 等待 P1 持有的资源,这就形成了一个环路等待的死结。

    3. 可重入锁的实现

    上文2.3提到了可重入锁的概念,本节介绍基于sync.Mutex的可重入锁的实现。

    3.1 方案一

    通过 hacker 的方式获取到 goroutine id,记录下获取锁的 goroutine id,它可以实现 Locker 接口。

    package main
    
    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    
    	"github.com/petermattis/goid"
    )
    
    //RecursiveMutex包装一个Mutex,实现可重入
    type RecursiveMutex struct {
    	sync.Mutex
    	owner     int64 //当前持有锁的goroutine id
    	recursion int32 // 当前goroutine 重入的次数
    }
    
    func (m *RecursiveMutex) Lock() {
    	gid := goid.Get() // 获取到当前goroutine的id
    	//如果当前持有锁的goroutine就是这次调用的goroutine,说明是重入
    	if atomic.LoadInt64(&m.owner) == gid {
    		m.recursion++
    		return
    	}
    	m.Mutex.Lock()
    	// 获得锁的goroutine第一次调用,记录下它的goroutine id
    	atomic.StoreInt64(&m.owner, gid)
    	m.recursion = 1
    }
    
    func (m *RecursiveMutex) Unlock() {
    	gid := goid.Get()
    	//非持有锁的goroutine尝试释放锁,错误的使用
    	if atomic.LoadInt64(&m.owner) != gid {
    		panic(fmt.Sprintf("wrong the owner(%d): %d!", m.owner, gid))
    	}
    	m.recursion--
    	if m.recursion != 0 {
    		return
    	}
    	// 此goroutine最后一次调用,需要释放锁
    	atomic.StoreInt64(&m.owner, -1)
    	m.Mutex.Unlock()
    }
    
    func main() {
    	str := "Hello World!"
    	mtx := RecursiveMutex{}
    	mtx.Lock()
    	fmt.Println("first lock: ", str)
    	mtx.Lock()
    	fmt.Println("second lock: ", str)
    	mtx.Unlock()
    	mtx.Unlock()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    3.2 方案二

    由 goroutine 提供一个 token,用来标识它自己,而不是我们通过 hacker 的方式获取到 goroutine id。

    package main
    
    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    )
    
    //RecursiveMutex包装一个Mutex,实现可重入
    type RecursiveMutex struct {
    	sync.Mutex
    	token     int64 //当前持有锁的goroutine token
    	recursion int32 // 当前goroutine 重入的次数
    }
    
    func (m *RecursiveMutex) Lock(token int64) {
    	if atomic.LoadInt64(&m.token) == token {
    		m.recursion++
    		return
    	}
    	m.Mutex.Lock()
    	atomic.StoreInt64(&m.token, token)
    	m.recursion = 1
    }
    
    func (m *RecursiveMutex) Unlock(token int64) {
    	//非持有锁的goroutine尝试释放锁,错误的使用
    	if atomic.LoadInt64(&m.token) != token {
    		panic(fmt.Sprintf("wrong the token(%d): %d!", m.token, token))
    	}
    	m.recursion--
    	if m.recursion != 0 {
    		return
    	}
    	// 此goroutine最后一次调用,需要释放锁
    	atomic.StoreInt64(&m.token, 0)
    	m.Mutex.Unlock()
    }
    
    func main() {
    	str := "Hello World!"
    	mtx := RecursiveMutex{}
    	mtx.Lock(867666)
    	fmt.Println("first lock: ", str)
    	mtx.Lock(867666)
    	fmt.Println("second lock: ", str)
    	mtx.Unlock(867666)
    	mtx.Unlock(867666)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    4. 拓展额外功能

    有时希望通过标准库的mutex来实现一些额外的扩展功能,如监控锁的竞争情况、获取不到锁直接返回的trylock场景等。下面介绍几种常见的扩展场景及实现方法:

    4.1 TryLock

    该方法的含义是当一个 goroutine 调用这个TryLock 方法请求锁的时候,如果这把锁没有被其他 goroutine 所持有,那么,这个goroutine 就持有了这把锁,并返回 true;如果这把锁已经被其他 goroutine 所持有,或者是正在准备交给某个被唤醒的 goroutine,那么,这个请求锁的 goroutine 就直接返回false,不会阻塞在方法调用上。
    该方法在go1.18之前未实现,go1.18之后添加了TryLock方法实现。

    4.2 SpinLock实现

    github.com/tryturned/go-utils/sync/spinlock.go

    4.3 获取当前mutex等待者的数量

    需要对标准库里面sync.Mutex进行扩展开发,详见: github.com/tryturned/go-utils/sync/mutex.go#L22

  • 相关阅读:
    边缘计算如何与小程序结合?智能家居如何借势发展?
    AndroidStudio最下方显示不出来Terminal等插件
    代码随想录算法训练营第十天|二叉树完结
    【C】指针进阶(上)
    vue本地开发设置代理连接本地后台服务
    Spark Streaming状态管理函数
    腾讯云便宜服务器有哪些?腾讯云这个服务器一个月7块钱!
    [MYSQL索引优化] 分页查询优化
    webrtc学习--websocket服务器(二) (web端播放h264)
    喜报|Authing 入选 CNCF Landscape 云原生技术图谱
  • 原文地址:https://blog.csdn.net/qq_41345173/article/details/126254409