• Go sync.Mutex互斥锁的学习


    1. 前言

    1.1 基础回顾

    • 原子操作:指那些不能够被打断的操作被称为原子操作,当有一个CPU在访问这块内容addr时,其他CPU就不能访问。
    • CAS:比较及交换,其实也属于原子操作,但它是非阻塞的,所以在被操作值被频繁变更的情况下,CAS操作并不那么容易成功,不得不利用for循环以进行多次尝试。
    • 自旋锁(spinlock):自旋锁是指当一个线程在获取锁的时候,如果锁已经被其他线程获取,那么该线程将循环等待,然后不断地判断是否能够被成功获取,直到获取到锁才会退出循环。获取锁的线程一直处于活跃状态 Golang中的自旋锁用来实现其他类型的锁,与互斥锁类似,不同点在于,它不是通过休眠来使进程阻塞,而是在获得锁之前一直处于活跃状态(自旋)。

    1.2 竞态条件、临界区与同步工具

    我们首先要看的就是sync包。这里的“sync”的中文意思是“同步”。我们下面就从同步讲起。

    相比于 Go 语言宣扬的“用通讯的方式共享数据”,通过共享数据的方式来传递信息和协调线程运行的做法其实更加主流,毕竟大多数的现代编程语言,都是用后一种方式作为并发编程的解决方案的(这种方案的历史非常悠久,恐怕可以追溯到上个世纪多进程编程时代伊始了)。

    一旦数据被多个线程共享,那么就很可能会产生争用和冲突的情况。这种情况也被称为竞态条件(race condition),这往往会破坏共享数据的一致性。

    共享数据的一致性代表着某种约定,即:多个线程对共享数据的操作总是可以达到它们各自预期的效果。

    如果这个一致性得不到保证,那么将会影响到一些线程中代码和流程的正确执行,甚至会造成某种不可预知的错误。这种错误一般都很难发现和定位,排查起来的成本也是非常高的,所以一定要尽量避免。

    案例分析

    同时有多个线程连续向同一个缓冲区写入数据块,如果没有一个机制去协调这些线程的写入操作的话,那么被写入的数据块就很可能会出现错乱。比如,在线程 A 还没有写完一个数据块的时候,线程 B 就开始写入另外一个数据块了。

    显然,这两个数据块中的数据会被混在一起,并且已经很难分清了。因此,在这种情况下,我们就需要采取一些措施来协调它们对缓冲区的修改。这通常就会涉及同步。

    概括来讲,同步的用途有两个:一个是避免多个线程在同一时刻操作同一个数据块,另一个是协调多个线程,以避免它们在同一时刻执行同一个代码块。

    由于这样的数据块和代码块的背后都隐含着一种或多种资源(比如存储资源、计算资源、I/O 资源、网络资源等等),所以我们可以把它们看做是共享资源,或者说共享资源的代表。我们所说的同步其实就是在控制多个线程对共享资源的访问。

    一个线程在想要访问某一个共享资源的时候,需要先申请对该资源的访问权限,并且只有在申请成功之后,访问才能真正开始。

    而当线程对共享资源的访问结束时,它还必须归还对该资源的访问权限,若要再次访问仍需申请。

    这时,我们可以说,多个并发运行的线程对这个共享资源的访问是完全串行的。只要一个代码片段需要实现对共享资源的串行化访问,就可以被视为一个临界区(critical section),也就是我刚刚说的,由于要访问到资源而必须进入的那个区域。

    比如,在我前面举的那个例子中,实现了数据块写入操作的代码就共同组成了一个临界区。如果针对同一个共享资源,这样的代码片段有多个,那么它们就可以被称为相关临界区。

    它们可以是一个内含了共享数据的结构体及其方法,也可以是操作同一块共享数据的多个函数。临界区总是需要受到保护的,否则就会产生竞态条件。施加保护的重要手段之一,就是使用实现了某种同步机制的工具,也称为同步工具。
    在这里插入图片描述
    在 Go 语言中,可供我们选择的同步工具并不少。其中,最重要且最常用的同步工具当属互斥量(mutual exclusion,简称 mutex)。sync包中的Mutex就是与其对应的类型,该类型的值可以被称为互斥量或者互斥锁。

    一个互斥锁可以被用来保护一个临界区或者一组相关临界区。我们可以通过它来保证,在同一时刻只有一个 goroutine 处于该临界区之内。

    为了兑现这个保证,每当有 goroutine 想进入临界区时,都需要先对它进行锁定,并且,每个 goroutine 离开临界区时,都要及时地对它进行解锁。

    锁定操作可以通过调用互斥锁的Lock方法实现,而解锁操作可以调用互斥锁的Unlock方法。以下是文件中重点代码经过简化之后的片段:

    mu.Lock()
    _, err := writer.Write([]byte(data))
    if err != nil {
     log.Printf("error: %s [%d]", err, id)
    }
    mu.Unlock()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    你可能已经看出来了,这里的互斥锁就相当于我们前面说的那块访问令牌。

    2. 夯实基础

    Mutex即我们常说的互斥锁,也称为排他锁。使用互斥锁,可以限定临界区只能同时有一个goroutine持有。当临界区已经有一个goroutine持有的时候,其他goroutine如果想进入此临界区,会等待或者返回失败。直到持有的goroutine退出临界区,等待goroutine中的某一个才有机会进入临界区执行代码。

    2.1 如何用好互斥锁?

    对于如下代码,同时开启两个goroutine执行c.add操作,假设运行运行的机器是多核的,那这两个goroutine很可能是并行执行的,但是在执行c.count++时是串行的,只有获取到锁的goroutine才会执行,另一个会等待。
    在这里插入图片描述

    package main
    
    import (
     "fmt"
     "sync"
    )
    
    type counter struct {
     count int
     sync.Mutex
    }
    
    func (c *counter) add() {
     c.Lock()
     defer c.Unlock()
    
     c.count++
    }
    
    func (c *counter) value() int {
     c.Lock()
     defer c.Unlock()
    
     return c.count
    }
    
    func main() {
     var wg sync.WaitGroup
     var c counter
    
     wg.Add(2)
    
     // goroutine 1
     go func() {
      defer wg.Done()
    
      for i := 0; i < 5000; i++ {
       c.add()
      }
     }()
    
     // goroutine 2
     go func() {
      defer wg.Done()
    
      for i := 0; i < 5000; i++ {
       c.add()
      }
     }()
    
     wg.Wait()
    
     fmt.Println(c.value())
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    我们今天的问题是:我们使用互斥锁时有哪些注意事项?

    1. 不要重复锁定互斥锁;
    2. 不要忘记解锁互斥锁,必要时使用defer语句;
    3. 不要对尚未锁定或者已解锁的互斥锁解锁;
    4. 不要在多个函数之间直接传递互斥锁。

    问题解析

    首先,你还是要把互斥锁看作是针对某一个临界区或某一组相关临界区的唯一访问令牌。

    虽然没有任何强制规定来限制,你用同一个互斥锁保护多个无关的临界区,但是这样做,一定会让你的程序变得很复杂,并且也会明显地增加你的心智负担。

    你要知道,对一个已经被锁定的互斥锁进行锁定,是会立即阻塞当前的 goroutine 的。这个 goroutine 所执行的流程,会一直停滞在调用该互斥锁的Lock方法的那行代码上。

    直到该互斥锁的Unlock方法被调用,并且这里的锁定操作成功完成,后续的代码(也就是临界区中的代码)才会开始执行。这也正是互斥锁能够保护临界区的原因所在。

    一旦,你把一个互斥锁同时用在了多个地方,就必然会有更多的 goroutine 争用这把锁。这不但会让你的程序变慢,还会大大增加死锁(deadlock)的可能性。

    所谓的死锁,指的就是当前程序中的主 goroutine,以及我们启用的那些 goroutine 都已经被阻塞。这些 goroutine 可以被统称为用户级的 goroutine。这就相当于整个程序都已经停滞不前了。

    Go 语言运行时系统是不允许这种情况出现的,只要它发现所有的用户级 goroutine 都处于等待状态,就会自行抛出一个带有如下信息的 panic:

    fatal error: all goroutines are asleep - deadlock!
    
    • 1

    注意,这种由 Go 语言运行时系统自行抛出的 panic 都属于致命错误,都是无法被恢复的,调用recover函数对它们起不到任何作用。也就是说,一旦产生死锁,程序必然崩溃。

    因此,我们一定要尽量避免这种情况的发生。而最简单、有效的方式就是让每一个互斥锁都只保护一个临界区或一组相关临界区。

    在这个前提之下,我们还需要注意,对于同一个 goroutine 而言,既不要重复锁定一个互斥锁,也不要忘记对它进行解锁。

    一个 goroutine 对某一个互斥锁的重复锁定,就意味着它自己锁死了自己。先不说这种做法本身就是错误的,在这种情况下,想让其他的 goroutine 来帮它解锁是非常难以保证其正确性的。

    话说回来,其实我们说“不要忘记解锁互斥锁”的一个很重要的原因就是:避免重复锁定。

    因为在一个 goroutine 执行的流程中,可能会出现诸如“锁定、解锁、再锁定、再解锁”的操作,所以如果我们忘记了中间的解锁操作,那就一定会造成重复锁定。

    除此之外,忘记解锁还会使其他的 goroutine 无法进入到该互斥锁保护的临界区,这轻则会导致一些程序功能的失效,重则会造成死锁和程序崩溃。

    如果一个流程在锁定了某个互斥锁之后分叉了,或者有被中断的可能,那么就应该使用defer语句来对它进行解锁,而且这样的defer语句应该紧跟在锁定操作之后。这是最保险的一种做法。

    忘记解锁导致的问题有时候是比较隐秘的,并不会那么快就暴露出来。这也是我们需要特别关注它的原因。相比之下,解锁未锁定的互斥锁会立即引发 panic。

    并且,与死锁导致的 panic 一样,它们是无法被恢复的。因此,我们总是应该保证,对于每一个锁定操作,都要有且只有一个对应的解锁操作。

    换句话说,我们应该让它们成对出现。这也算是互斥锁的一个很重要的使用原则了。在很多时候,利用defer语句进行解锁可以更容易做到这一点。
    在这里插入图片描述
    最后,可能你已经知道,Go 语言中的互斥锁是开箱即用的。换句话说,一旦我们声明了一个sync.Mutex类型的变量,就可以直接使用它了。

    不过要注意,该类型是一个结构体类型,属于值类型中的一种。把它传给一个函数、将它从函数中返回、把它赋给其他变量、让它进入某个通道都会导致它的副本的产生。

    并且,原值和它的副本,以及多个副本之间都是完全独立的,它们都是不同的互斥锁。

    如果你把一个互斥锁作为参数值传给了一个函数,那么在这个函数中对传入的锁的所有操作,都不会对存在于该函数之外的那个原锁产生任何的影响。

    所以,你在这样做之前,一定要考虑清楚,这种结果是你想要的吗?我想,在大多数情况下应该都不是。即使你真的希望,在这个函数中使用另外一个互斥锁也不要这样做,这主要是为了避免歧义。

    以上这些,就是我想要告诉你的关于互斥锁的锁定、解锁,以及传递方面的知识。这其中还包括了我的一些理解。希望能够对你有用。相关的例子在下面案例中了,你可以去阅读一番,并运行起来看看。

    coder 原文

    package main
    
    import (
    	"bytes"
    	"errors"
    	"fmt"
    	"io"
    	"log"
    	"sync"
    	"time"
    )
    
    // singleHandler 代表单次处理函数的类型。
    type singleHandler func() (data string, n int, err error)
    
    // handlerConfig 代表处理流程配置的类型。
    type handlerConfig struct {
    	handler   singleHandler // 单次处理函数。
    	goNum     int           // 需要启用的goroutine的数量。
    	number    int           // 单个goroutine中的处理次数。
    	interval  time.Duration // 单个goroutine中的处理间隔时间。
    	counter   int           // 数据量计数器,以字节为单位。
    	counterMu sync.Mutex    // 数据量计数器专用的互斥锁。
    
    }
    
    // count 会增加计数器的值,并会返回增加后的计数。
    func (hc *handlerConfig) count(increment int) int {
    	hc.counterMu.Lock()
    	defer hc.counterMu.Unlock()
    	hc.counter += increment
    	return hc.counter
    }
    
    func main() {
    	// mu 代表以下流程要使用的互斥锁。
    	// 在下面的函数中直接使用即可,不要传递。
    	var mu sync.Mutex
    
    	// genWriter 代表的是用于生成写入函数的函数。
    	genWriter := func(writer io.Writer) singleHandler {
    		return func() (data string, n int, err error) {
    			// 准备数据。
    			data = fmt.Sprintf("%s\t", time.Now().Format(time.StampNano))
    			// 写入数据。
    			mu.Lock()
    			defer mu.Unlock()
    			n, err = writer.Write([]byte(data))
    			return
    		}
    	}
    
    	// genReader 代表的是用于生成读取函数的函数。
    	genReader := func(reader io.Reader) singleHandler {
    		return func() (data string, n int, err error) {
    			buffer, ok := reader.(*bytes.Buffer)
    			if !ok {
    				err = errors.New("unsupported reader")
    				return
    			}
    			// 读取数据。
    			mu.Lock()
    			defer mu.Unlock()
    			data, err = buffer.ReadString('\t')
    			n = len(data)
    			return
    		}
    	}
    
    	// buffer 代表缓冲区。
    	var buffer bytes.Buffer
    
    	// 数据写入配置。
    	writingConfig := handlerConfig{
    		handler:  genWriter(&buffer),
    		goNum:    5,
    		number:   4,
    		interval: time.Millisecond * 100,
    	}
    	// 数据读取配置。
    	readingConfig := handlerConfig{
    		handler:  genReader(&buffer),
    		goNum:    10,
    		number:   2,
    		interval: time.Millisecond * 100,
    	}
    
    	// sign 代表信号的通道。
    	sign := make(chan struct{}, writingConfig.goNum+readingConfig.goNum)
    
    	// 启用多个goroutine对缓冲区进行多次数据写入。
    	for i := 1; i <= writingConfig.goNum; i++ {
    		go func(i int) {
    			defer func() {
    				sign <- struct{}{}
    			}()
    			for j := 1; j <= writingConfig.number; j++ {
    				time.Sleep(writingConfig.interval)
    				data, n, err := writingConfig.handler()
    				if err != nil {
    					log.Printf("writer [%d-%d]: error: %s",
    						i, j, err)
    					continue
    				}
    				total := writingConfig.count(n)
    				log.Printf("writer [%d-%d]: %s (total: %d)",
    					i, j, data, total)
    			}
    		}(i)
    	}
    
    	// 启用多个goroutine对缓冲区进行多次数据读取。
    	for i := 1; i <= readingConfig.goNum; i++ {
    		go func(i int) {
    			defer func() {
    				sign <- struct{}{}
    			}()
    			for j := 1; j <= readingConfig.number; j++ {
    				time.Sleep(readingConfig.interval)
    				var data string
    				var n int
    				var err error
    				for {
    					data, n, err = readingConfig.handler()
    					if err == nil || err != io.EOF {
    						break
    					}
    					// 如果读比写快(读时会发生EOF错误),那就等一会儿再读。
    					time.Sleep(readingConfig.interval)
    				}
    				if err != nil {
    					log.Printf("reader [%d-%d]: error: %s",
    						i, j, err)
    					continue
    				}
    				total := readingConfig.count(n)
    				log.Printf("reader [%d-%d]: %s (total: %d)",
    					i, j, data, total)
    			}
    		}(i)
    	}
    
    	// signNumber 代表需要接收的信号的数量。
    	signNumber := writingConfig.goNum + readingConfig.goNum
    	// 等待上面启用的所有goroutine的运行全部结束。
    	for j := 0; j < signNumber; j++ {
    		<-sign
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149

    2.2 知识扩展

    1. 读写锁与互斥锁有哪些异同?

    读写锁是读 / 写互斥锁的简称。在 Go 语言中,读写锁由sync.RWMutex类型的值代表。与sync.Mutex类型一样,这个类型也是开箱即用的。

    顾名思义,读写锁是把对共享资源的“读操作”和“写操作”区别对待了。它可以对这两种操作施加不同程度的保护。换句话说,相比于互斥锁,读写锁可以实现更加细腻的访问控制。

    一个读写锁中实际上包含了两个锁,即:读锁和写锁。sync.RWMutex类型中的Lock方法和Unlock方法分别用于对写锁进行锁定和解锁,而它的RLock方法和RUnlock方法则分别用于对读锁进行锁定和解锁。

    另外,对于同一个读写锁来说有如下规则。

    1. 在写锁已被锁定的情况下再试图锁定写锁,会阻塞当前的 goroutine。
    2. 在写锁已被锁定的情况下试图锁定读锁,也会阻塞当前的 goroutine。
    3. 在读锁已被锁定的情况下试图锁定写锁,同样会阻塞当前的 goroutine。
    4. 在读锁已被锁定的情况下再试图锁定读锁,并不会阻塞当前的 goroutine。

    换一个角度来说,对于某个受到读写锁保护的共享资源,多个写操作不能同时进行,写操作和读操作也不能同时进行,但多个读操作却可以同时进行。

    当然了,只有在我们正确使用读写锁的情况下,才能达到这种效果。还是那句话,我们需要让每一个锁都只保护一个临界区,或者一组相关临界区,并以此尽量减少误用的可能性。顺便说一句,我们通常把这种不能同时进行的操作称为互斥操作。

    再来看另一个方面。对写锁进行解锁,会唤醒“所有因试图锁定读锁,而被阻塞的 goroutine”,并且,这通常会使它们都成功完成对读锁的锁定。

    然而,对读锁进行解锁,只会在没有其他读锁锁定的前提下,唤醒“因试图锁定写锁,而被阻塞的 goroutine”;并且,最终只会有一个被唤醒的 goroutine 能够成功完成对写锁的锁定,其他的 goroutine 还要在原处继续等待。至于是哪一个 goroutine,那就要看谁的等待时间最长了。

    除此之外,读写锁对写操作之间的互斥,其实是通过它内含的一个互斥锁实现的。因此,也可以说,Go 语言的读写锁是互斥锁的一种扩展。

    最后,需要强调的是,与互斥锁类似,解锁“读写锁中未被锁定的写锁”,会立即引发 panic,对于其中的读锁也是如此,并且同样是不可恢复的。

    总之,读写锁与互斥锁的不同,都源于它把对共享资源的写操作和读操作区别对待了。这也使得它实现的互斥规则要更复杂一些。

    不过,正因为如此,我们可以使用它对共享资源的操作,实行更加细腻的控制。另外,由于这里的读写锁是互斥锁的一种扩展,所以在有些方面它还是沿用了互斥锁的行为模式。比如,在解锁未锁定的写锁或读锁时的表现,又比如,对写操作之间互斥的实现方式。

    3. 应用实践

    4. 源码学习

    4.1 Mutex实现

    Mutex实现使用了CAS+自旋操作+信号量技术,通过正常模式和饥饿模式兼顾公平和性能。在正常模式下,主打性能,被唤醒的goroutine并不是直接拥有锁,而是与新请求锁的goroutine竞争。把锁交给正在占用CPU时间片的goroutine,这样就不需要做上下文切换。在饥饿模式下,保证等待时间最久的goroutine在锁被释放时优先执行,保证goroutine不会因等锁而饿死。

    Go 语言的 sync.Mutex 由两个字段state和sema组成。其中state表示当前互斥锁的状态,而 sema是信号量,用于控制goroutine的阻塞与唤醒。

    1. Mutex结构

    type Mutex struct {
        state int32  // 表示锁当前的状态
        sema  uint32 // 信号量 用于向处于Gwaitting的G发送信号
    }
    const (
     // state的第一个bit位,表示是否加锁
     mutexLocked = 1 << iota // mutex is locked
     // state的第二个bit位,表示是否被唤醒
     mutexWoken
     // state的第三个bit位,表示是否处于饥饿模式
     mutexStarving
     // state的[4,32]bit位,表示等待锁的goroutine的数量
     mutexWaiterShift = iota
     // 饥饿时间1毫秒
     starvationThresholdNs = 1e6
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • state表示mutex的状态,是32位的整型变量,内部实现时把该变量分成四份,用于记录Mutex的四种状态值
      在这里插入图片描述

    2. 状态值

    • mutexLocked
      值为 1,第一位为 1,表示 mutex 已经被加锁。根据 mutex.state & mutexLocked 的结果来判断 mutex 的状态:该位为 1 表示已加锁,0 表示未加锁。
    • mutexWoken
      值为 2,第二位为 1,表示 mutex 是否被唤醒。根据 mutex.state & mutexWoken 的结果判断 mutex 是否被唤醒:该位为 1 表示已被唤醒,0 表示未被唤醒。
    • mutexStarving
      值为 4,第三位为 1,表示 mutex 是否处于饥饿模式。根据 mutex.state & mutexWoken 的结果判断 mutex 是否处于饥饿模式:该位为 1 表示处于饥饿模式,0 表示正常模式。
    • mutexWaiterShift
      值为 3,表示 mutex.state 右移 3 位后即为等待的 goroutine 的数量。
    • starvationThresholdNs
      值为 1000000 纳秒,即 1ms,表示将 mutex 切换到饥饿模式的等待时间阈值。

    3. 工作模式

    • 正常模式下
      等待者以 FIFO 的顺序排队来获取锁,但被唤醒的等待者发现并没有获取到 mutex,并且还要与新到达的 goroutine 们竞争 mutex 的所有权。新到达的 goroutine 们有一个优势 —— 它们已经运行在 CPU 上且可能数量很多,所以一个醒来的等待者有很大可能会获取不到锁。在这种情况下它处在等待队列的前面。如果一个 goroutine 等待 mutex 释放的时间超过 1ms,它就会将 mutex 切换到饥饿模式。
    • 在饥饿模式下
      mutex 的所有权直接从对 mutex 执行解锁的 goroutine 传递给等待队列前面的等待者。新到达的 goroutine 们不要尝试去获取 mutex,即使它看起来是在解锁状态,也不要试图自旋(等也白等,在饥饿模式下是不会给你的),而是自己乖乖到等待队列的尾部排队去。

    何时退出饥饿模式

    如果某waiter获取到了锁,并且满足以下两个条件之一,它就会将锁从饥饿模式切换回正常模式。

    • 它是waiter队列中最后一个Goroutine
    • 它等待获取锁的时间 < 1ms

    正常模式有更好地性能,因为一个 goroutine 可以连续获得好几次 mutex,即使有阻塞的等待者。而饥饿模式可以有效防止出现位于等待队列尾部的等待者一直无法获取到 mutex 的情况。

    性能对比

    • 正常模式:有更好的性能,让请求者自旋来抢锁的模式,能够有更高的吞吐量,goroutine可以连续多次获得mutex锁,频繁的唤醒、挂起goroutine会带来比较大的开销;
    • 饥饿模式:饥饿模式所有goroutine不再自旋,所有goroutine都要严格去排队唤醒,能阻止尾部延迟的现象,可以预防队列尾部goroutine一致无法获取mutex锁的问题。

    4. 锁的三种处理方式

    • Barging方式
      这种模式追求最高的吞吐量:当锁被释放时,它将唤醒第一个waiter(等待锁的Goroutine),并将锁给第一个请求锁的Goroutine或这个waiter
    • Handoff方式
      锁释放后,互斥锁仍将保持锁定,直到第一个waiter准备好。它会降低吞吐量, 因为即使另一个goroutine准备好获取锁,也不会把锁移交给它。
    • Spinning方式
      自旋锁

    4.2 整体流程在这里插入图片描述

    1. waiter队列和处理器P的本地队列中的Goroutine并发抢占锁
    2. 没有获取到锁的Goroutine会根据情况选择进入自旋状态
    3. 自旋结束后,如果锁是饥饿状态,则当前Goroutine直接进入waiter队列。如果是正常状态,再次尝试CAS加锁
    4. 如果CAS加锁再次失败,进入waiter队列
    5. 从waiter队列醒来的Goroutine将再次尝试获取锁,如果锁处于饥饿模式,则使用handoff方式确保waiter拿到锁

    4.3 深入分析Lock&Unlock

    1. 接口

    // A Locker represents an object that can be locked and unlocked.
    type Locker interface {
        Lock()
        Unlock()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    理想路径是使用CAS函数将mutexLocked从 0 改成 1,这就是所谓的 fast path
    如果失败了,就会进入 slow path,下面我们看看源码实现~

    2. 深入分析Lock

    (1)简单加锁—fastpath

    当mutexLocked = 0 时,使用 CAS 函数 sync/atomic.CompareAndSwapInt32 更新 mutexLocked = 1,

    func (m *Mutex) Lock() {
    	// fast path
    	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    		return
    	}
    	// slow path
    	m.lockSlow()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    假定当前只有一个协程在加锁,没有其他协程干扰,那么过程如下图所示:
    在这里插入图片描述
    加锁过程会去判断Locked标志位是否为0,如果是0则把Locked位置1,代表加锁成功。从上图可见,加锁成功后,只是Locked位置1,其他状态位没发生变化。

    (2)加锁被阻塞—slowpath

    Mutex.lockSlow函数的主体是一个大for循环,大概可以分为三部分:自旋,计算状态和上锁(更新状态)。

    1. 自旋:如果互斥锁的状态不是 0 时,说明锁被占用了。当前Goroutine没获取到锁,这时会先尝试自旋(Spinnig):
    old := m.state
    // 判断是否能进入自旋
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        // 判断当前goroutine是不是在唤醒状态
        // 尝试将当前锁的Woken状态设置为1,表示已被唤醒(这块属于锁的细节,不理解的同学可以忽略)
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
        }
        // 自旋
        runtime_doSpin()
        iter++
        old = m.state
        continue
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    自旋是一种多线程同步机制,当前的进程在自旋过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。

    在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益。
    但是随意使用自旋也有可能拖累CPU,Golang作者认为应该保守的引入自旋,所以 Goroutine 进入自旋的条件非常苛刻:持有锁的goroutine能在较短的时间内归还锁时,才允许它自旋

    互斥锁只有在普通模式才能进入自旋
    这个条件对应源码的 old&(mutexLocked|mutexStarving) == mutexLocked

    runtime_canSpin返回值为true才能进入自旋
    runtime_canSpin方法判断自旋是否有意义(说白了就是自旋是否能获得性能提升)

    进入自旋的前提条件:

    1. 自旋次数 < 4
    2. 必须是多核CPU 且 GOMAXPROCS>1
    3. 至少有一个其他的正在运行的P 并且本地运行队列为空(不理解P的同学可以Google下GMP模型)
    //go:linkname sync_runtime_canSpin sync.runtime_canSpin
    //go:nosplit
    func sync_runtime_canSpin(i int) bool {
    	// 自旋次数 < 4
    	// 必须是多核CPU 且 GOMAXPROCS>1
    	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
    		return false
    	}
    	// 至少有一个其他的正在运行的P 并且本地运行队列为空
    	if p := getg().m.p.ptr(); !runqempty(p) {
    		return false
    	}
    	return true
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    一旦进入自旋,就会调用sync.runtime_doSpin和 runtime·procyield 并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间:

    const (
    	active_spin_cnt = 30
    )
    
    //go:linkname sync_runtime_doSpin sync.runtime_doSpin
    //go:nosplit
    func sync_runtime_doSpin() {
    	procyield(active_spin_cnt)
    }
    
    TEXT runtime·procyield(SB),NOSPLIT,$0-0
    	MOVL	cycles+0(FP), AX
    again:
    	PAUSE
    	SUBL	$1, AX
    	JNZ	again
    	RET
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    1. 更新状态:这部分内容都是互斥锁的一些细节,大概看一下就可以。比较重要的内容是:如果当前申请锁的Goroutine发现锁的已经是饥饿状态,就会直接去队尾排队
    // old是锁当前的状态,new是期望的状态,以期于在后面的CAS操作中更改锁的状态
    new := old
    // 如果当前锁不是饥饿模式,则将new的低1位的Locked状态位设置为1,表示加锁
    if old&mutexStarving == 0 {
        new |= mutexLocked
    }
    // 如果当前锁已被加锁或者处于饥饿模式,则将waiter数加1,表示申请上锁的Goroutine直接滚去排队
    if old&(mutexLocked|mutexStarving) != 0 {
        new += 1 << mutexWaiterShift
    }
    // 当 Mutex是锁定状态,且申请上锁的Goroutine饥饿,则标记为饥饿状态
    if starving && old&mutexLocked != 0 {
        new |= mutexStarving
    }
    // 当awoke为true,修改锁的Woken状态位为1
    if awoke {
        if new&mutexWoken == 0 {
            throw("sync: inconsistent mutex state")
        }
        new &^= mutexWoken
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    1. 上锁: 使用 CAS 函数 atomic.CompareAndSwapInt32 更新状态,如果失败则进入waiter队列:
      调用 runtime.sync_runtime_SemacquireMutex 休眠当前goroutine并且尝试获取信号量。

    运行到 SemacquireMutex就证明当前goroutine在前面的过程中获取锁失败了。此时需要sleep原语来阻塞当前goroutine,并通过信号量来排队获取锁

    一旦当前 Goroutine 获取到信号量被唤醒,它就会接着跑剩下的代码:

    • 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
    • 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
        // 如果锁的原状态是锁定状态或饥饿状态,则当前Goroutine上锁失败
        if old&(mutexLocked|mutexStarving) == 0 {
            break 
        }
        // 这里判断waitStartTime != 0就证明当前goroutine之前已经等待过了,则需要将其放置在等待队列队头
        queueLifo := waitStartTime != 0
        if waitStartTime == 0 {
            waitStartTime = runtime_nanotime()
        }
        runtime_SemacquireMutex(&m.sema, queueLifo, 1)
        // 设置饥饿状态
        starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
        old = m.state
        // 饥饿模式直接获取到锁
        if old&mutexStarving != 0 {
            // 校验锁状态
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                throw("sync: inconsistent mutex state")
            }
            // 退出饥饿模式
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            if !starving || old>>mutexWaiterShift == 1 {
                delta -= mutexStarving
            }
            atomic.AddInt32(&m.state, delta)
            break
        }
        // 重置唤醒状态和自旋次数
        awoke = true
        iter = 0
    } else {
        // 如果CAS未成功,更新锁状态,重新一个大循环
        old = m.state
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    假定加锁时,锁已被其他协程占用了,此时加锁过程如下图所示:
    在这里插入图片描述
    从上图可看到,当协程B对一个已被占用的锁再次加锁时,Waiter计数器增加了1,此时协程B将被阻塞,直到Locked值变为0后才会被唤醒。

    3. 深入分析UnLock

    互斥锁的UnLock过程会先使用 sync/atomic.AddInt32 函数快速解锁。

    func (m *Mutex) Unlock() {
    	new := atomic.AddInt32(&m.state, -mutexLocked)
    	if new != 0 {
    		m.unlockSlow(new)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    (1)简单解锁—slowpath

    • 在正常模式下,会根据锁的状态唤醒waiter并移交锁的所有权;
    • 在饥饿模式下,上述代码会直接调用 sync.runtime_Semrelease 将当前锁交给waiiter
    func (m *Mutex) unlockSlow(new int32) {
    	// 校验锁状态
    	if (new+mutexLocked)&mutexLocked == 0 {
    		throw("sync: unlock of unlocked mutex")
    	}
    	if new&mutexStarving == 0 { // 正常模式
    		old := new
    		for {
    			// 如果锁没有waiter,或者锁有其他以下已发生的情况之一,则后面的工作就不用做了,直接返回
    			// 1. 锁处于锁定状态,表示锁已经被其他goroutine获取了
    			// 2. 锁处于被唤醒状态,这表明有等待goroutine被唤醒,不用再尝试唤醒其他goroutine
    			// 3. 锁处于饥饿模式,那么锁之后会被直接交给等待队列队头goroutine
    			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    				return
    			}
    			// 代码走到这,说明当前锁是空闲状态,等待队列中有waiter,且没有goroutine被唤醒
    			// 所以,这里我们想要把锁的状态设置为被唤醒,等待队列waiter数-1
    			new = (old - 1<<mutexWaiterShift) | mutexWoken
    			if atomic.CompareAndSwapInt32(&m.state, old, new) {
    				// 通过信号量唤醒goroutine,然后退出
    				runtime_Semrelease(&m.sema, false, 1)
    				return
    			}
    			old = m.state
    		}
    	} else { // 饥饿模式
    		// 直接唤醒等待队列队头goroutine
    		runtime_Semrelease(&m.sema, true, 1)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    假定解锁时,没有其他协程阻塞,此时解锁过程如下图所示:
    在这里插入图片描述
    由于没有其他协程阻塞等待加锁,所以此时解锁时只需要把Locked位置为0即可,不需要释放信号量。

    (4)解锁并唤醒协程
    假定解锁时,有1个或多个协程阻塞,此时解锁过程如下图所示:
    在这里插入图片描述
    协程A解锁过程分为两个步骤,一是把Locked位置0,二是查看到Waiter>0,所以释放一个信号量,唤醒一个阻塞的协程,被唤醒的协程B把Locked位置1,于是协程B获得锁。

    注意事项

    1.重复解锁会引起panic,应避免这种操作的可能性。
    2.加锁和解锁最好出现在同一个层次的代码块中,比如同一个函数。
    3.加锁后立即使用defer对其解锁,可以有效的避免死锁。
    4.尝试去unlock一把空闲的mutex,会导致panic。
    5.不要复制Mutex,mutex做函数参数时,传参时使用指针。

    5. 总结

    相关资料

    1. 腾讯云—互斥锁的实现
    2. 深入分析 go 的 mutex锁机制
    3. 一份详细的注释的go mytex源码
  • 相关阅读:
    《剑指offer第二版》面试题14:剪绳子
    (附源码)ssm市级疫情防控管理 毕业设计 030957
    一文讲透消息队列RocketMQ实现消费幂等
    Vite 是否可以代替 Webpack ?
    GIT分布式版本控制系统 | 命令讲解入门
    香菇多糖-四甲基罗丹明 Lentinan-TRITC 四甲基罗丹明-PEG-香菇多糖
    java中的垃圾回收算法与垃圾回收器
    Switch Port Hybrid Mode
    tensor和numpy相互转换
    关于对随机森林接口predict_proba()的个人理解
  • 原文地址:https://blog.csdn.net/qq_41893274/article/details/127704620