https://www.cnblogs.com/Survivalist/p/11527949.html
Golang中的并发编程是一个重点,我们要了解Golang中的并发Goroutine因此需要先理解进程、线程、之后再理解协程。
进程:操作系统进行资源分配的最小单元,是程序在执行过程中的一次活动,包括程序,数据集合,程序控制块(PCB)等。每个进程都有独立的内存空间,包括代码、数据、堆栈,因此进程之间相互隔离。进程切换开销大(包括栈、寄存器、页表、文件句柄等切换)。尽管更安全,但也占据了较多系统资源
线程:操作系统进行调度的最小单元,是进程中执行的基本单元,由线程ID,当前指令指针PC,寄存器和堆栈组成。一个进程包含>=1个线程,其中一个为主线程,多个线程时通过共享内存,上下文切换较快,资源开销较小。共享内存尤其需要注意线程同步互斥问题。

协程:用户轻量级线程,由程序控制,不被操作系统内核管理
更轻量,独立栈空间(协程之间不共享内存,但是可以通信(channel)进行交互),更易并发(高效切换无需过多锁)

并行和并发区别

每个goroutine是官方实现的超级"线程池"
每个实例4-5KB栈内存和实现机制大幅减少创建和销毁使得go更易实现高并发
goroutine奉行通信(配合channel)实现共享内存。
在go语言层面内置调度和上下文切换机制,并且go程序会智能地将任务合理的分配给CPU
- package main
-
- import (
- "fmt"
- "time"
- )
-
- func Hello() {
- fmt.Println("Hello Function!")
- }
-
- func main() {
- //在函数前加入关键字go
- go Hello()
- fmt.Println("Main done")
- //休眠,等go Hello()执行完
- time.Sleep(time.Second)
- }

sync.WaitGroup中的Add和Done线程安全,可以从多个groutine中调用这两个方法,不用担心数据竞争和其他并发问题。
- package main
-
- import (
- "fmt"
- "sync"
- )
- //启动多个goroutine
- func main() {
- //协程同步
- var wg sync.WaitGroup
- wg.Add(9)
-
- for i := 0; i < 9; i++ {
- //当作参数传入会拷贝一份,因此可以保证输出0-8
- go func(i int) {
- defer wg.Done()
- fmt.Printf("%d ",i)
- }(i)
- }
- // 阻塞主程序,等待所有 Goroutine 完成
- wg.Wait()
- }
输出结构并发乱序。0-9的其中一个组合

- package main
-
- import (
- "fmt"
- "sync"
- )
-
- func main() {
- //sync.Map的key和value都是interface{}
- var m sync.Map
-
- //写入
- m.Store("1", 18)
- m.Store("2", 20)
-
- //读取
- age, ok := m.Load("1")
- if ok {
- fmt.Println("读取成功", age, ok)
- } else {
- fmt.Println("读取失败!")
- }
-
- //遍历!!
- m.Range(func(key, value interface{}) bool {
- fmt.Println("遍历:key=", key, " value=", value)
- return true
- })
-
- //根据key删除
- m.Delete("2")
- age, ok = m.Load("2")
- if ok {
- fmt.Println("删除后读取成功", age, ok)
- } else {
- fmt.Println("删除后读取失败!")
- }
-
- //存在则读取否则写入
- //如果存在key=2,ok返回为true,否则false
- age, ok = m.LoadOrStore("2", "100")
- if ok {
- fmt.Println("已存在的:", age)
- } else {
- fmt.Println("不存在,store后的:", age)
- }
-
- }

