• Golang基础7-并发编程


    并发编程

    https://www.cnblogs.com/Survivalist/p/11527949.html

    进程和线程、协程的区别_线程协程进程的区别-CSDN博客

    Golang中的并发编程是一个重点,我们要了解Golang中的并发Goroutine因此需要先理解进程、线程、之后再理解协程。

    进程操作系统进行资源分配的最小单元,是程序在执行过程中的一次活动,包括程序,数据集合,程序控制块(PCB)等。每个进程都有独立的内存空间,包括代码、数据、堆栈,因此进程之间相互隔离。进程切换开销大(包括栈、寄存器、页表、文件句柄等切换)。尽管更安全,但也占据了较多系统资源

    线程:操作系统进行调度的最小单元,是进程中执行的基本单元,由线程ID,当前指令指针PC,寄存器和堆栈组成。一个进程包含>=1个线程,其中一个为主线程,多个线程时通过共享内存,上下文切换较快,资源开销较小。共享内存尤其需要注意线程同步互斥问题。

    协程:用户轻量级线程,由程序控制,不被操作系统内核管理

    更轻量,独立栈空间(协程之间不共享内存,但是可以通信(channel)进行交互),更易并发(高效切换无需过多锁)

    并行和并发区别

    goroutine

    参考文章:Goroutine · Golang 学习笔记

    每个goroutine是官方实现的超级"线程池"

    每个实例4-5KB栈内存和实现机制大幅减少创建和销毁使得go更易实现高并发

    goroutine奉行通信(配合channel)实现共享内存。

    在go语言层面内置调度和上下文切换机制,并且go程序会智能地将任务合理的分配给CPU

    简单demo
    1. package main
    2. import (
    3. "fmt"
    4. "time"
    5. )
    6. func Hello() {
    7. fmt.Println("Hello Function!")
    8. }
    9. func main() {
    10. //在函数前加入关键字go
    11. go Hello()
    12. fmt.Println("Main done")
    13. //休眠,等go Hello()执行完
    14. time.Sleep(time.Second)
    15. }

    sync.WaitGroup demo
    1. WaitGroup用来启动一组goroutine,等待任务做完再结束goroutine。
    2. wg.Add(delta int):设置将要启动的Goroutine的数量,来设置WaitGroup内部计数器
    3. wg.Done():每个goroutine完成后,计数器-1 ;对于可能panic的可以使用defer wg.Done()
    4. wg.Wait():阻塞自己,等待所有goroutine完成任务,计数器减为0,返回

    sync.WaitGroup中的Add和Done线程安全,可以从多个groutine中调用这两个方法,不用担心数据竞争和其他并发问题。

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. )
    6. //启动多个goroutine
    7. func main() {
    8. //协程同步
    9. var wg sync.WaitGroup
    10. wg.Add(9)
    11. for i := 0; i < 9; i++ {
    12. //当作参数传入会拷贝一份,因此可以保证输出0-8
    13. go func(i int) {
    14. defer wg.Done()
    15. fmt.Printf("%d ",i)
    16. }(i)
    17. }
    18. // 阻塞主程序,等待所有 Goroutine 完成
    19. wg.Wait()
    20. }

    输出结构并发乱序。0-9的其中一个组合

    sync.Map demo
      1. sync.Map并发安全的sync.Map,可以安全并发的读写操作,常见操作见代码
      2. 与之相对应的原生map,线程不安全,并发读写时需要加锁
    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. )
    6. func main() {
    7. //sync.Map的keyvalue都是interface{}
    8. var m sync.Map
    9. //写入
    10. m.Store("1", 18)
    11. m.Store("2", 20)
    12. //读取
    13. age, ok := m.Load("1")
    14. if ok {
    15. fmt.Println("读取成功", age, ok)
    16. } else {
    17. fmt.Println("读取失败!")
    18. }
    19. //遍历!!
    20. m.Range(func(key, value interface{}) bool {
    21. fmt.Println("遍历:key=", key, " value=", value)
    22. return true
    23. })
    24. //根据key删除
    25. m.Delete("2")
    26. age, ok = m.Load("2")
    27. if ok {
    28. fmt.Println("删除后读取成功", age, ok)
    29. } else {
    30. fmt.Println("删除后读取失败!")
    31. }
    32. //存在则读取否则写入
    33. //如果存在key=2,ok返回为true,否则false
    34. age, ok = m.LoadOrStore("2", "100")
    35. if ok {
    36. fmt.Println("已存在的:", age)
    37. } else {
    38. fmt.Println("不存在,store后的:", age)
    39. }
    40. }

    map并发 demo
      • 原生map实现并发时一定需要加锁来保证安全,不然报错。
      • sync.Map安全Map,不需要上锁解锁操作。
    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. )
    6. func main() {
    7. //没有加锁的并发写入,则会报错
    8. m := make(map[int]int)
    9. var wg sync.WaitGroup
    10. var mu sync.Mutex
    11. for i := 0; i < 9; i++ {
    12. wg.Add(1)
    13. go func(i int) {
    14. for j := 0; j < 9; j++ {
    15. //上锁
    16. mu.Lock()
    17. m[j] = i
    18. mu.Unlock()
    19. }
    20. wg.Done()
    21. }(i)
    22. }
    23. // 安全Map
    24. var sm sync.Map
    25. for i := 0; i < 9; i++ {
    26. wg.Add(1)
    27. go func(i int) {
    28. for j := 0; j < 9; j++ {
    29. sm.Store(j, i)
    30. }
    31. wg.Done()
    32. }(i)
    33. }
    34. //完成前面并发任务后输出
    35. wg.Wait()
    36. fmt.Println("最终打印map值:", m)
    37. fmt.Print("最终打印sync.Map值:")
    38. sm.Range(func(key, value interface{}) bool {
    39. fmt.Printf("%d:%d ", key, value)
    40. return true
    41. })
    42. }

    go的GMP调度原理

    channel

    go中不要通过共享内存来通信,而是通过通信来共享内存。

    参考:Go Channel 详解

    go高性能编程:GitHub - wuqinqiang/Go_Concurrency: go concurrency class code

    go语言核心类型,管道,并发中可以进行发送或接收数据进行通信。

    <-

    使用make创建channel,chan底层是一个环形数组

    类型:chan chan <- <-chan

    使用场景:

      • 消息传递、消息过滤
      • 信号广播
      • 事件的订阅和广播
      • 任务分发
      • 结果汇总
      • 并发控制
      • 同步和异步

    简单demo

    无缓冲channel(B要第一时间知道A是否完成)、有缓冲channel(生产者消费者模型)

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. )
    6. func main() {
    7. c := make(chan int, 2)
    8. defer close(c)
    9. var wg sync.WaitGroup
    10. wg.Add(1)
    11. go func() {
    12. defer wg.Done()
    13. c <- 3 + 4
    14. c <- 1
    15. fmt.Println("发送成功")
    16. }()
    17. wg.Add(1)
    18. go func() {
    19. defer wg.Done()
    20. c <- 100
    21. }()
    22. //wg.Wait()
    23. i := <-c
    24. j := <-c
    25. _ = <-c//忽略这个值
    26. fmt.Println(i, j)
    27. wg.Wait()
    28. }

    1. import "fmt"
    2. func sum(s []int, c chan int) {
    3. sum := 0
    4. for _, v := range s {
    5. sum += v
    6. }
    7. c <- sum // send sum to c
    8. }
    9. func main() {
    10. s := []int{7, 2, 8, -9, 4, 0}
    11. c := make(chan int)
    12. go sum(s[:len(s)/2], c)
    13. go sum(s[len(s)/2:], c)
    14. x, y := <-c, <-c // receive from c
    15. fmt.Println(x, y, x+y)
    16. }

    channel的range

    1. func main() {
    2. go func() {
    3. time.Sleep(1 * time.Hour)
    4. }()
    5. c := make(chan int)
    6. go func() {
    7. for i := 0; i < 10; i = i + 1 {
    8. c <- i
    9. }
    10. close(c)
    11. }()
    12. for i := range c {
    13. fmt.Println(i)
    14. }
    15. fmt.Println("Finished")
    16. }

    这个range会一直从c中获取,直到c关闭

    select demo

    类似与linux中io的select、poll、epoll。

    select语句类似于switch,随机执行一个可执行的case,select只用于通信操作,如果没有case可运行那么将阻塞,直到有case可运行。默认的字句总是可以运行。

      • 每个case都必须是一个通信
      • 所有channel表达式都会被求值
      • 所有被发送的表达式都会被求值
      • 如果任意某个通信可以进行,它就执行;其他被忽略。
      • 如果有多个case都可以运行,Select会随机地选出一个执行。其他不会执行。
      • 否则:如果有default子句,则执行该语句。
      • 如果没有default字句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。
    1. package main
    2. import "fmt"
    3. //select用于退出
    4. func fibonacci(c, quit chan int) {
    5. x, y := 0, 1
    6. for {
    7. select {
    8. case c <- x:
    9. x, y = y, x+y
    10. case <-quit:
    11. fmt.Println("quit")
    12. return
    13. }
    14. }
    15. }
    16. func main() {
    17. c := make(chan int)
    18. quit := make(chan int)
    19. go func() {
    20. for i := 0; i < 10; i++ {
    21. fmt.Println(<-c)
    22. }
    23. quit <- 0
    24. }()
    25. fibonacci(c, quit)
    26. }
    timeout demo
    1. package main
    2. import "time"
    3. import "fmt"
    4. func main() {
    5. c1 := make(chan string, 1)
    6. go func() {
    7. time.Sleep(time.Second * 2)
    8. c1 <- "result 1"
    9. }()
    10. select {
    11. case res := <-c1:
    12. fmt.Println(res)
    13. //超时退出
    14. case <-time.After(time.Second * 1):
    15. fmt.Println("timeout 1")
    16. }
    17. }
    单向通道 demo

    send chan <- string//只能发送给send

    read <-chan string// 只能读取read

    1. package main
    2. import (
    3. "fmt"
    4. "time"
    5. )
    6. func Produce(out chan<- int) {
    7. for i := 0; i < 10; i++ {
    8. out <- i * i
    9. }
    10. }
    11. func Consumer(in <-chan int) {
    12. for num := range in {
    13. fmt.Println(num)
    14. }
    15. }
    16. func main() {
    17. c := make(chan int, 0)
    18. go Produce(c)
    19. go Consumer(c)
    20. time.Sleep(time.Second)
    21. }

    1. package main
    2. import "fmt"
    3. // 只能发送给管道
    4. func Counter(out chan<- int) {
    5. for i := 0; i < 10; i++ {
    6. out <- i
    7. }
    8. close(out)
    9. }
    10. // chan <- 只能发送给管道 <-chan 管道发送嘛,因此只能接收
    11. func Squarer(out chan<- int, in <-chan int) {
    12. for i := range in {
    13. out <- i * i
    14. }
    15. close(out)
    16. }
    17. func Printer(in <-chan int) {
    18. for i := range in {
    19. fmt.Println(i)
    20. }
    21. }
    22. func main() {
    23. ch1 := make(chan int)
    24. ch2 := make(chan int)
    25. go Counter(ch1)
    26. go Squarer(ch2, ch1)
    27. Printer(ch2)
    28. }

    输出0-9的平方。

    协程池demo

    Golang学习篇——协程池_golang 携程池-CSDN博客

    1. package main
    2. import (
    3. "fmt"
    4. "math/rand"
    5. "sync"
    6. )
    7. // 当前task
    8. type Task struct {
    9. Id int
    10. Random int
    11. }
    12. // 结果
    13. type Result struct {
    14. Task *Task
    15. Sum int
    16. }
    17. // 创建Task
    18. func CreateTask(taskChan chan<- *Task, wg *sync.WaitGroup) {
    19. defer wg.Done()
    20. for id := 0; id < 100000; id++ {
    21. //创建Task
    22. task := &Task{
    23. Id: id,
    24. Random: rand.Intn(200) + 1,
    25. }
    26. // 传递给taskChan管道
    27. taskChan <- task
    28. }
    29. close(taskChan)
    30. }
    31. // 创建线程池来处理
    32. func CreatePool(num int, taskChan <-chan *Task, resultChan chan<- *Result, wg *sync.WaitGroup) {
    33. for i := 0; i < num; i++ {
    34. wg.Add(1)
    35. // 创建多个goroutine并发
    36. go func() {
    37. for task := range taskChan {
    38. // 当前的Num
    39. currentNum := task.Random
    40. sum := 0
    41. // 计算sum的值
    42. for currentNum != 0 {
    43. temp := currentNum % 10
    44. sum += temp
    45. currentNum /= 10
    46. }
    47. // 此时任务的结果是:
    48. currentResult := &Result{
    49. Task: task,
    50. Sum: sum,
    51. }
    52. // 发送给结果管道
    53. resultChan <- currentResult
    54. }
    55. wg.Done()
    56. }()
    57. }
    58. }
    59. // 开启打印 Result
    60. func PrintResult(resultChan <-chan *Result) {
    61. //输出
    62. for res := range resultChan {
    63. fmt.Printf("输出结果,Id:=%d,Random:=%d,Sum:=%d\n", res.Task.Id, res.Task.Random, res.Sum)
    64. }
    65. }
    66. func main() {
    67. // 创建task管道,传递task
    68. taskChan := make(chan *Task, 128)
    69. // 结果管道
    70. resultChan := make(chan *Result, 128)
    71. // 确保goroutine全部完成
    72. var wg sync.WaitGroup
    73. wg.Add(1)
    74. go CreateTask(taskChan, &wg)
    75. // 创建协程池
    76. CreatePool(133, taskChan, resultChan, &wg)
    77. go func() {
    78. wg.Wait()
    79. close(resultChan)
    80. }()
    81. // 创建协程进行打印
    82. PrintResult(resultChan)
    83. }

    channel一定注意防止被阻塞而导致程序出现死锁!!!

    并发安全和锁

    互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。sync.Mutex

    sync.Mutex互斥锁demo

    多个goroutine对同一个共享资源(当前的x)的竞争你,x=x+1,在汇编当中并不是原子性的操作,因此并发时会导致数据不一致,方法1,上锁。

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. "sync/atomic"
    6. )
    7. var (
    8. total int32
    9. wg sync.WaitGroup
    10. mutex sync.Mutex
    11. )
    12. func Add() {
    13. defer wg.Done()
    14. for i := 0; i < 10000; i++ {
    15. //原子操作
    16. atomic.AddInt32(&total, 1)
    17. //mutex.Lock()
    18. //total++
    19. //mutex.Unlock()
    20. }
    21. }
    22. func Del() {
    23. defer wg.Done()
    24. for i := 0; i < 10000; i++ {
    25. atomic.AddInt32(&total, -1)
    26. //mutex.Lock()
    27. //total--
    28. //mutex.Unlock()
    29. }
    30. }
    31. func main() {
    32. fmt.Println("origin num:", total)
    33. wg.Add(2)
    34. go Add()
    35. go Del()
    36. wg.Wait()
    37. fmt.Println("After num:", total)
    38. }
    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. )
    6. var x int64
    7. var wg sync.WaitGroup
    8. var lock sync.Mutex
    9. func Add() {
    10. for i := 0; i < 50; i++ {
    11. lock.Lock()
    12. x = x + 1
    13. lock.Unlock()
    14. }
    15. wg.Done()
    16. }
    17. func main() {
    18. wg.Add(2)
    19. go Add()
    20. go Add()
    21. wg.Wait()
    22. fmt.Println(x)
    23. }

    读写互斥锁 demo
    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. "time"
    6. )
    7. var (
    8. x int64
    9. wg sync.WaitGroup
    10. lock sync.Mutex
    11. rwlock sync.RWMutex //读写互斥锁
    12. )
    13. func Write() {
    14. //lock.Lock() //加互斥锁
    15. rwlock.Lock()
    16. x = x + 1
    17. time.Sleep(10 * time.Millisecond)
    18. rwlock.Unlock()
    19. //lock.Unlock() //解互斥锁
    20. wg.Done()
    21. }
    22. func Read() {
    23. //lock.Lock()
    24. rwlock.RLock()
    25. time.Sleep(time.Millisecond)
    26. rwlock.RUnlock()
    27. //lock.Unlock()
    28. wg.Done()
    29. }
    30. func main() {
    31. start := time.Now()
    32. for i := 0; i < 10; i++ {
    33. wg.Add(1)
    34. go Write()
    35. }
    36. for i := 0; i < 1000; i++ {
    37. wg.Add(1)
    38. go Read()
    39. }
    40. wg.Wait()
    41. end := time.Now()
    42. fmt.Println(end.Sub(start))
    43. }

    sync

    前面介绍过sync的一些方法

    sync.WaitGroup

    sync.Once

    参考:Go sync.Once | Go 语言高性能编程 | 极客兔兔

    执行一次的函数,可以在代码任意位置加载,常用于单例模式(懒汉式),并发场景安全。而init是package首次执行时加载(饿汉式)

    对外接口:func (o *Once) Do(f func())

    sync.Map

    这个是并发安全的Map

    原子操作

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. "sync/atomic"
    6. "time"
    7. )
    8. var x int64
    9. var l sync.Mutex
    10. var wg sync.WaitGroup
    11. // 普通版加函数
    12. func add() {
    13. // x = x + 1
    14. x++ // 等价于上面的操作
    15. wg.Done()
    16. }
    17. // 互斥锁版加函数
    18. func mutexAdd() {
    19. l.Lock()
    20. x++
    21. l.Unlock()
    22. wg.Done()
    23. }
    24. // 原子操作版加函数
    25. func atomicAdd() {
    26. atomic.AddInt64(&x, 1)
    27. wg.Done()
    28. }
    29. func main() {
    30. start := time.Now()
    31. for i := 0; i < 10000; i++ {
    32. wg.Add(1)
    33. //go add() // 普通版add函数 不是并发安全的
    34. //go mutexAdd() // 加锁版add函数 是并发安全的,但是加锁性能开销大
    35. go atomicAdd() // 原子操作版add函数 是并发安全,性能优于加锁版
    36. }
    37. wg.Wait()
    38. end := time.Now()
    39. fmt.Println(x)
    40. fmt.Println(end.Sub(start))
    41. }

    Context

    context详解:https://www.cnblogs.com/juanmaofeifei/p/14439957.html

    Go 语言并发编程与 Context | Go 语言设计与实现

    Context是用来用来处理goroutine,可以在多个goroutine中传递取消信号、超时等。

    通俗的解释:Context · Go语言中文文档

    由于golang的server在goroutine当中,context就是有效管理这些goroutine,相互调用的goroutine之间通过传递context变量保持关联,这样在不用暴露各goroutine内部实现细节的前提下,有效地控制各goroutine的运行。

    引入--退出goroutine
    方式1,采用全局变量
    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. "time"
    6. )
    7. var wg sync.WaitGroup
    8. // 退出全局变量
    9. var stop bool
    10. func worker() {
    11. defer wg.Done()
    12. for {
    13. if stop {
    14. break
    15. }
    16. time.Sleep(time.Second)
    17. fmt.Println("worker")
    18. }
    19. }
    20. func main() {
    21. wg.Add(1)
    22. go worker()
    23. time.Sleep(3 * time.Second)
    24. stop = true
    25. wg.Wait()
    26. }
    方式2,采用管道通信
    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. "time"
    6. )
    7. var wg sync.WaitGroup
    8. var ch = make(chan struct{})
    9. func worker() {
    10. defer wg.Done()
    11. LOOP:
    12. for {
    13. select {
    14. case <-ch:
    15. fmt.Println("exit")
    16. break LOOP
    17. default:
    18. time.Sleep(time.Second)
    19. fmt.Println("worker")
    20. }
    21. }
    22. }
    23. func main() {
    24. wg.Add(1)
    25. go worker()
    26. time.Sleep(3 * time.Second)
    27. ch <- struct{}{}
    28. wg.Wait()
    29. }
    方式3,采用context
    1. package main
    2. import (
    3. "context"
    4. "fmt"
    5. "sync"
    6. "time"
    7. )
    8. var wg sync.WaitGroup
    9. func worker(ctx context.Context) {
    10. defer wg.Done()
    11. LOOP:
    12. for {
    13. select {
    14. case <-ctx.Done():
    15. fmt.Println("exit")
    16. break LOOP
    17. default:
    18. time.Sleep(time.Second)
    19. fmt.Println("worker")
    20. }
    21. }
    22. }
    23. func main() {
    24. wg.Add(1)
    25. ctx, cancel := context.WithCancel(context.Background())
    26. go worker(ctx)
    27. time.Sleep(3 * time.Second)
    28. cancel() //等待子routine结束
    29. wg.Wait()
    30. }

    如果函数当中需要被控制、超时、传递时,但不希望改变原来的接口时,函数第一个参数传入ctx。

    context

    1. type Context interface {
    2. Deadline() (deadline time.Time, ok bool)
    3. Done() <-chan struct{}
    4. Err() error
    5. Value(key interface{}) interface{}
    6. }

    WithDeadline
    1. package main
    2. import (
    3. "context"
    4. "fmt"
    5. "time"
    6. )
    7. func main() {
    8. d := time.Now().Add(50 * time.Millisecond)
    9. ctx, canel := context.WithDeadline(context.Background(), d)
    10. // 尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践。
    11. // 如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
    12. defer canel()
    13. select {
    14. case <-time.After(10 * time.Millisecond):
    15. fmt.Println("overslept")
    16. case <-ctx.Done():
    17. fmt.Println(ctx.Err())
    18. }
    19. }

    WithTimeout
    1. package main
    2. import (
    3. "context"
    4. "fmt"
    5. "sync"
    6. "time"
    7. )
    8. var wg sync.WaitGroup
    9. func worker(ctx context.Context) {
    10. defer wg.Done()
    11. LOOP:
    12. for {
    13. select {
    14. case <-ctx.Done():
    15. fmt.Println("exit")
    16. break LOOP
    17. default:
    18. time.Sleep(time.Second)
    19. fmt.Println("worker")
    20. }
    21. }
    22. }
    23. func main() {
    24. wg.Add(1)
    25. //超时控制
    26. ctx, _ := context.WithTimeout(context.Background(), 6*time.Second)
    27. go worker(ctx)
    28. time.Sleep(3 * time.Second)
    29. wg.Wait()
    30. }
    WithValue
    1. package main
    2. import (
    3. "context"
    4. "fmt"
    5. "sync"
    6. "time"
    7. )
    8. var wg sync.WaitGroup
    9. func worker(ctx context.Context) {
    10. //拿到keyvalue
    11. fmt.Printf("traceid:%s\n", ctx.Value("traceid"))
    12. //记录一些日志等等,方便排查
    13. defer wg.Done()
    14. LOOP:
    15. for {
    16. select {
    17. case <-ctx.Done():
    18. fmt.Println("exit")
    19. break LOOP
    20. default:
    21. time.Sleep(time.Second)
    22. fmt.Println("worker")
    23. }
    24. }
    25. }
    26. func main() {
    27. wg.Add(1)
    28. //超时控制
    29. ctx, _ := context.WithTimeout(context.Background(), 6*time.Second)
    30. //传递一些值,后续可能链路追踪id
    31. childCtx := context.WithValue(ctx, "traceid", "123456")
    32. go worker(childCtx)
    33. time.Sleep(3 * time.Second)
    34. wg.Wait()
    35. }
  • 相关阅读:
    vue3 解决警告: Promise returned from xxx is ignored 和 $router未定义
    JavaFX Scene Builder 3D 控件详解
    o(1)复杂度找出栈中最小的元素
    容器类之QT
    PostgreSQL 15新版本特性解读(含直播问答、PPT资料汇总)
    JavaScriput中数组去重方法之indexof()
    TAMRA-NHS 荧光素-活性酯
    基于ASP.NET的驾校管理系统设计与实现
    智慧农业数字孪生应用案例,数字化农业建设发展现状
    简单的几个递归小算法
  • 原文地址:https://blog.csdn.net/m0_61973596/article/details/138185065