• Go语言并发控制


    channel

    1. // cancelFn 数据通道关闭通知退出
    2. func cancelFn(dataChan chan int) {
    3. for {
    4. select {
    5. case val, ok := <-dataChan:
    6. // 关闭data通道时,通知退出
    7. // 一个可选是判断data=指定值时退出
    8. if !ok {
    9. fmt.Printf("Channel closed !!!")
    10. return
    11. }
    12. fmt.Printf("Receive data from dataChan %d\n", val)
    13. }
    14. }
    15. }
    16. func main() {
    17. channels := make([]chan int, 10)
    18. for i := 0; i < 10; i++ {
    19. channels[i] = make(chan int)
    20. go cancelFn(channels[i])
    21. channels[i] <- 1 // 向管道写数据
    22. fmt.Println(i, "quit")
    23. }
    24. }

    watitGroup

    1. var wg sync.WaitGroup
    2. func main() {
    3. ch := make(chan int)
    4. wg.Add(1) //设置计数器 表示goroutine个数加1
    5. go func() {
    6. v, ok := <-ch
    7. if ok {
    8. fmt.Println("value", v)
    9. }
    10. wg.Done() //执行结束之后 , goroutine个数减1
    11. }()
    12. wg.Add(1)
    13. go func() {
    14. ch <- 4
    15. wg.Done()
    16. }()
    17. wg.Wait() //主goroutine阻塞,等待计数器变为0
    18. }

    WaitGroup原理

    1. type WaitGroup struct {
    2. statel [3]uint32
    3. /*
    4. 长度为3的数组包含两个计数器和一个信号量
    5. counter : 当前还未执行的结束的goroutine计数器
    6. waiter count : 等待goroutine-group结束的goroutine数量
    7. semaphore: 信号量
    8. */
    9. }

    WaitGroup对外提供了三个接口

    • Add(delta int) : 将delta值加到counter中
    • Wait(): waiter递增加1 , 并阻塞等待信号量semaphore
    • Done(): counter递减1 , 按照waiter数值释放相应次数的信号量

    Add(delta int)

    Add() 做了两件事 , 一是把delta值累加到counter中,因为delta可以为负值.所以说当counter变为0时,根据waiter数值释放等量的信号量 , 把等待的goroutine全部唤醒,如果couner变为负值,则触发panic.

    Wait()

    Wait()方法一个是要累加waiter , 二是阻塞等待信号量.

    Done()

    Done 只做一件事,把counter减少1,其实Done里面调用的就是Add(-1)

    context原理

    Context实际上只定义了接口,凡是实现该接口的类都能称为Context.

    1. type Context interface {
    2. Deadline() (deadline time.Time , ok bool)
    3. Done() <-chan struct{}
    4. Err() error
    5. value(key interface{}) interface{}
    6. }

    Deadline()

    该方法返回一个deadline和标识是否已设置deadline的bool值,如果没有设置deadline , 则ok为false,此时deadline为一个初始值的time.Time值.

    Done()

    该方法返回一个用于探测context是否取消的channel,当context取消时,会自动将该channel关闭. 对于不支持取消的context(如:context.Backgroud) , 该方法可能会返回nil.

    Err()

    该方法描述context关闭的原因.关闭原因由context实现控制.

    value()

    有一种context,它不是用于控制呈树状分布的goroutine , 而是用于在树状分布的goroutine之间传递信息.Value()方法就是此种类型的context,根据key查询map集合中的value.

    空context

    context包中定义了一个公用的emptyCtx全局变量 , 名为backgroud,可以使用context.Backgroud()获取它.context包中提供了四个方法创建不同类型的context , 使用这四个方法如果没有父context,则都需要传入background , 即将background作为父节点:

    • WithCancel();
    • WithDeadline();
    • WithTimeout();
    • WithValue();

    context包中实现Context接口的struct,除了emptyCtx , 还有cancelCtx , timerCtx 和 valueCtx三种.

    cancelCtx

    1. type cancelCtx struct {
    2. Context
    3. mu sync.Mutex
    4. done chan struct{}
    5. children map[canceler]struct{}
    6. err error
    7. }

    children 中记录了由此context 派生的所有child , 此context被"cancel"时,会把其中所有的child都cancel掉.cancelCtx与deadline和value无关 , 所以只需要实现Done() 和 Err() 外露接口即可.

    Cancel()接口的实现

    cancel()内部方法时理解cancelCtx的关键cancelCtx.children的map中,其中key值即后代对象,value值并没有意义.

    1. func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    2. c.mu.Lock()
    3. c.err = err //设置一个error,说明关闭原因
    4. close(c.done) //将channel关闭,以此通知派生的context
    5. for child := range c.children { //遍历所有children,逐个调用cancel方法
    6. child.cancel(false, err)
    7. }
    8. c.children = nil
    9. c.mu.Unlock()
    10. if removeFromParent { //正常情况下,需要将自己从parent删除
    11. removeChild(c.Context, c)
    12. }
    13. }

    WithCancel()方法的使用案例

    1. func HandelRequest(ctx context.Context) {
    2. go WriteRedis(ctx)
    3. go WriteDatabase(ctx)
    4. for {
    5. select {
    6. case <-ctx.Done():
    7. fmt.Println("HandelRequest Done.")
    8. return
    9. default:
    10. fmt.Println("HandelRequest running")
    11. time.Sleep(2 * time.Second)
    12. }
    13. }
    14. }
    15. func WriteRedis(ctx context.Context) {
    16. for {
    17. select {
    18. case <-ctx.Done():
    19. fmt.Println("WriteRedis Done.")
    20. return
    21. default:
    22. fmt.Println("WriteRedis running")
    23. time.Sleep(2 * time.Second)
    24. }
    25. }
    26. }
    27. func WriteDatabase(ctx context.Context) {
    28. for {
    29. select {
    30. case <-ctx.Done():
    31. fmt.Println("WriteDatabase Done.")
    32. return
    33. default:
    34. fmt.Println("WriteDatabase running")
    35. time.Sleep(2 * time.Second)
    36. }
    37. }
    38. }
    39. func main() {
    40. ctx, cancel := context.WithCancel(context.Background())
    41. go HandelRequest(ctx)
    42. time.Sleep(5 * time.Second)
    43. fmt.Println("It's time to stop all sub goroutines!")
    44. cancel()
    45. //Just for test whether sub goroutines exit or not
    46. time.Sleep(5 * time.Second)
    47. }

    HandelRequest()用于处理某个请求 , 其又会创建两个协程 , main协程可以在适当时机cancel掉所有自子协程

    timeCtx

    1. type timerCtx struct {
    2. cancelCtx
    3. timer *time.Timer
    4. deadline time.Time
    5. }

    timerCtx 在cancelCtx的基础上,增加了deadline用于标示自动cancel的最终时间,而timer就是一个触发自动cancel的定时器.由此衍生出了WithDeadline()和WithTimeout().

    • deadline:指定最后期限.
    • timeout: 指定最长存活时间.
    1. package main
    2. import (
    3. "fmt"
    4. "time"
    5. "context"
    6. )
    7. func HandelRequest(ctx context.Context) {
    8. go WriteRedis(ctx)
    9. go WriteDatabase(ctx)
    10. for {
    11. select {
    12. case <-ctx.Done():
    13. fmt.Println("HandelRequest Done.")
    14. return
    15. default:
    16. fmt.Println("HandelRequest running")
    17. time.Sleep(2 * time.Second)
    18. }
    19. }
    20. }
    21. func WriteRedis(ctx context.Context) {
    22. for {
    23. select {
    24. case <-ctx.Done():
    25. fmt.Println("WriteRedis Done.")
    26. return
    27. default:
    28. fmt.Println("WriteRedis running")
    29. time.Sleep(2 * time.Second)
    30. }
    31. }
    32. }
    33. func WriteDatabase(ctx context.Context) {
    34. for {
    35. select {
    36. case <-ctx.Done():
    37. fmt.Println("WriteDatabase Done.")
    38. return
    39. default:
    40. fmt.Println("WriteDatabase running")
    41. time.Sleep(2 * time.Second)
    42. }
    43. }
    44. }
    45. func main() {
    46. ctx, _ := context.WithTimeout(context.Background(), 5 * time.Second)
    47. go HandelRequest(ctx)
    48. time.Sleep(10 * time.Second)
    49. }

    valueCtx

    1. type valueCtx struct {
    2. Context
    3. key, val interface{}
    4. }

    valueCtx 只是在Context基础上增加了一个key-value对,用于在各级协程之间传递数据.因此只需要实现Value()接口.

    1. func HandelRequest(ctx context.Context) {
    2. for {
    3. select {
    4. case <-ctx.Done():
    5. fmt.Println("HandelRequest Done.")
    6. return
    7. default:
    8. fmt.Println("HandelRequest running, parameter: ", ctx.Value("parameter"))
    9. time.Sleep(2 * time.Second)
    10. }
    11. }
    12. }
    13. func main() {
    14. ctx := context.WithValue(context.Background(), "parameter", "1")
    15. go HandelRequest(ctx)
    16. time.Sleep(10 * time.Second)
    17. }

    子协程可以读到context的key-value

  • 相关阅读:
    TDengine 3.0 数据订阅功能的“独家”使用经验,只此一份!
    【RHCE】作业:DNS主从同步&防火墙iptables服务使用
    P2572 [SCOI2010] 序列操作【线段树】
    muc和soc的区别与联系
    开源闭源杂谈
    mysql8.0安装教程与配置(最详细)操作简单
    第15章: 泛型
    Fultter学习日志(2)-构建第一个flutter应用
    Linux常用指令--查找指令
    自然语言处理 (NLP) 简介
  • 原文地址:https://blog.csdn.net/c0210g/article/details/137987454