- package main
-
- import (
- "fmt"
- "sync"
- )
-
- func main() {
- //没有加锁的并发写入,则会报错
- m := make(map[int]int)
-
- var wg sync.WaitGroup
- var mu sync.Mutex
-
- for i := 0; i < 9; i++ {
- wg.Add(1)
-
- go func(i int) {
- for j := 0; j < 9; j++ {
- //上锁
- mu.Lock()
- m[j] = i
- mu.Unlock()
- }
- wg.Done()
- }(i)
-
- }
-
-
- // 安全Map
- var sm sync.Map
- for i := 0; i < 9; i++ {
- wg.Add(1)
- go func(i int) {
- for j := 0; j < 9; j++ {
- sm.Store(j, i)
- }
- wg.Done()
- }(i)
-
- }
- //完成前面并发任务后输出
- wg.Wait()
-
- fmt.Println("最终打印map值:", m)
- fmt.Print("最终打印sync.Map值:")
- sm.Range(func(key, value interface{}) bool {
- fmt.Printf("%d:%d ", key, value)
- return true
- })
-
- }
go中不要通过共享内存来通信,而是通过通信来共享内存。
go高性能编程:GitHub - wuqinqiang/Go_Concurrency: go concurrency class code
go语言核心类型,管道,并发中可以进行发送或接收数据进行通信。
<-
使用make创建channel,chan底层是一个环形数组
类型:chan chan <- <-chan
使用场景:
无缓冲channel(B要第一时间知道A是否完成)、有缓冲channel(生产者消费者模型)
- package main
-
- import (
- "fmt"
- "sync"
- )
-
- func main() {
- c := make(chan int, 2)
- defer close(c)
-
- var wg sync.WaitGroup
-
- wg.Add(1)
-
- go func() {
- defer wg.Done()
- c <- 3 + 4
- c <- 1
- fmt.Println("发送成功")
- }()
-
- wg.Add(1)
- go func() {
- defer wg.Done()
- c <- 100
- }()
-
- //wg.Wait()
- i := <-c
- j := <-c
- _ = <-c//忽略这个值
- fmt.Println(i, j)
-
- wg.Wait()
-
- }

