• Go 之常用并发学习


    在 Go语言实战 中看到有些并发相关的例子,讲解得也比较详细,于是乎写来加深下印象。

    无缓冲通道

    无缓冲通道在接收前没有能力保存任何值。我自己找了书上的示例来加深一下印象。

    模拟网球比赛

    1. package main
    2. import (
    3. "fmt"
    4. "math/rand"
    5. "sync"
    6. "time"
    7. )
    8. var (
    9. wg sync.WaitGroup
    10. )
    11. func init() {
    12. rand.Seed(time.Now().UnixNano()) // 让每次运行生成的随机数不相同
    13. }
    14. func main() {
    15. count := make(chan int)
    16. wg.Add(2)
    17. go player("Nadal", count)
    18. go player("Looking", count)
    19. count <- 1
    20. wg.Wait()
    21. }
    22. func player(name string, count chan int) {
    23. defer wg.Done()
    24. for {
    25. ball, ok := <-count
    26. if !ok {
    27. fmt.Printf("Player %s Win\n", name)
    28. return
    29. }
    30. n := rand.Intn(100)
    31. if n%13 == 0 {
    32. fmt.Printf("Player %s Missed\n", name)
    33. close(count)
    34. return
    35. }
    36. fmt.Printf("Player %s Hit %d\n", name, ball)
    37. count <- ball
    38. }
    39. }

    模拟接力赛

    接力赛中,接力棒只能在一个人手中。

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. "time"
    6. )
    7. var (
    8. wg sync.WaitGroup
    9. )
    10. func main() {
    11. baton := make(chan int)
    12. wg.Add(1)
    13. go Runner(baton)
    14. baton <- 1
    15. wg.Wait()
    16. }
    17. func Runner(baton chan int) {
    18. var newRunner int
    19. runner := <- baton
    20. fmt.Printf("Runner %d Running with baton\n", runner)
    21. if runner != 4 {
    22. newRunner = runner + 1
    23. fmt.Printf("Runner %d to the line\n", newRunner)
    24. go Runner(baton)
    25. }
    26. time.Sleep(100 * time.Millisecond)
    27. if runner == 4 {
    28. fmt.Printf("Runner %d finished, Race over\n", runner)
    29. wg.Done()
    30. return
    31. }
    32. fmt.Printf("Runner %d Exchange with runner %d \n", runner, newRunner)
    33. baton <- newRunner
    34. }
    1. Runner 1 Running with baton
    2. Runner 2 to the line
    3. Runner 1 Exchange with runner 2
    4. Runner 2 Running with baton
    5. Runner 3 to the line
    6. Runner 2 Exchange with runner 3
    7. Runner 3 Running with baton
    8. Runner 4 to the line
    9. Runner 3 Exchange with runner 4
    10. Runner 4 Running with baton
    11. Runner 4 finished, Race over

    有缓冲通道

    缓冲通道不强制发送和接收同时完成。

    当通道关闭后,goroutine 依旧可以从通道接收数据,但是不能再向通道里发送数据。能够从已经关闭的通道接收数据这一点非常重要,因为这允许通 道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。从一个已经关闭且没有数据的通道 里获取数据,总会立刻返回,并返回一个通道类型的零值。如果在获取通道时还加入了可选的标 志,就能得到通道的状态信息。

    模拟任务分发和处理

    1. package main
    2. import (
    3. "fmt"
    4. "math/rand"
    5. "sync"
    6. "time"
    7. )
    8. const (
    9. numberGoroutines = 4
    10. taskLoad = 10
    11. )
    12. var wg sync.WaitGroup
    13. func init() {
    14. rand.Seed(time.Now().UnixNano())
    15. }
    16. func main() {
    17. tasks := make(chan string, taskLoad)
    18. wg.Add(numberGoroutines)
    19. for gr := 1; gr <= numberGoroutines; gr++ {
    20. go worker(tasks, gr)
    21. }
    22. for post := 1; post <= taskLoad; post++ {
    23. tasks <- fmt.Sprintf("Task: %d", post)
    24. }
    25. close(tasks) // 任务发布完毕后关闭通道,关闭通道不影响其它Goroutine对已发布内容的正常接收
    26. wg.Wait()
    27. }
    28. func worker(tasks chan string, worker int) {
    29. defer wg.Done()
    30. for {
    31. task, ok := <-tasks
    32. if !ok {
    33. fmt.Printf("Worker: %d : Shutting down\n", worker)
    34. return
    35. }
    36. fmt.Printf("Worker: %d : Started %s\n", worker, task)
    37. sleep := rand.Int63n(100)
    38. time.Sleep(time.Duration(sleep) * time.Millisecond)
    39. fmt.Printf("Worker: %d : Completed %s\n", worker, task)
    40. }
    41. }
    1. Worker: 4 : Started Task: 4
    2. Worker: 2 : Started Task: 2
    3. Worker: 3 : Started Task: 3
    4. Worker: 1 : Started Task: 1
    5. Worker: 3 : Completed Task: 3
    6. Worker: 3 : Started Task: 5
    7. Worker: 2 : Completed Task: 2
    8. Worker: 2 : Started Task: 6
    9. Worker: 1 : Completed Task: 1
    10. Worker: 1 : Started Task: 7
    11. Worker: 1 : Completed Task: 7
    12. Worker: 1 : Started Task: 8
    13. Worker: 3 : Completed Task: 5
    14. Worker: 3 : Started Task: 9
    15. Worker: 4 : Completed Task: 4
    16. Worker: 4 : Started Task: 10
    17. Worker: 2 : Completed Task: 6
    18. Worker: 2 : Shutting down
    19. Worker: 3 : Completed Task: 9
    20. Worker: 3 : Shutting down
    21. Worker: 1 : Completed Task: 8
    22. Worker: 1 : Shutting down
    23. Worker: 4 : Completed Task: 10
    24. Worker: 4 : Shutting down

    runner

    runner 包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以
    用runner 包来终止程序。当开发需要调度后台处理任务的程序的时候,这种模式会很有用。

    runner/runner.go

    1. package runner
    2. import (
    3. "errors"
    4. "os"
    5. "os/signal"
    6. "time"
    7. )
    8. type Runner struct {
    9. interrupt chan os.Signal
    10. complete chan error
    11. timeout <-chan time.Time // 单向通道,只允许接收
    12. tasks []func(int)
    13. }
    14. var ErrTimeout = errors.New("received timeout")
    15. var ErrInterrupt = errors.New("received interrupt")
    16. func New(d time.Duration) *Runner {
    17. return &Runner{
    18. interrupt: make(chan os.Signal, 1),
    19. complete: make(chan error),
    20. timeout: time.After(d),
    21. }
    22. }
    23. func (r *Runner) Add(tasks ...func(int)) {
    24. r.tasks = append(r.tasks, tasks...)
    25. }
    26. func (r *Runner) Start() error {
    27. signal.Notify(r.interrupt, os.Interrupt) // 如果有中断,会将中断信号发送到 r.interrupt
    28. go func() {
    29. r.complete <- r.run()
    30. }()
    31. select {
    32. case err := <-r.complete:
    33. return err // 如果提前中断,err 是 ErrInterrupt,正常结束则是 nil
    34. case <-r.timeout:
    35. return ErrTimeout
    36. }
    37. }
    38. func (r *Runner) run() error {
    39. for id, task := range r.tasks {
    40. if r.gotInterrupt() {
    41. return ErrInterrupt
    42. }
    43. task(id)
    44. }
    45. return nil
    46. }
    47. func (r *Runner) gotInterrupt() bool {
    48. select {
    49. case <-r.interrupt:
    50. signal.Stop(r.interrupt)
    51. return true
    52. default:
    53. return false
    54. }
    55. }

    main.go 

    1. package main
    2. import (
    3. "github.com/test/runner"
    4. "log"
    5. "os"
    6. "time"
    7. )
    8. const timeout = 3 * time.Second
    9. func main() {
    10. log.Println("Starting work.")
    11. r := runner.New(timeout)
    12. r.Add(createTask(), createTask(), createTask())
    13. if err := r.Start(); err != nil {
    14. switch err {
    15. case runner.ErrTimeout:
    16. log.Println("Terminating due to timeout.")
    17. os.Exit(1)
    18. case runner.ErrInterrupt:
    19. log.Println("Terminating due to interrupt.")
    20. os.Exit(2)
    21. }
    22. }
    23. log.Println("Process ended.")
    24. }
    25. func createTask() func(int) {
    26. return func(id int) {
    27. log.Printf("Processor - Task #%d.", id)
    28. time.Sleep(time.Duration(id) * time.Second)
    29. }
    30. }

    超时退出

    1. 2024/04/21 17:26:30 Starting work.
    2. 2024/04/21 17:26:30 Processor - Task #0.
    3. 2024/04/21 17:26:30 Processor - Task #1.
    4. 2024/04/21 17:26:31 Processor - Task #2.
    5. 2024/04/21 17:26:33 Terminating due to timeout.

    中断退出

    1. 2024/04/21 17:28:18 Starting work.
    2. 2024/04/21 17:28:18 Processor - Task #0.
    3. 2024/04/21 17:28:18 Processor - Task #1.
    4. 2024/04/21 17:28:19 Terminating due to interrupt.

    正常退出

    1. 2024/04/21 17:30:40 Starting work.
    2. 2024/04/21 17:30:40 Processor - Task #0.
    3. 2024/04/21 17:30:40 Processor - Task #1.
    4. 2024/04/21 17:30:41 Processor - Task #2.
    5. 2024/04/21 17:30:43 Process ended.

    pool

    pool 使用有缓冲通道实现资源池。

    pool/pool.go

    1. package pool
    2. import (
    3. "errors"
    4. "io"
    5. "log"
    6. "sync"
    7. )
    8. type Pool struct {
    9. m sync.Mutex
    10. resources chan io.Closer
    11. factory func() (io.Closer, error)
    12. closed bool
    13. }
    14. var ErrPoolClosed = errors.New("pool has been closed")
    15. func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    16. if size <= 0 {
    17. return nil, errors.New("size value too small")
    18. }
    19. return &Pool{
    20. resources: make(chan io.Closer, size),
    21. factory: fn,
    22. }, nil
    23. }
    24. func (p *Pool) Acquire() (io.Closer, error) {
    25. select {
    26. case r, ok := <-p.resources:
    27. log.Println("Acquire:", "Shared resource")
    28. if !ok {
    29. return nil, ErrPoolClosed
    30. }
    31. return r, nil
    32. default:
    33. log.Println("Acquire:", "New resource")
    34. return p.factory()
    35. }
    36. }
    37. func (p *Pool) Release(r io.Closer) {
    38. p.m.Lock()
    39. defer p.m.Unlock()
    40. if p.closed {
    41. r.Close()
    42. return
    43. }
    44. select {
    45. case p.resources <- r:
    46. log.Println("Release:", "In queue")
    47. default:
    48. log.Println("Release:", "Closing")
    49. r.Close()
    50. }
    51. }
    52. func (p *Pool) Close() {
    53. p.m.Lock()
    54. defer p.m.Unlock()
    55. if p.closed {
    56. return
    57. }
    58. p.closed = true
    59. close(p.resources)
    60. for r := range p.resources {
    61. r.Close()
    62. }
    63. }

    main.go

    1. package main
    2. import (
    3. "github.com/test/pool"
    4. "io"
    5. "log"
    6. "math/rand"
    7. "sync"
    8. "sync/atomic"
    9. "time"
    10. )
    11. const (
    12. maxGoroutines = 5
    13. pooledResources = 2
    14. )
    15. type dbConnection struct {
    16. ID int32
    17. }
    18. func (dbConn *dbConnection) Close() error {
    19. log.Println("Close: Connection", dbConn.ID)
    20. return nil
    21. }
    22. var idCounter int32
    23. func createConnection() (io.Closer, error) {
    24. id := atomic.AddInt32(&idCounter, 1)
    25. log.Println("Create: New Connection", id)
    26. return &dbConnection{id}, nil
    27. }
    28. func main() {
    29. var wg sync.WaitGroup
    30. wg.Add(maxGoroutines)
    31. p, err := pool.New(createConnection, pooledResources)
    32. if err != nil {
    33. log.Println(err)
    34. }
    35. for query := 0; query < maxGoroutines; query++ {
    36. go func(q int) {
    37. defer wg.Done()
    38. performQueries(q, p)
    39. }(query)
    40. }
    41. wg.Wait()
    42. log.Println("Shutdown program")
    43. p.Close()
    44. }
    45. func performQueries(query int, p *pool.Pool) {
    46. conn, err := p.Acquire()
    47. if err != nil {
    48. log.Println(err)
    49. return
    50. }
    51. defer p.Release(conn)
    52. time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    53. log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
    54. }

    运行结果 

    1. 2024/04/21 20:29:20 Acquire: New resource
    2. 2024/04/21 20:29:20 Create: New Connection 1
    3. 2024/04/21 20:29:20 Acquire: New resource
    4. 2024/04/21 20:29:20 Acquire: New resource
    5. 2024/04/21 20:29:20 Create: New Connection 3
    6. 2024/04/21 20:29:20 Acquire: New resource
    7. 2024/04/21 20:29:20 Create: New Connection 4
    8. 2024/04/21 20:29:20 Create: New Connection 2
    9. 2024/04/21 20:29:20 Acquire: New resource
    10. 2024/04/21 20:29:20 Create: New Connection 5
    11. 2024/04/21 20:29:20 QID[0] CID[2]
    12. 2024/04/21 20:29:20 Release: In queue
    13. 2024/04/21 20:29:20 QID[2] CID[5]
    14. 2024/04/21 20:29:20 Release: In queue
    15. 2024/04/21 20:29:20 QID[4] CID[1]
    16. 2024/04/21 20:29:20 Release: Closing
    17. 2024/04/21 20:29:20 Close: Connection 1
    18. 2024/04/21 20:29:21 QID[1] CID[4]
    19. 2024/04/21 20:29:21 Release: Closing
    20. 2024/04/21 20:29:21 Close: Connection 4
    21. 2024/04/21 20:29:21 QID[3] CID[3]
    22. 2024/04/21 20:29:21 Release: Closing
    23. 2024/04/21 20:29:21 Close: Connection 3
    24. 2024/04/21 20:29:21 Shutdown program
    25. 2024/04/21 20:29:21 Close: Connection 2
    26. 2024/04/21 20:29:21 Close: Connection 5

    work

    work 使用无缓冲通道创建资源池。work 新建时就生成指定个数 goroutine 循环等待(无缓冲通道无数据阻塞)消费任务,然后主线程再分发相应任务到通道,消费 goroutine 再继续执行消费任务。

    work/work.go

    1. package work
    2. import "sync"
    3. type Worker interface {
    4. Task()
    5. }
    6. type Pool struct {
    7. work chan Worker
    8. wg sync.WaitGroup
    9. }
    10. func New(maxGoroutines int) *Pool {
    11. p := Pool{
    12. work: make(chan Worker),
    13. }
    14. p.wg.Add(maxGoroutines)
    15. for i := 0; i < maxGoroutines; i++ {
    16. go func() {
    17. defer p.wg.Done()
    18. for w := range p.work { // 如果 work 关闭,for range 循环结束
    19. w.Task()
    20. }
    21. }()
    22. }
    23. return &p
    24. }
    25. func (p *Pool) Run(w Worker) {
    26. p.work <- w
    27. }
    28. func (p *Pool) Shutdown() {
    29. close(p.work) // 关闭通道,避免 New 产生的 goroutine 一直阻塞不退出
    30. p.wg.Wait()
    31. }

    main.go

    1. package main
    2. import (
    3. "github.com/test/work"
    4. "log"
    5. "sync"
    6. "time"
    7. )
    8. var names = []string{
    9. "steve",
    10. "jason",
    11. "looking",
    12. }
    13. type namePrinter struct {
    14. name string
    15. }
    16. func (m *namePrinter) Task() {
    17. log.Println(m.name)
    18. time.Sleep(time.Second)
    19. }
    20. const times = 3
    21. func main() {
    22. p := work.New(2)
    23. var wg sync.WaitGroup
    24. wg.Add(times * len(names))
    25. for i := 0; i < times; i++ {
    26. for _, name := range names {
    27. np := namePrinter{name: name}
    28. go func() {
    29. defer wg.Done()
    30. p.Run(&np)
    31. }()
    32. }
    33. }
    34. wg.Wait()
    35. p.Shutdown()
    36. }

    运行结果

    1. 2024/04/21 21:39:28 steve
    2. 2024/04/21 21:39:28 looking
    3. 2024/04/21 21:39:29 jason
    4. 2024/04/21 21:39:29 looking
    5. 2024/04/21 21:39:30 steve
    6. 2024/04/21 21:39:30 jason
    7. 2024/04/21 21:39:31 looking
    8. 2024/04/21 21:39:31 steve
    9. 2024/04/21 21:39:32 jason
  • 相关阅读:
    XML序列化与反序列化接口对接实战,看这篇就够了
    Python Http请求和HTML的解析
    立哥国家示范项目-5G智慧文旅
    什么是war包?war包该怎么运行?
    线上频繁fullgc问题-SpringActuator的坑
    全网最简约的Vscode配置Anaconda环境(百分百成功)
    Vue3引入pinia并模块化
    Java 去除字符中的空格、回车符
    ArangoDB 学习笔记(一)
    数据库索引原理
  • 原文地址:https://blog.csdn.net/TomorrowAndTuture/article/details/138031256