这个是通过通道, 来控制 goroutine 协程结束的示例:
func coordinateWithChan() { sign := make(chan struct{}, 2) num := int32(0) fmt.Printf("The number: %d [with chan struct{}]\n", num) max := int32(10) go addNum(&num, 1, max, func() { sign <- struct{}{} }) go addNum(&num, 2, max, func() { sign <- struct{}{} }) <-sign <-sign}
上一节我们学习过, sign 通道读取数据时, 如果命中"有缓冲 channel + 缓冲为空"的情况, 会阻塞, 只有两个 go 协程全部执行完毕, 往 sign 塞数据后, 程序才会退出, 但是这种方式非常繁琐。在这种应用场景下, 我们可以选用另外一个同步工具 sync.WaitGroup(以下简称 WaitGroup 类型), 它比通道更加适合实现这种一对多的 goroutine 协作流程。WaitGroup 类型是开箱即用的, 也是并发安全的, 它拥有三个指针方法: Add、Done 和 Wait, 你可以想象该类型中有一个计数器, 它的默认值是 0, 我们可以通过调用该类型值的 Add 方法来增加, 或者减少这个计数器的值, 代码升级如下:
func coordinateWithWaitGroup() { var wg sync.WaitGroup wg.Add(2) // 计数器加 2 num := int32(0) fmt.Printf("The number: %d [with sync.WaitGroup]\n", num) max := int32(10) go addNum(&num, 3, max, wg.Done) // 计数器减 1 go addNum(&num, 4, max, wg.Done) // 计数器减 1 wg.Wait() // 会阻塞, 直到计数器值为 0, 然后就会被唤醒}
Add 会增加计数器的值, Done 会减少计数器的值, Wait 会一直阻塞, 直到计数器的值重新回归为 0, 然后才会被唤醒, 继续往后面执行。
如果使用不当, 容易抛出 Panic, 我就把相关知识点列出来:
对于上一章的并发示例, 当时提了一个问题: 每消费一条 Channel 数据, 需要记录 Push 发送成功, 但是一条 Channel 数据包含 2-3 个 Push 内容 (IOS/Android/PC), 程序记录 Push 成功前, 如何保证这 2-3 个 Push 都发送完毕了呢? 根据"先统一 Add, 再并发 Done, 最后 Wait"原则, 看下面代码:
var ( wg sync.WaitGroup succs []*NotifyMessage fails []*NotifyMessage)for _, message := range t.PushMessages { wg.Add(1) // 计数加 1 go func(message mipush.PushMessage) { defer func() { wg.Done() // 计数减 1 }() // 发送 IOS/Android/PC 等渠道的 Push // 代码省略。.. }(message)}wg.Wait() // 阻塞, 直到计数器值为 0, 然后就会被唤醒// 数据统计 SendNotify(t.ID, t.TotalPage, t.TaskPage, t.AppType, t.AppLocal, fails, succs)
WaitGroup 是开箱即用和并发安全的, 可以通过它很方便地实现一对多 goroutine 协作流程, 即: 一个分发子任务的 goroutine, 和多个执行子任务的 goroutine, 共同来完成一个较大的任务。在使用 WaitGroup 值的时候, 我们一定要注意, 千万不要让其中的计数器的值小于 0, 否则就会引发 panic。另外, 我们最好用"先统一 Add, 再并发 Done, 最后 Wait"这种标准方式, 来使用 WaitGroup 值, 尤其不要在调用 Wait 方法的同时, 并发地通过调用 Add 方法去增加其计数器的值, 因为这也有可能引发 panic。