- import "fmt"
- func sum(s []int, c chan int) {
- sum := 0
- for _, v := range s {
- sum += v
- }
- c <- sum // send sum to c
- }
- func main() {
- s := []int{7, 2, 8, -9, 4, 0}
- c := make(chan int)
- go sum(s[:len(s)/2], c)
- go sum(s[len(s)/2:], c)
- x, y := <-c, <-c // receive from c
- fmt.Println(x, y, x+y)
- }
channel的range
- func main() {
- go func() {
- time.Sleep(1 * time.Hour)
- }()
- c := make(chan int)
- go func() {
- for i := 0; i < 10; i = i + 1 {
- c <- i
- }
- close(c)
- }()
- for i := range c {
- fmt.Println(i)
- }
- fmt.Println("Finished")
- }
这个range会一直从c中获取,直到c关闭
类似与linux中io的select、poll、epoll。
select语句类似于switch,随机执行一个可执行的case,select只用于通信操作,如果没有case可运行那么将阻塞,直到有case可运行。默认的字句总是可以运行。
- package main
-
- import "fmt"
-
- //select用于退出
- func fibonacci(c, quit chan int) {
- x, y := 0, 1
- for {
- select {
- case c <- x:
- x, y = y, x+y
- case <-quit:
- fmt.Println("quit")
- return
- }
- }
- }
-
- func main() {
- c := make(chan int)
- quit := make(chan int)
- go func() {
- for i := 0; i < 10; i++ {
- fmt.Println(<-c)
- }
- quit <- 0
- }()
- fibonacci(c, quit)
- }
- package main
-
- import "time"
- import "fmt"
-
- func main() {
- c1 := make(chan string, 1)
- go func() {
- time.Sleep(time.Second * 2)
- c1 <- "result 1"
- }()
- select {
- case res := <-c1:
- fmt.Println(res)
- //超时退出
- case <-time.After(time.Second * 1):
- fmt.Println("timeout 1")
- }
- }
send chan <- string//只能发送给send
read <-chan string// 只能读取read
- package main
-
- import (
- "fmt"
- "time"
- )
-
- func Produce(out chan<- int) {
- for i := 0; i < 10; i++ {
- out <- i * i
- }
-
- }
-
- func Consumer(in <-chan int) {
- for num := range in {
- fmt.Println(num)
- }
- }
-
- func main() {
- c := make(chan int, 0)
-
- go Produce(c)
- go Consumer(c)
-
- time.Sleep(time.Second)
- }
- package main
-
- import "fmt"
-
- // 只能发送给管道
- func Counter(out chan<- int) {
- for i := 0; i < 10; i++ {
- out <- i
- }
- close(out)
- }
-
- // chan <- 只能发送给管道 <-chan 管道发送嘛,因此只能接收
- func Squarer(out chan<- int, in <-chan int) {
- for i := range in {
- out <- i * i
- }
- close(out)
- }
-
- func Printer(in <-chan int) {
- for i := range in {
- fmt.Println(i)
- }
- }
-
- func main() {
- ch1 := make(chan int)
- ch2 := make(chan int)
- go Counter(ch1)
- go Squarer(ch2, ch1)
- Printer(ch2)
- }
输出0-9的平方。
Golang学习篇——协程池_golang 携程池-CSDN博客
- package main
-
- import (
- "fmt"
- "math/rand"
- "sync"
- )
-
- // 当前task
- type Task struct {
- Id int
- Random int
- }
-
- // 结果
- type Result struct {
- Task *Task
- Sum int
- }
-
- // 创建Task
- func CreateTask(taskChan chan<- *Task, wg *sync.WaitGroup) {
- defer wg.Done()
- for id := 0; id < 100000; id++ {
- //创建Task
- task := &Task{
- Id: id,
- Random: rand.Intn(200) + 1,
- }
- // 传递给taskChan管道
- taskChan <- task
- }
- close(taskChan)
-
- }
-
- // 创建线程池来处理
- func CreatePool(num int, taskChan <-chan *Task, resultChan chan<- *Result, wg *sync.WaitGroup) {
- for i := 0; i < num; i++ {
- wg.Add(1)
- // 创建多个goroutine并发
- go func() {
- for task := range taskChan {
- // 当前的Num
- currentNum := task.Random
- sum := 0
- // 计算sum的值
- for currentNum != 0 {
- temp := currentNum % 10
- sum += temp
- currentNum /= 10
- }
- // 此时任务的结果是:
- currentResult := &Result{
- Task: task,
- Sum: sum,
- }
- // 发送给结果管道
- resultChan <- currentResult
- }
- wg.Done()
- }()
- }
-
- }
-
- // 开启打印 Result
- func PrintResult(resultChan <-chan *Result) {
- //输出
- for res := range resultChan {
- fmt.Printf("输出结果,Id:=%d,Random:=%d,Sum:=%d\n", res.Task.Id, res.Task.Random, res.Sum)
- }
- }
-
- func main() {
- // 创建task管道,传递task
- taskChan := make(chan *Task, 128)
-
- // 结果管道
- resultChan := make(chan *Result, 128)
-
- // 确保goroutine全部完成
- var wg sync.WaitGroup
-
- wg.Add(1)
- go CreateTask(taskChan, &wg)
-
- // 创建协程池
- CreatePool(133, taskChan, resultChan, &wg)
-
- go func() {
- wg.Wait()
- close(resultChan)
- }()
-
- // 创建协程进行打印
- PrintResult(resultChan)
-
- }

