• Go 中的并发 Concurrency


    油罐笔记:Concurrency in Go
    https://www.youtube.com/watch?v=LvgVSSpwND8
    https://stackoverflow.com/questions/25795131/do-buffered-channels-maintain-order

    1. goroutine

    goroutine 是由 Go runtime 管理的轻量级线程。goroutine 非常高效,即使在程序里创建上千个 gorountine 都是可行的。但并非 goroutine 数量越多 app 性能越高,因为最终会受到 CPU 的限制。

    2. WaitGroup

    以下的 code,有 3 个 goroutine: 一个 main 加两个 count,现在的问题是,count goroutine 看起来没有执行,因为 main goroutine 启动两个 count goroutine 后,由于没有其他语句,所以立即结束。而对于Go,一旦 main goroutine 退出,整个程序立即结束,是否有其他 goroutine 在跑无关紧要,这些 goroutine 将没有时间做任何事情。

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	go count("sheep")
    	go count("fish")
    }
    
    func count(thing string) {
    	for i:=1;true;i++ {
    		fmt.Println(i, thing)
    		time.Sleep(time.Millisecond * 500)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    为了解决 main goroutine 过早退出的问题,可以在 main 里加 time.Sleep(time.Seconde * 3) 或者 fmt.Scanln() 这样的语句,但更好的方法是使用 WaitGroup, WaitGroup 有一个counter,用以统计正在等待的 goroutine 的数量,初值为 0Add(1) 增加计数,Add(1) 必须在启动 goroutine 之前执行,Done() 减小计数。

    package main
    
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    
    func main() {
    	var wg sync.WaitGroup
    	wg.Add(1)   //wg has a counter, counter += 1
    
    	go func() {
    		count("sheep")
    		wg.Done()   // counter -= 1
    	}()
    		wg.Wait()   // block,直到 counter 为 0. 如果有任何 goroutine 未结束,将等待。
    }
    
    func count(thing string) {
    	for i := 1; i <= 5; i++ {
    		fmt.Println(i, thing)
    		time.Sleep(time.Millisecond * 500)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    比起 WaitGroup,channel 更为有用。

    3. channel

    channel 用于 goroutine 之间互相通信,发送给 channel 的可以是任意类型的数据,即使是 channel 类型都可以。

    重要,通道的发送和接收都是阻塞操作

    当试图接收数据时,必须等待,直到 channel 里有用于读取的值出现,当试图发送数据时,同样必须等待,直到 receiver 准备好接收。

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	fmt.Println("program started")
    	c := make(chan string)
    	go count("sheep", c)
    
    	msg := <-c
    	fmt.Println(msg)
    }
    
    func count(thing string, c chan string) {
    	for i := 1; i <= 5; i++ {
    		time.Sleep(time.Millisecond * 5000)
    		c <- thing
    		time.Sleep(time.Millisecond * 8998886000)  // no effect
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    通道阻塞的特性,使得可以同步各个 goroutine
    在这里插入图片描述

    3.1 使用 close() 语句关闭通道

    以下的代码,虽然能运行出结果,但 Go 在运行时检测到死锁:

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	c := make(chan string)
    	go count("sheep", c)
    
    	for {
    		msg := <-c
    		fmt.Println(msg)
    	}
    }
    
    func count(thing string, c chan string) {
    	for i := 1; i <= 5; i++ {
    		c <- thing
    		time.Sleep(time.Millisecond * 500)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    死锁出现的原因是,发送方在发送5次数据后,循环结束,不再发送数据,但接收方仍在等待不可能获得的数据,因而出现死锁。运行结果如下:

    sheep
    sheep
    sheep
    sheep
    sheep
    fatal error: all goroutines are asleep - deadlock!
    
    goroutine 1 [chan receive]:
    main.main()
    	/tmp/sandbox621737862/prog.go:15 +0x7f
    
    Program exited.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    如果发送方已经结束数据发送,不再需要通道时,可以将其关闭。
    与之相反,接收方任何时候都不应关闭通道,因为接收方无法得知发送方是否已经结束数据发送,接收方过早关闭通道,将导致异常。

    为了解决此死锁的问题,发送方使用 close() 语句关闭通道,接收方会检测通道是否关闭:

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	c := make(chan string)
    	go count("sheep", c)
    
    	for {
    		msg, open := <-c    // open: 通道是否依然打开
    		if !open {    // 如果通道关闭,退出循环
    			break
    		}
    		fmt.Println(msg)
    	}
    }
    
    func count(thing string, c chan string) {
    	for i := 1; i <= 5; i++ {
    		c <- thing
    		time.Sleep(time.Millisecond * 500)
    	} 
    	close(c)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    使用 syntax sugar, main 中的 for 循环可以简化:

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	c := make(chan string)
    	go count("sheep", c)
    
    	for msg := range c {   // 持续从通道接收数据,将其放到 msg 变量中,直至通道关闭
    		fmt.Println(msg)
    	}
    }
    
    func count(thing string, c chan string) {
    	for i := 1; i <= 5; i++ {
    		c <- thing
    		time.Sleep(time.Millisecond * 500)
    	} 
    	close(c)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    以下的只有几行的代码同样会出现死锁:

    // You can edit this code!
    // Click here and start typing.
    package main
    
    import "fmt"
    
    func main() {
    	c := make(chan string)
    	c <- "hello"
    
    	msg := <-c
    	fmt.Println(msg)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    fatal error: all goroutines are asleep - deadlock!
    
    goroutine 1 [chan send]:
    main.main()
    	/tmp/sandbox222398587/prog.go:9 +0x37
    
    Program exited.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    因为发送会阻塞,直到接收方准备好接收。而代码到不了接收的行 msg := <-c,因为 c <- "hello" 阻塞,为了使这段代码能 work,应该将发送放到 goroutine 中,另一种可选的做法是,将 channel 设置为 buffered channel

    4. buffered channel 缓冲通道

    通道默认都是 unbuffered 即无缓冲的,强制要求通道的发送方和接收方同时完成发送和接收,如果无法做到就阻塞等待,无缓冲通道用于实现 goroutine 之间的同步通信。而缓冲通道允许接收一定数量的数据,不要求都有对应的接收者,仅当通道满了,即数据的数量以达到指定的 capacity 时才阻塞。

    package main
    
    import "fmt"
    
    func main() {
    	c := make(chan string, 2)  
    	c <- "hello"
    	c <- "world"
    
    	msg := <-c
    	fmt.Println(msg)
    
    	msg = <-c
    	fmt.Println(msg)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    输出:

    PS D:\temp\go_test> go run main.go
    hello
    world
    
    • 1
    • 2
    • 3

    通道 (或者:无缓冲通道)
    在这里插入图片描述
    缓冲通道
    在这里插入图片描述

    5. 为什么使用 select

    如下的代码效率不高:

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	c1 := make(chan string)
    	c2 := make(chan string)
    
    	go func() {
    		for {
    			c1 <- "Every 500ms"
    			time.Sleep(time.Millisecond * 500)
    		}
    	}()
    
    	go func() {
    		for {
    			c2 <- "Every 2s"
    			time.Sleep(time.Second * 2)
    		}
    	}()
    
    	for {
    		fmt.Println(<-c1)
    		fmt.Println(<-c2)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    后面的输出,总是等待2秒后,c1, c2 中的数据一起输出:

    Every 500ms
    Every 2s
    Every 500ms
    Every 2s
    Every 500ms
    Every 2s
    Every 500ms
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    如果改成 select

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	c1 := make(chan string)
    	c2 := make(chan string)
    
    	go func() {
    		for {
    			c1 <- "Every 500ms"
    			time.Sleep(time.Millisecond * 500)
    		}
    	}()
    
    	go func() {
    		for {
    			c2 <- "Every 2s"
    			time.Sleep(time.Second * 2)
    		}
    	}()
    
    	for {
    		select {
    			case msg1 := <-c1:
    				fmt.Println(msg1)
    			case msg2 := <-c2:
    				fmt.Println(msg2)
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    运行结果:

    Every 2s
    Every 500ms
    Every 500ms
    Every 500ms
    Every 500ms
    Every 2s
    Every 500ms
    Every 500ms
    Every 500ms
    Every 500ms
    Every 2s
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    select 的作用:哪个 channel ready 就从哪个 channel 接收数据,从而减少不必要的等待时间。

    6. 一种常见的模式:worker pool

    以下的代码计算 fibonacci 数,如果使用 单个 goroutine:

    package main
    
    import "fmt"
    import "time"
    
    func main() {
    	start := time.Now()
    
    	jobs := make(chan int, 100)
    	results := make(chan int, 100)
    	go worker(jobs, results)
    	
    	for i := 0; i < 45; i++ {
    		jobs <- i
    	}
    	close(jobs)
    	for j := 0; j < 45; j++ {
    		fmt.Println(<-results)
    	}
    	elapsed := time.Since(start)
    	fmt.Println("---", elapsed)
    }
    
    func worker(jobs <-chan int, results chan<- int) {
    	for n := range jobs {
    		results <- fib(n)
    	}
    }
    
    func fib(n int) int {
    	if n <= 1 {
    		return n
    	}
    	return fib(n-1) + fib(n-2)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    运行结果:

    0
    1
    1
    2
    3
    ..............
    46368
    433494437
    701408733
    --- 8.5861279s
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    运行时间长,CPU 的利用率不高:
    在这里插入图片描述

    但是如果增加 worker goroutine 的数目:

    jobs := make(chan int, 100)
    	results := make(chan int, 100)
    	go worker(jobs, results)
    	go worker(jobs, results)
    	go worker(jobs, results)
    	go worker(jobs, results)
    	go worker(jobs, results)
    	go worker(jobs, results)
    	go worker(jobs, results)
    	go worker(jobs, results)
    	for i := 0; i < 45; i++ {
    		jobs <- i
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    以上的 worker goroutine 是随意加的,所需时间缩短了将近4s,,缺点就是输出结果无序。

    0
    13
    21
    34
    55
    89
    144
    233
    377
    610
    987
    1597
    2584
    4181
    6765
    10946
    17711
    28657
    2
    3
    46368
    1
    121393
    75025
    5
    196418
    317811
    267914296
    433494437
    701408733
    --- 3.7104683s
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    CPU 利用率增加
    在这里插入图片描述
    使用 worker pool,一定程度上将能提高程序的 performance

  • 相关阅读:
    2021年中国研究生数学建模竞赛B题——空气质量预报二次建模
    MySQl表的增删查改(聚合查询+联合查询)
    Fliki AI:让视频创作更简单、更高效
    文字转音频软件哪个好用?这几个方法你值得拥有
    Flink SQL--- Over Aggregation
    Vue常见的实现tab切换的两种方法
    [附源码]java毕业设计社区疫情防控管理系统
    MySQL主从复制
    MySQL 日志管理
    国产动漫|基于Springboot的国产动漫网站设计与实现(源码+数据库+文档)
  • 原文地址:https://blog.csdn.net/ftell/article/details/126003843