• Go并发编程基础


    相关链接:
    https://draveness.me/golang/docs/
    https://www.liwenzhou.com/posts/Go/concurrence/

    并发编程在当前软件领域是一个非常重要的概念,随着CPU等硬件的发展,我们无一例外的想让我们的程序运行的快一点、再快一点。Go语言在语言层面天生支持并发,充分利用现代CPU的多核优势,这也是Go语言能够大范围流行的一个很重要的原因。

    Go语言中的并发程序主要是通过基于CSP(communicating sequential processes)的goroutine和channel来实现。

    一、Context上下文

    Go1.7 加入了一个新的标准库context,它定义了Context类型,专门用来简化对于处理单个请求的多个 goroutine 之间与请求域的数据、取消信号、截止时间等相关操作,这些操作可能涉及多个 API 调用。

    对服务器传入的请求应该创建上下文,而对服务器的传出调用应该接受上下文。它们之间的函数调用链必须传递上下文,或者可以使用WithCancelWithDeadlineWithTimeoutWithValue创建的派生上下文。当一个上下文被取消时,它派生的所有上下文也被取消。

    • context.Context 接口
    type Context interface {
        Deadline() (deadline time.Time, ok bool) //返回当前Context被取消的时间
        Done() <-chan struct{} //返回一个Channel,这个Channel会在当前工作完成或者上下文被取消之后关闭
        Err() error // 返回当前Context结束的原因。eg:Canceled错误、DeadlineExceeded错误
        Value(key interface{}) interface{} //从Context中返回键对应的值
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • Context使用注意事项

      • 推荐以参数的方式显示传递Context。
      • 以Context作为参数的函数方法,应该把Context作为第一个参数。
      • 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO()。
      • Context的Value相关方法应该传递请求域的必要数据,不应该用于传递可选参数。
      • Context是线程安全的,可以放心的在多个goroutine中传递。
    • context.Context 编程实战

    package main
    
    import (
    	"context"
    	"fmt"
    	"sync"
    
    	"time"
    )
    
    var wg sync.WaitGroup
    
    func main() {
    	// ------------------------引入:为什么需要context------------------
    	//在 Goroutine 构成的树形结构中同步取消信号以减少计算资源的浪费是 context.Context 的最大作用。
    	//我们可能会创建多个 Goroutine 来处理一次请求,而 context.Context 的作用是在不同 Goroutine 之间
    	//同步请求特定数据、取消信号以及处理请求的截止日期。
    	//wg.Add(1)
    	//go worker()
    	 如何优雅的实现结束子goroutine
    	//wg.Wait()
    	//fmt.Println("over")
    
    	// ---------------------------context.Context-------------------------
    	//理解 context.Context 的使用方法和设计原理 — 多个 Goroutine 同时订阅 ctx.Done() 管道中的消息,
    	//一旦接收到取消信号就立刻停止当前正在执行的工作。
    	//从源代码来看,context.Background 和 context.TODO 也只是互为别名,没有太大的差别,只是在使用和语义上稍有不同:
    	//context.Background 是上下文的默认值,所有其他的上下文都应该从它衍生出来;
    	//context.TODO 应该仅在不确定应该使用哪种上下文时使用;
    	//在多数情况下,我们都会使用 context.Background 作为起始的上下文向下传递。
    
    	// ---------------------------WithCancel【最常用】-------------------------
    	//context.WithCancel 函数能够从 context.Context 中衍生出一个新的子上下文并返回用于取消该上下文的函数。
    	//一旦我们执行返回的取消函数,当前上下文以及它的子上下文都会被取消,所有的 Goroutine 都会同步收到这一取消信号。
    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel() // 当我们取完需要的整数后调用cancel
    	for n := range gen(ctx) {
    		fmt.Println(n)
    		if n == 5 {
    			break //当前上下文break,执行defer cancel()取消子上下文ctx
    		}
    	}
    
    	// ---------------------------WithDeadline-------------------------
    	//d := time.Now().Add(50 * time.Millisecond)
    	d := time.Now().Add(5000 * time.Millisecond)
    	//ctx, cancel := context.WithDeadline(context.Background(), d)
    	 尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践。
    	 如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
    	//defer cancel()
    	//select {
    	//case <-time.After(1 * time.Second):
    	//	fmt.Println("overslept")
    	//case <-ctx.Done():
    	//	fmt.Println(ctx.Err())
    	//}
    
    	// ---------------------------WithTimeout-------------------------
    	// 设置一个50毫秒的超时
    	//ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
    	//wg.Add(1)
    	//go worker1(ctx)
    	//time.Sleep(time.Second * 5)
    	//cancel() // 通知子goroutine结束
    	//wg.Wait()
    	//fmt.Println("over")
    
    	// ---------------------------WithValue-------------------------
    	// 设置一个50毫秒的超时
    	//ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
    	 在系统的入口中设置trace code传递给后续启动的goroutine实现日志数据聚合
    	//ctx = context.WithValue(ctx, TraceCode("TRACE_CODE"), "12512312234")
    	//wg.Add(1)
    	//go worker3(ctx)
    	//time.Sleep(time.Second * 5)
    	//cancel() // 通知子goroutine结束
    	//wg.Wait()
    	//fmt.Println("over")
    
    }
    
    func worker() {
    	for {
    		fmt.Println("worker")
    		time.Sleep(time.Second)
    	}
    	// 如何接收外部命令实现退出
    	wg.Done()
    }
    
    func worker1(ctx context.Context) {
    LOOP:
    	for {
    		fmt.Println("db connecting ...")
    		time.Sleep(time.Millisecond * 10) // 假设正常连接数据库耗时10毫秒
    		select {
    		case <-ctx.Done(): // 50毫秒后自动调用
    			break LOOP
    		default:
    		}
    	}
    	fmt.Println("worker done!")
    	wg.Done()
    }
    
    // context.WithValue
    
    type TraceCode string
    
    func worker3(ctx context.Context) {
    	key := TraceCode("TRACE_CODE")
    	traceCode, ok := ctx.Value(key).(string) // 在子goroutine中获取trace code
    	if !ok {
    		fmt.Println("invalid trace code")
    	}
    LOOP:
    	for {
    		fmt.Printf("worker, trace code:%s\n", traceCode)
    		time.Sleep(time.Millisecond * 10) // 假设正常连接数据库耗时10毫秒
    		select {
    		case <-ctx.Done(): // 50毫秒后自动调用
    			break LOOP
    		default:
    		}
    	}
    	fmt.Println("worker done!")
    	wg.Done()
    }
    
    func gen(ctx context.Context) <-chan int {
    	// 通过channel通信
    	dst := make(chan int)
    	n := 1
    	go func() {
    		for {
    			select {
    			case <-ctx.Done(): // 子上下文获取取消信息:ctx.Done(),程序结束
    				fmt.Println("当前语句并不会执行")
    				return // return结束该goroutine,防止泄露
    			case dst <- n:
    				n++
    			}
    		}
    	}()
    	return dst
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147

    二、Goroutine

    Golang高级数据结构:https://blog.csdn.net/qq_41822345/article/details/125475150

    Go GMP模型:https://blog.csdn.net/qq_41822345/article/details/123015441

    三、channel

    Golang高级数据结构:https://blog.csdn.net/qq_41822345/article/details/125475150

    • 无锁管道

    锁是一种常见的并发控制技术,我们一般会将锁分成乐观锁和悲观锁,即乐观并发控制和悲观并发控制,无锁(lock-free)队列更准确的描述是使用乐观并发控制的队列。乐观并发控制也叫乐观锁,很多人都会误以为乐观锁是与悲观锁差不多,然而它并不是真正的锁,只是一种并发控制的思想。

    • 乐观并发控制

    乐观并发控制本质上是基于验证的协议,我们使用原子指令 CAS(compare-and-swap 或者 compare-and-set)在多线程中同步数据,无锁队列的实现也依赖这一原子指令。

    • 无锁channel的实现方案

    该方案将 Channel 分成了以下三种类型:
    1、同步 Channel — 不需要缓冲区,发送方会直接将数据交给(Handoff)接收方;
    2、异步 Channel — 基于环形缓存的传统生产者消费者模型;
    3、chan struct{} 类型的异步 Channel — struct{} 类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)的语义;

    这个提案的目的也不是实现完全无锁的队列,只是在一些关键路径上通过无锁提升 Channel 的性能。另外因为目前通过 CAS 实现的无锁 Channel 没有提供先进先出的特性,所以该提案暂时也被搁浅。

    四、select

    select 是操作系统中的系统调用,我们经常会使用 selectpollepoll 等函数构建 I/O 多路复用模型提升程序的性能。Go 语言的 select 与操作系统中的 select 比较相似。

    Go 语言中的 select 也能够让 Goroutine 同时等待多个 Channel 可读或者可写,在多个文件或者 Channel状态改变之前,select 会一直阻塞当前线程或者 Goroutine。

    select 是与 switch 相似的控制结构,与 switch 不同的是,select 中虽然也有多个 case,但是这些 case 中的表达式必须都是 Channel 的收发操作。 当select 中有多个 case被触发时,会随机执行其中一个。

    非阻塞channel

    运行下面的代码时就不会阻塞当前的 Goroutine,它会直接执行 default 中的代码。

    select 的作用是同时监听多个 case 是否可以执行,如果多个 Channel 都不能执行,那么运行 default 也是理所当然的。

    func main() {
    	ch := make(chan int)
    	select {
    	case i := <-ch:
    		println(i)
    	default:
    		println("default")
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    实现原理

    select 在 Go 语言的源代码中不存在对应的结构体,但是我们使用 runtime.scase 结构体表示 select 控制结构中的 case

    type scase struct {
    	c    *hchan         // chan
    	elem unsafe.Pointer // data element
    }
    
    • 1
    • 2
    • 3
    • 4

    因为非默认的 case 中都与 Channel 的发送和接收有关,所以 runtime.scase 结构体中也包含一个 runtime.hchan 类型的字段存储 case 中使用的 Channel。

    select 语句在编译期间会被转换成 OSELECT 节点。每个 OSELECT 节点都会持有一组 OCASE 节点,如果 OCASE 的执行条件是空,那就意味着这是一个 default 节点。编译器在中间代码生成期间会根据 selectcase 的不同对控制语句进行优化,我们在这里会分四种情况介绍处理的过程和结果:

    1. select 不存在任何的 case

      空的 select 语句会直接阻塞当前 Goroutine,导致 Goroutine 进入无法被唤醒的永久休眠状态。

      //直接将类似 select {} 的语句转换成调用 runtime.block 函数 
      func block() {
      	gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1)
      }
      
      • 1
      • 2
      • 3
      • 4
    2. select 只存在一个 case

      如果当前的 select 条件只包含一个 case,那么编译器会将 select 改写成 if 条件语句。

      // 改写前
      select {
      case v, ok <-ch: // case ch <- v
          ...    
      }
      
      // 改写后
      if ch == nil {
          block()  //当 case 中的 Channel 是空指针时,会直接挂起当前 Goroutine 并陷入永久休眠。
      }
      v, ok := <-ch // case ch <- v
      ...
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    3. select 存在两个 case,其中一个 casedefault

      select 中仅包含两个 case,并且其中一个是 default 时,Go 语言的编译器就会认为这是一次非阻塞的收发操作。针对非阻塞发送和非阻塞接收时,编译器进行的不同优化。

      • 发送
      // 改写前
      select {
      case ch <- i:
          ...
      default:
          ...
      }
      
      // 改写后
      if selectnbsend(ch, i) {
          ...
      } else {
          ...
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 接收
      // 改写前
      select {
      case v <- ch: // case v, ok <- ch:
          ......
      default:
          ......
      }
      
      // 改写后
      if selectnbrecv(&v, ch) { // if selectnbrecv2(&v, &ok, ch) {
          ...
      } else {
          ...
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
    4. select 存在多个 case

      在默认情况下会通过 runtime.selectgo 获取执行 case 的索引,并通过多个 if 语句执行对应 case 中的代码。

    select总结

    在编译器已经对 select 语句进行优化之后,Go 语言会在运行时执行编译期间展开的 runtime.selectgo 函数,该函数会按照以下的流程执行:

    1. 随机生成一个遍历的轮询顺序 pollOrder 并根据 Channel 地址生成锁定顺序 lockOrder
    2. 根据pollOrder遍历所有的case查看是否有可以立刻处理的 Channel;
      1. 如果存在,直接获取 case 对应的索引并返回;
      2. 如果不存在,创建 runtime.sudog 结构体,将当前 Goroutine 加入到所有相关 Channel 的收发队列,并调用 runtime.gopark 挂起当前 Goroutine 等待调度器的唤醒;
    3. 当调度器唤醒当前 Goroutine 时,会再次按照 lockOrder 遍历所有的 case,从中查找需要被处理的 runtime.sudog 对应的索引;

    select 关键字是 Go 语言特有的控制结构,它的实现原理比较复杂,需要编译器和运行时函数的通力合作。

    五、调度器

    Go 调度器:https://blog.csdn.net/qq_41822345/article/details/125880611

    六、同步

    Go 语言作为一个原生支持用户态进程(Goroutine)的语言,当提到并发编程、多线程编程时,往往都离不开锁这一概念。锁是一种并发编程中的同步原语(Synchronization Primitives),它能保证多个 Goroutine 在访问同一片内存时不会出现竞争条件(Race condition)等问题。

    基本原语

    Go 语言在 sync 包中提供了用于同步的一些基本原语:sync.Mutexsync.RWMutexsync.WaitGroupsync.Oncesync.Cond。这些基本原语提供了较为基础的同步功能,但是它们是一种相对原始的同步机制,在多数情况下,我们都应该使用抽象层级更高的 Channel 实现同步。

    sync.Mutex互斥锁

    Go 语言的 sync.Mutex 由两个字段 statesema 组成。其中 state 表示当前互斥锁的状态,而 sema 是用于控制锁状态的信号量。这两个字段加起来只占 8 字节空间的结构体表示了 Go 语言中的互斥锁。

    // A Mutex is a mutual exclusion lock.
    // The zero value for a Mutex is an unlocked mutex.
    type Mutex struct {
    	state int32 //零值表示未被锁定的互斥量   转态值为0 表示未被锁
    	sema  uint32
    }
    
    const (
    	mutexLocked = 1 << iota // mutex is locked  //表示互斥锁的锁定状态;
    	mutexWoken                                  //表示从正常模式被从唤醒;
    	mutexStarving                               //当前的互斥锁进入饥饿状态模式;
    	mutexWaiterShift = iota                     //当前互斥锁上等待的 Goroutine 个数;
    	)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 正常模式和饥饿模式

    正常模式→饥饿模式
    正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。

    饥饿模式是在 Go 语言在 1.9 中引入的优化,引入的目的是保证互斥锁的公平性。

    饥饿模式→正常模式
    在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

    更多相关请查看源码。加锁:sync.Mutex.Lock、解锁:sync.RWMutex.Unlock

    互斥锁的加锁过程比较复杂,它涉及自旋信号量以及调度等概念:

    • 如果互斥锁处于初始化状态,会通过置位 mutexLocked 加锁;
    • 如果互斥锁处于 mutexLocked 状态并且在普通模式下工作,会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
    • 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
    • 互斥锁在正常情况下会通过 runtime.sync_runtime_SemacquireMutex 将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒;
    • 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;

    互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:

    • 当互斥锁已经被解锁时,调用 sync.Mutex.Unlock 会直接抛出异常;
    • 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
    • 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过 sync.runtime_Semrelease 唤醒对应的 Goroutine;

    sync.RWMutex读写锁

    读写互斥锁 sync.RWMutex 是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。

    常见服务的资源读多写少,因为大多数的读请求之间不会相互影响,所以可以分离读写操作,以此来提高服务的性能。

    type RWMutex struct {
    	w           Mutex  //复用互斥锁提供的能力;
    	writerSem   uint32 //用于[写等待读]
    	readerSem   uint32 //用于[读等待写]
    	readerCount int32  //当前正在执行的读操作数量
    	readerWait  int32  //当写操作被阻塞时等待的读操作个数
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    更多相关请查看源码。
    写操作使用 sync.RWMutex.Lock 和 sync.RWMutex.Unlock 方法;
    读操作使用 sync.RWMutex.RLock 和 sync.RWMutex.RUnlock 方法;

    虽然读写互斥锁 sync.RWMutex 提供的功能比较复杂,但是因为它建立在 sync.Mutex 上,所以实现会简单很多。总结一下读锁和写锁的关系:

    • 调用sync.RWMutex.Lock 尝试获取写锁时;
      • 每次 sync.RWMutex.RUnlock 都会将 readerCount 其减一,当它归零时该 Goroutine 会获得写锁;
      • readerCount 减少 rwmutexMaxReaders 个数以阻塞后续的读操作;
    • 调用 sync.RWMutex.Unlock 释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;

    读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。

    sync.WaitGroup

    sync.WaitGroup 可以等待一组 Goroutine 的返回,可以通过 sync.WaitGroup 将原本顺序执行的代码在多个 Goroutine 中并发执行,加快程序处理的速度。

    sync.WaitGroup 结构体中只包含两个成员变量:

    type WaitGroup struct {
    	noCopy noCopy      //保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝;
    	state1 [3]uint32   //存储着状态和信号量;12个字节 
    }
    
    func main() {
    	wg := sync.WaitGroup{}
    	yawg := wg
    	fmt.Println(wg, yawg)
    }
    
    $ go vet proc.go
    ./prog.go:10:10: assignment copies lock value to yawg: sync.WaitGroup
    ./prog.go:11:14: call of fmt.Println copies lock value: sync.WaitGroup
    ./prog.go:11:18: call of fmt.Println copies lock value: sync.WaitGroup
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    sync.WaitGroup 对外暴露了三个方法 —— sync.WaitGroup.Addsync.WaitGroup.Waitsync.WaitGroup.Done

    更多相关请查看源码。

    通过对 sync.WaitGroup 的分析和研究,能够得出以下结论:

    sync.Once

    Go 语言标准库中 sync.Once 可以保证在 Go 程序运行期间的某段代码只会执行一次。
    sync.Once.Do 方法中传入的函数只会被执行一次,哪怕函数中发生了 panic
    两次调用 sync.Once.Do 方法传入不同的函数只会执行第一次调传入的函数。

    在运行如下所示的代码时,我们会看到如下所示的运行结果:

    func main() {
        o := &sync.Once{}
        for i := 0; i < 10; i++ {
            o.Do(func() {
                fmt.Println("only once")
            })
        }
    }
    
    $ go run main.go
    only once
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    每一个 sync.Once 结构体中都只包含一个用于标识代码块是否执行过的 done 以及一个互斥锁 sync.Mutex

    type Once struct {
    	done uint32
    	m    Mutex
    }
    
    • 1
    • 2
    • 3
    • 4

    sync.Once.Dosync.Once 结构体对外唯一暴露的方法,该方法会接收一个入参为空的函数:

    • 如果传入的函数已经执行过,会直接返回;
    • 如果传入的函数没有执行过,会调用 sync.Once.doSlow 执行传入的函数;
    func (o *Once) Do(f func()) {
    	if atomic.LoadUint32(&o.done) == 0 {
    		o.doSlow(f)
    	}
    }
    
    func (o *Once) doSlow(f func()) {
    	o.m.Lock()                   //1为当前 Goroutine 获取互斥锁;
    	defer o.m.Unlock()
    	if o.done == 0 {
    		defer atomic.StoreUint32(&o.done, 1) // 3运行延迟函数调用,将成员变量 `done` 更新成 1;
    		f()                      //2执行传入的无入参函数;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    sync.Once 会通过成员变量 done 确保函数不会执行第二次。

    作为用于保证函数执行次数的 sync.Once 结构体,它使用互斥锁和 sync/atomic 包提供的方法实现了某个函数在程序运行期间只能执行一次的语义。

    sync.Cond

    Go 语言标准库中还包含条件变量 sync.Cond,它可以让一组的 Goroutine 都在满足特定条件时被唤醒。每一个 sync.Cond 结构体在初始化时都需要传入一个互斥锁,我们可以通过下面的例子了解它的使用方法:

    var status int64
    
    func main() {
    	c := sync.NewCond(&sync.Mutex{})
    	for i := 0; i < 10; i++ {
    		go listen(c)
    	}
    	time.Sleep(1 * time.Second)
    	go broadcast(c)
    
    	ch := make(chan os.Signal, 1)
    	signal.Notify(ch, os.Interrupt)
    	<-ch
    }
    
    func broadcast(c *sync.Cond) {
    	c.L.Lock()
    	atomic.StoreInt64(&status, 1)
    	c.Broadcast()
    	c.L.Unlock()
    }
    
    func listen(c *sync.Cond) {
    	c.L.Lock()
    	for atomic.LoadInt64(&status) != 1 {
    		c.Wait()
    	}
    	fmt.Println("listen")
    	c.L.Unlock()
    }
    
    $ go run main.go
    listen
    ...
    listen
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    上述代码同时运行了 11 个 Goroutine,这 11 个 Goroutine 分别做了不同事情:

    调用 sync.Cond.Broadcast 方法后,上述代码会打印出 10 次 “listen” 并结束调用。

    sync.Cond 的结构体中包含以下 4 个字段:

    type Cond struct {
    	noCopy  noCopy       //用于保证结构体不会在编译期间拷贝;
    	L       Locker       //用于保护内部的 notify 字段,Locker 接口类型的变量;
    	notify  notifyList   //一个 Goroutine 的链表,它是实现同步机制的核心结构;
    	checker copyChecker  //用于禁止运行期间发生的拷贝;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    sync.Cond 不是一个常用的同步机制,但是在条件长时间无法满足时,与使用 for {} 进行忙碌等待相比,sync.Cond 能够让出处理器的使用权,提高 CPU 的利用率。使用时也需要注意以下问题:

    • sync.Cond.Wait 在调用之前一定要使用获取互斥锁,否则会触发程序崩溃;
    • sync.Cond.Signal 唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;
    • sync.Cond.Broadcast 会按照一定顺序广播通知等待的全部 Goroutine。

    原子语句

    atomic包

    针对整数数据类型(int32、uint32、int64、uint64)我们还可以使用原子操作来保证并发安全,通常直接使用原子操作比使用锁操作效率更高。Go语言中原子操作由内置的标准库sync/atomic提供。

    很多源码中都使用到了atomic包原子操作。

    方法解释
    func LoadInt32(addr *int32) (val int32)
    func LoadInt64(addr *int64) (val int64)
    func LoadUint32(addr *uint32) (val uint32)
    func LoadUint64(addr *uint64) (val uint64)
    func LoadUintptr(addr *uintptr) (val uintptr)
    func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)读取操作
    func StoreInt32(addr *int32, val int32)
    func StoreInt64(addr *int64, val int64)
    func StoreUint32(addr *uint32, val uint32)
    func StoreUint64(addr *uint64, val uint64)
    func StoreUintptr(addr *uintptr, val uintptr)
    func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)写入操作
    func AddInt32(addr *int32, delta int32) (new int32)
    func AddInt64(addr *int64, delta int64) (new int64)
    func AddUint32(addr *uint32, delta uint32) (new uint32)
    func AddUint64(addr *uint64, delta uint64) (new uint64)
    func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)修改操作
    func SwapInt32(addr *int32, new int32) (old int32)
    func SwapInt64(addr *int64, new int64) (old int64)
    func SwapUint32(addr *uint32, new uint32) (old uint32)
    func SwapUint64(addr *uint64, new uint64) (old uint64)
    func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
    func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)交换操作
    func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
    func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
    func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
    func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
    func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
    func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)比较并交换操作
    • 原子操作性能 v.s.互斥锁性能
    package main
    
    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    	"time"
    )
    
    type Counter interface {
    	Inc()
    	Load() int64
    }
    
    // 普通版
    type CommonCounter struct {
    	counter int64
    }
    
    func (c CommonCounter) Inc() {
    	c.counter++
    }
    
    func (c CommonCounter) Load() int64 {
    	return c.counter
    }
    
    // 互斥锁版
    type MutexCounter struct {
    	counter int64
    	lock    sync.Mutex
    }
    
    func (m *MutexCounter) Inc() {
    	m.lock.Lock()
    	defer m.lock.Unlock()
    	m.counter++
    }
    
    func (m *MutexCounter) Load() int64 {
    	m.lock.Lock()
    	defer m.lock.Unlock()
    	return m.counter
    }
    
    // 原子操作版
    type AtomicCounter struct {
    	counter int64
    }
    
    func (a *AtomicCounter) Inc() {
    	atomic.AddInt64(&a.counter, 1)
    }
    
    func (a *AtomicCounter) Load() int64 {
    	return atomic.LoadInt64(&a.counter)
    }
    
    func test(c Counter) {
    	var wg sync.WaitGroup
    	start := time.Now()
    	for i := 0; i < 1000; i++ {
    		wg.Add(1)
    		go func() {
    			c.Inc()
    			wg.Done()
    		}()
    	}
    	wg.Wait()
    	end := time.Now()
    	fmt.Println("用时:", c.Load(), end.Sub(start))
    }
    
    func main() {
    	c1 := CommonCounter{} // 非并发安全
    	test(c1)
    	c2 := MutexCounter{} // 使用互斥锁实现并发安全
    	test(&c2)
    	c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高
    	test(&c3)
    }
    $ go run main.go
    用时: 0 1.0368ms 
    用时: 1000 517.8µs 
    用时: 1000 522.2µs
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85

    虽然atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者 sync 包的函数/类型实现同步更好。

  • 相关阅读:
    React18原理: React核心对象之ReactElement对象和Fiber对象
    不希望你的数据在云中?关闭iPhone或Mac上的iCloud
    Java命名规范
    react使用gg-editor编写拓扑编辑器
    Allegro 172版本自动放置层叠
    Postgresql 13 安装
    算法 - 计数排序(Counting_Sort)
    C++工程使用curl 静态库
    python note
    springboot+学校运动会信息管理 毕业设计-附源码231058
  • 原文地址:https://blog.csdn.net/qq_41822345/article/details/126005255