channel一定注意防止被阻塞而导致程序出现死锁!!!

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。sync.Mutex
多个goroutine对同一个共享资源(当前的x)的竞争你,x=x+1,在汇编当中并不是原子性的操作,因此并发时会导致数据不一致,方法1,上锁。
- package main
-
- import (
- "fmt"
- "sync"
- "sync/atomic"
- )
-
- var (
- total int32
- wg sync.WaitGroup
- mutex sync.Mutex
- )
-
- func Add() {
- defer wg.Done()
-
- for i := 0; i < 10000; i++ {
- //原子操作
- atomic.AddInt32(&total, 1)
- //mutex.Lock()
- //total++
- //mutex.Unlock()
- }
-
- }
-
- func Del() {
- defer wg.Done()
-
- for i := 0; i < 10000; i++ {
- atomic.AddInt32(&total, -1)
- //mutex.Lock()
- //total--
- //mutex.Unlock()
- }
-
- }
-
- func main() {
-
- fmt.Println("origin num:", total)
-
- wg.Add(2)
-
- go Add()
- go Del()
-
- wg.Wait()
- fmt.Println("After num:", total)
- }
- package main
-
- import (
- "fmt"
- "sync"
- )
-
- var x int64
- var wg sync.WaitGroup
- var lock sync.Mutex
-
- func Add() {
- for i := 0; i < 50; i++ {
- lock.Lock()
- x = x + 1
- lock.Unlock()
- }
- wg.Done()
- }
-
- func main() {
- wg.Add(2)
- go Add()
- go Add()
- wg.Wait()
- fmt.Println(x)
- }

- package main
-
- import (
- "fmt"
- "sync"
- "time"
- )
-
- var (
- x int64
- wg sync.WaitGroup
- lock sync.Mutex
- rwlock sync.RWMutex //读写互斥锁
- )
-
- func Write() {
- //lock.Lock() //加互斥锁
- rwlock.Lock()
- x = x + 1
- time.Sleep(10 * time.Millisecond)
- rwlock.Unlock()
- //lock.Unlock() //解互斥锁
- wg.Done()
- }
-
- func Read() {
- //lock.Lock()
- rwlock.RLock()
- time.Sleep(time.Millisecond)
-
- rwlock.RUnlock()
- //lock.Unlock()
- wg.Done()
- }
-
- func main() {
- start := time.Now()
-
- for i := 0; i < 10; i++ {
- wg.Add(1)
- go Write()
- }
-
- for i := 0; i < 1000; i++ {
- wg.Add(1)
- go Read()
- }
-
- wg.Wait()
- end := time.Now()
- fmt.Println(end.Sub(start))
- }
前面介绍过sync的一些方法

