• Go sync.waitGroup


    前言

    下面的代码是基于 go1.20 版本

    属性

    • noCopy 给 go vet 静态检查用的,防止 copy
    • state 状态统计 高32位是任务数量,低32位是等待数量
    • sema 信号量,用于休眠或者唤醒
    type WaitGroup struct {
    	noCopy noCopy
    
    	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
    	sema  uint32
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Add

    • 用于添加任务计数
     func (wg *WaitGroup) Add(delta int) {
    	//忽略竞态代码块 
    	if race.Enabled {
    		if delta < 0 {
    			// Synchronize decrements with Wait.
    			race.ReleaseMerge(unsafe.Pointer(wg))
    		}
    		race.Disable()
    		defer race.Enable()
    	}
    	
    	//有任务进来,在高32位上进行加减,来标记待执行任务
    	state := wg.state.Add(uint64(delta) << 32)
    	v := int32(state >> 32) //当前获取到的任务数量
    	w := uint32(state) //等待者数量,即 wg.Wait 执行次数
    	
    	//忽略竞态代码块
    	if race.Enabled && delta > 0 && v == int32(delta) {
    		// The first increment must be synchronized with Wait.
    		// Need to model this as a read, because there can be
    		// several concurrent wg.counter transitions from 0.
    		race.Read(unsafe.Pointer(&wg.sema))
    	}
    	
    	//验证任务数量,任务数量不能小于0
    	if v < 0 {
    		panic("sync: negative WaitGroup counter")
    	}
    	// 有等待者,加完和没加之前一样,说明 wait 执行后 还执行了 Add 而非 Done
    	if w != 0 && delta > 0 && v == int32(delta) {
    		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    	}
    	
    	// 没有等待着或者任务没执行完,不需要去唤醒
    	if v > 0 || w == 0 {
    		return
    	}
        //获取的state已经满足唤醒 wg.Wait 了,结果你告诉我状态变了
    	if wg.state.Load() != state {
    		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    	}
    	
    	//清空任务数量,唤醒 wg.Wait 的阻塞等待
    	wg.state.Store(0)
    	// 有过少个 waiter 都给唤醒
    	for ; w != 0; w-- {
    		runtime_Semrelease(&wg.sema, false, 0)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    Done

    • Add(-1) 用于减少任务计数

    Wait

    • 阻塞等待
    func (wg *WaitGroup) Wait() {
    	//忽略竞态代码块
    	if race.Enabled {
    		race.Disable()
    	}
    	for {
    		state := wg.state.Load()
    		v := int32(state >> 32)
    		w := uint32(state)
    		//没有任务,不等
    		if v == 0 {
    			// Counter is 0, no need to wait.
    			if race.Enabled {
    				race.Enable()
    				race.Acquire(unsafe.Pointer(wg))
    			}
    			return
    		}
    		//等待者+1
    		if wg.state.CompareAndSwap(state, state+1) {
    			//忽略竞态代码块
    			if race.Enabled && w == 0 {
    				// Wait must be synchronized with the first Add.
    				// Need to model this is as a write to race with the read in Add.
    				// As a consequence, can do the write only for the first waiter,
    				// otherwise concurrent Waits will race with each other.
    				race.Write(unsafe.Pointer(&wg.sema))
    			}
    			//进入睡眠,等待唤醒
    			runtime_Semacquire(&wg.sema)
    			//进入唤醒状态,Add 中开始 唤醒 wait 时,会先将状态 reset 0
    			//此时如果并发执行 add 或 wait 使用 panic 抛出异常
    			if wg.state.Load() != 0 {
    				panic("sync: WaitGroup is reused before previous Wait has returned")
    			}
    			if race.Enabled {
    				race.Enable()
    				race.Acquire(unsafe.Pointer(wg))
    			}
    			return
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    总结

    • 整体流程还是比较简单的
    • Add 和 Done 会对任务进行计数增减,当计数任务全部完成后对 wait的阻塞进行唤醒
      • 有等待的才会去唤醒
    • Wait 会根据 计数任务来决定是否进行阻塞等待
      • 如果计数任务 > 0,进行等待 & 等待计数 +1
      • 计数任务等于 0,不等待
    • 通过查看上面的panic 代码块,我们知道不建议 在 wg.Add + wg.Wait 后再 进行 wg.Add
  • 相关阅读:
    Netty服务器启动源码剖析
    智能升降桌控制主板开发,解锁办公家居新场景
    LeetCode27.移除元素(暴力法、快慢指针法)
    uniapp不同平台获取文件内容以及base64编码特征
    mysql—MHA原理与实现
    重读经典《操作系统:设计与实现》
    推进智慧工地建设,智慧工地是什么?建筑工地人必看!
    Linux下VSCode的安装和基本使用
    Centos 安装MySQL 5.7.38
    【保研面试】英文问题
  • 原文地址:https://blog.csdn.net/qq_29744347/article/details/132817749