参考:Go sync.Once | Go 语言高性能编程 | 极客兔兔
执行一次的函数,可以在代码任意位置加载,常用于单例模式(懒汉式),并发场景安全。而init是package首次执行时加载(饿汉式)
对外接口:func (o *Once) Do(f func())
这个是并发安全的Map
- package main
-
- import (
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- )
-
- var x int64
- var l sync.Mutex
- var wg sync.WaitGroup
-
- // 普通版加函数
- func add() {
- // x = x + 1
- x++ // 等价于上面的操作
- wg.Done()
- }
-
- // 互斥锁版加函数
- func mutexAdd() {
- l.Lock()
- x++
- l.Unlock()
- wg.Done()
- }
-
- // 原子操作版加函数
- func atomicAdd() {
- atomic.AddInt64(&x, 1)
- wg.Done()
- }
-
- func main() {
- start := time.Now()
- for i := 0; i < 10000; i++ {
- wg.Add(1)
- //go add() // 普通版add函数 不是并发安全的
- //go mutexAdd() // 加锁版add函数 是并发安全的,但是加锁性能开销大
- go atomicAdd() // 原子操作版add函数 是并发安全,性能优于加锁版
- }
- wg.Wait()
- end := time.Now()
- fmt.Println(x)
- fmt.Println(end.Sub(start))
- }
context详解:https://www.cnblogs.com/juanmaofeifei/p/14439957.html
Go 语言并发编程与 Context | Go 语言设计与实现
Context是用来用来处理goroutine,可以在多个goroutine中传递取消信号、超时等。
通俗的解释:Context · Go语言中文文档
由于golang的server在goroutine当中,context就是有效管理这些goroutine,相互调用的goroutine之间通过传递context变量保持关联,这样在不用暴露各goroutine内部实现细节的前提下,有效地控制各goroutine的运行。
- package main
-
- import (
- "fmt"
- "sync"
- "time"
- )
-
- var wg sync.WaitGroup
-
- // 退出全局变量
- var stop bool
-
- func worker() {
- defer wg.Done()
- for {
- if stop {
- break
- }
- time.Sleep(time.Second)
- fmt.Println("worker")
- }
- }
-
- func main() {
- wg.Add(1)
- go worker()
-
- time.Sleep(3 * time.Second)
- stop = true
- wg.Wait()
- }
- package main
-
- import (
- "fmt"
- "sync"
- "time"
- )
-
- var wg sync.WaitGroup
-
- var ch = make(chan struct{})
-
- func worker() {
- defer wg.Done()
- LOOP:
- for {
-
- select {
- case <-ch:
- fmt.Println("exit")
- break LOOP
- default:
- time.Sleep(time.Second)
- fmt.Println("worker")
- }
-
- }
- }
-
- func main() {
- wg.Add(1)
- go worker()
-
- time.Sleep(3 * time.Second)
- ch <- struct{}{}
- wg.Wait()
- }
- package main
-
- import (
- "context"
- "fmt"
- "sync"
- "time"
- )
-
- var wg sync.WaitGroup
-
- func worker(ctx context.Context) {
- defer wg.Done()
- LOOP:
- for {
-
- select {
- case <-ctx.Done():
- fmt.Println("exit")
- break LOOP
- default:
- time.Sleep(time.Second)
- fmt.Println("worker")
- }
-
- }
- }
-
- func main() {
- wg.Add(1)
- ctx, cancel := context.WithCancel(context.Background())
- go worker(ctx)
-
- time.Sleep(3 * time.Second)
- cancel() //等待子routine结束
-
- wg.Wait()
- }
如果函数当中需要被控制、超时、传递时,但不希望改变原来的接口时,函数第一个参数传入ctx。
- type Context interface {
- Deadline() (deadline time.Time, ok bool)
- Done() <-chan struct{}
- Err() error
- Value(key interface{}) interface{}
- }
- package main
-
- import (
- "context"
- "fmt"
- "time"
- )
-
- func main() {
- d := time.Now().Add(50 * time.Millisecond)
- ctx, canel := context.WithDeadline(context.Background(), d)
-
- // 尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践。
- // 如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
- defer canel()
-
- select {
- case <-time.After(10 * time.Millisecond):
- fmt.Println("overslept")
- case <-ctx.Done():
- fmt.Println(ctx.Err())
- }
- }
- package main
-
- import (
- "context"
- "fmt"
- "sync"
- "time"
- )
-
- var wg sync.WaitGroup
-
- func worker(ctx context.Context) {
- defer wg.Done()
- LOOP:
- for {
-
- select {
- case <-ctx.Done():
- fmt.Println("exit")
- break LOOP
- default:
- time.Sleep(time.Second)
- fmt.Println("worker")
- }
-
- }
- }
-
- func main() {
- wg.Add(1)
-
- //超时控制
- ctx, _ := context.WithTimeout(context.Background(), 6*time.Second)
- go worker(ctx)
-
- time.Sleep(3 * time.Second)
- wg.Wait()
- }
- package main
-
- import (
- "context"
- "fmt"
- "sync"
- "time"
- )
-
- var wg sync.WaitGroup
-
- func worker(ctx context.Context) {
- //拿到key,value
- fmt.Printf("traceid:%s\n", ctx.Value("traceid"))
- //记录一些日志等等,方便排查
-
- defer wg.Done()
- LOOP:
- for {
-
- select {
- case <-ctx.Done():
- fmt.Println("exit")
- break LOOP
- default:
- time.Sleep(time.Second)
- fmt.Println("worker")
- }
-
- }
- }
-
- func main() {
- wg.Add(1)
-
- //超时控制
- ctx, _ := context.WithTimeout(context.Background(), 6*time.Second)
-
- //传递一些值,后续可能链路追踪id
- childCtx := context.WithValue(ctx, "traceid", "123456")
-
- go worker(childCtx)
-
- time.Sleep(3 * time.Second)
- wg.Wait()
- }