并发指在同一时间内可以执行多个任务 o 语言通过编译器运行时(runtime),从语言上支持了并发的特性。Go 语言的并发通过
goroutine 特性完成。goroutine 类似于线程,但是可以根据需要创建多个 goroutine 并发工作。goroutine 是由 Go 语言的运行时调度完成,而线程是由操作系统调度完成。Go 语言还提供 channel 在多个 goroutine 间进行通信。goroutine 和 channel 是 Go
语言秉承的 CSP(Communicating Sequential Process)并发模式的重要实现基础
Go语言从语言层面就支持并发。同时实现了自动垃圾回收机制
简单的介绍以下几个概念:
进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。
线程是进程的一个执行实体,是 CPU 调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
一个进程可以创建和撤销多个线程,同一个进程中的多个线程之间可以并发执行
多线程程序在单核心的 cpu 上运行,称为并发;多线程程序在多核心的 cpu 上运行,称为并行。
并发与并行并不相同,并发主要由切换时间片来实现“同时”运行,并行则是直接利用多核实现多线程的运行,Go程序可以设置使用核心数,以发挥多核计算机的能力。
协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。
线程:一个线程上可以跑多个协程,协程是轻量级的线程
goroutine 是一种非常轻量级的实现,可在单个进程里执行成千上万的并发任务,它是Go语言并发设计的核心。
说到底 goroutine 其实就是线程,但是它比线程更小,十几个 goroutine 可能体现在底层就是五六个线程,而且Go语言内部也实现了 goroutine 之间的内存共享。
使用 go 关键字就可以创建 goroutine,将 go 声明放到一个需调用的函数之前,在相同地址空间调用运行这个函数,这样该函数执行时便会作为一个独立的并发线程,这种线程在Go语言中则被称为 goroutine
channel 是Go语言在语言级别提供的 goroutine 间的通信方式。我们可以使用 channel 在两个或多个 goroutine 之间传递消息。
channel 是进程内的通信方式,因此通过 channel 传递对象的过程和调用函数时的参数传递行为比较一致,比如也可以传递指针等。如果需要跨进程通信,我们建议用分布式系统的方法来解决,比如使用 Socket 或者 HTTP 等通信协议。Go语言对于网络方面也有非常完善的支持。
channel 是类型相关的,也就是说,一个 channel 只能传递一种类型的值,这个类型需要在声明 channel 时指定。如果对 Unix 管道有所了解的话,就不难理解 channel,可以将其认为是一种类型安全的管道
定义一个 channel 时,也需要定义发送到 channel 的值的类型,注意,必须使用 make 创建 channel,代码如下所示:
ci := make(chan int)
cs := make(chan string)
cf := make(chan interface{}
goroutine 是 Go语言中的轻量级线程实现,由 Go 运行时(runtime)管理。Go 程序会智能地将 goroutine 中的任务合理地分配给每个 CPU。
Go 程序从 main 包的 main() 函数开始,在程序启动时,Go 程序就会为 main() 函数创建一个默认的 goroutine
使用go
关键字为一个函数创建一个goroutine。一个函数可以创建多个goroutine,一个goroutine只能对应一个函数。
格式
go 函数名( 参数列表 )
使用go关键字创建goroutine时,被调用函数的返回值会被忽略。如果需要在goroutine中返回数据,则需要根据通道的特性,通过通道把数据从goroutine中作为返回值传出。
例如:
package main
import (
"fmt"
"time"
)
func running() {
var times int
// 构建一个无限循环
for {
times++
fmt.Println("tick", times)
// 延时1秒
time.Sleep(time.Second)
}
}
func main() {
// 并发执行程序
go running()
// 接受命令行输入, 不做任何事情
var input string
scan, err := fmt.Scanln(&input)
if err != nil {
return
}
fmt.Println(scan)
}
代码执行之后,命令行会不断的输出,同时可以使用fmt.Scanln()接受 用户的输入。两个环节可以同时进行。
go关键字也可以为匿名函数或闭包启动goroutine
使用匿名函数或闭包创建 goroutine 时,除了将函数定义部分写在 go 的后面之外,还需要加上匿名函数的调用参数,格式如下:
go func( 参数列表 ){
函数体
}( 调用参数列表 )
其中:
例如:
package main
import (
"fmt"
"time"
)
func main() {
go func() {
var times int
for {
times++
fmt.Println("tick", times)
time.Sleep(time.Second)
}
}()
var input string
fmt.Scanln(&input)
}
所有 goroutine 在 main() 函数结束时会一同结束。
如果两个或者多个 goroutine 在没有相互同步的情况下,访问某个共享的资源,比如同时对该资源进行读写时,就会处于相互竞争的状态,这就是并发中的资源竞争
我们对于同一个资源的读写必须是原子化的,同一时间只能允许一个goroutine对资源进行读写操作。
Go为我们提供了一个工具帮助我们检查,这个就是go build -race
命令。在项目目录下执行这个命令,生成一个可以执行的文件,然后运行这个可执行文件,就可以看到打印出的检测信息。
在 go build
命令中多加了一个 -race
标志,这样生成的可执行程序就自带了检测资源竞争的功能
运行生成的可执行文件,效果如下所示:
==================
WARNING: DATA RACE
Read at 0x000000619cbc by goroutine 8:
main.incCount()
D:/code/src/main.go:25 +0x80
Previous write at 0x000000619cbc by goroutine 7:
main.incCount()
D:/code/src/main.go:28 +0x9f
Goroutine 8 (running) created at:
main.main()
D:/code/src/main.go:17 +0x7e
Goroutine 7 (finished) created at:
main.main()
D:/code/src/main.go:16 +0x66
==================
4
Found 1 data race(s)
通过运行结果可以看出 goroutine 8 在代码 25 行读取共享资源 value := count
,而这时 goroutine 7 在代码 28 行修改共享资源 count = value
,而这两个 goroutine 都是从 main 函数的 16、17 行通过 go 关键字启动的。
Go语言提供了传统的同步 goroutine 的机制,就是对共享资源加锁。atomic 和 sync 包里的一些函数就可以对共享的资源进行加锁操作
原子函数
原子函数能够以底层的加锁机制来同步访问整型变量和指针
代码:
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)
var (
counter int64
wg sync.WaitGroup //WaitGroup用于等待一组线程的结束
)
func main() {
wg.Add(2) //增加两个线程
go incCounter(1)
go incCounter(2)
wg.Wait() //阻塞 等待goroutine结束 当wg的计数器为0结束
fmt.Println(counter)
}
func incCounter(id int) {
defer wg.Done()
for count := 0; count < 2; count++ {
atomic.AddInt64(&counter, 1) //安全的对counter加1
runtime.Gosched() // 让当前线程暂停
}
}
输出:
4
另外两个有用的原子函数是 LoadInt64 和 StoreInt64。这两个函数提供了一种安全地读和写一个整型值的方式
下面是代码就使用了 LoadInt64 和 StoreInt64 函数来创建一个同步标志,这个标志可以向程序里多个 goroutine 通知某个特殊状态。
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
shutdown int64
wg sync.WaitGroup
)
func main() {
wg.Add(2)
go doWork("A")
go doWork("B")
time.Sleep(1 * time.Second)
fmt.Println("Shutdown Now")
atomic.StoreInt64(&shutdown, 1)
wg.Wait()
}
func doWork(name string) {
defer wg.Done()
for {
fmt.Printf("Doing %s Work\n", name)
time.Sleep(250 * time.Millisecond)
if atomic.LoadInt64(&shutdown) == 1 {
fmt.Printf("Shutting %s Down\n", name)
break
}
}
}
上面代码中 main 函数使用 StoreInt64 函数来安全地修改 shutdown 变量的值。如果哪个 doWork goroutine 试图在 main 函数调用 StoreInt64 的同时调用 LoadInt64 函数,那么原子函数会将这些调用互相同步,保证这些操作都是安全的,不会进入竞争状态
互斥锁
另一种同步访问共享资源的方式是使用互斥锁,互斥锁这个名字来自互斥的概念。互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以执行这个临界代码
var mutex sync.Mutex//声明一个互斥锁
//同一时刻只允许一个goroutine进入这个临界区
mutex.Lock()
{
value := counter
runtime.Gosched()
value++
counter = value
}
mutex.Unlock() //释放锁,允许其他正在等待的goroutine进入临界区
在 Go语言程序运行时(runtime)实现了一个小型的任务调度器。这套调度器的工作原理类似于操作系统调度线程,Go 程序调度器可以高效地将 CPU 资源分配给每一个任务。传统逻辑中,开发者需要维护线程池中线程与 CPU 核心数量的对应关系。同样的,Go 中也可以通过 runtime.GOMAXPROCS() 函数做到,格式为:
runtime.GOMAXPROCS(逻辑CPU数量)
这里的逻辑CPU数量可以有如下几种数值:
1:多核并发执行。
一般情况下,可以使用 runtime.NumCPU() 查询 CPU 数量,并使用 runtime.GOMAXPROCS() 函数进行设置,例如:
runtime.GOMAXPROCS(runtime.NumCPU()
GOMAXPROCS 同时也是一个环境变量,在应用程序启动前设置环境变量也可以起到相同的作用
他们都可以将函数或者语句在独立的环境中运行,但是他们之间有两点不同:
goroutines 意味着并行(或者可以以并行的方式部署),coroutines 一般来说不是这样的,goroutines 通过通道来通信;coroutines 通过让出和恢复操作来通信,goroutines 比 coroutines 更强大,也很容易从 coroutines 的逻辑复用到 goroutines。
狭义地说,goroutine 可能发生在多线程环境下,goroutine 无法控制自己获取高优先度支持;coroutine 始终发生在单线程,coroutine 程序需要主动交出控制权,宿主才能获得控制权并将控制权交给其他 coroutine。
goroutine 间使用 channel 通信,coroutine 使用 yield 和 resume 操作。
coroutine 的运行机制属于协作式任务处理,早期的操作系统要求每一个应用必须遵守操作系统的任务处理规则,应用程序在不需要使用 CPU 时,会主动交出 CPU 使用权。如果开发者无意间或者故意让应用程序长时间占用 CPU,操作系统也无能为力,表现出来的效果就是计算机很容易失去响应或者死机。
goroutine 属于抢占式任务处理,已经和现有的多线程和多进程任务处理非常类似。应用程序对 CPU 的控制最终还需要由操作系统来管理,操作系统如果发现一个应用程序长时间大量地占用 CPU,那么用户有权终止这个任务。
channels是gotoutine之间的通信机制。
一个 channels 是一个通信机制,它可以让一个 goroutine 通过它给另一个 goroutine 发送值信息。每个 channel 都有一个特殊的类型,也就是 channels 可发送数据的类型。一个可以发送 int 类型数据的 channel 一般写为 chan int。
Go语言提倡使用通信的方法代替共享内存,当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。声明通道时,需要指定将要被共享的数据的类型。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。
这里通信的方法就是使用通道(channel)
chan是一种队列一样的结构,目的就是提高效率。
通道是一种特殊的类型,在任何时候,同时只能有一个goroutine访问通道进行发送和获取数据。
通道像一个队列,遵循FIFO先进先出的规则,保证收发数据的顺序
通道本身需要一个类型进行修饰,就像切片类型需要标示元素类型就是在其内部传输的数据类型,声明如下:
var 通道变量 chan 通道类型
chan 类型的空值是 nil,声明后需要配合 make 后才能使用
通道是引用类型,需要使用 make 进行创建,格式如下:
通道实例 := make(chan 数据类型)
例子:
ch1 := make(chan int) // 创建一个整型类型的通道
ch2 := make(chan interface{}) // 创建一个空接口类型的通道, 可以存放任意格式
type Equip struct{ /* 一些字段 */ }
ch2 := make(chan *Equip) // 创建Equip指针类型的通道, 可以存放*Equip
通道的发送使用特殊的操作符 <-
,将数据通过通道发送的格式为:
通道变量 <- 值
例如:
使用 make 创建一个通道后,就可以使用 <-
向通道发送数据,代码如下:
// 创建一个空接口通道
ch := make(chan interface{})
// 将0放入通道中
ch <- 0
// 将hello字符串放入通道中
ch <- "hello"
通道接收同样使用<-
操作符,接收有如下特性:
一般有以下几种写法
阻塞模式接收数据时,将接收变量作为 <-
操作符的左值,格式如下:
data := <-ch
执行该语句时将会阻塞,直到接收到数据并赋值给 data 变量。
2. 非阻塞接收数据
使用非阻塞方式从通道接收数据时,语句不会发生阻塞,格式如下:
data, ok := <-ch
非阻塞的通道接收方法可能造成高的 CPU 占用,因此使用非常少。如果需要实现接收超时检测,可以配合 select 和计时器 channel 进行,可以参见后面的内容。
3. 接收任意数据,忽略接收的数据
阻塞接收数据后,忽略从通道返回的数据,格式如下:
<-ch
执行该语句时将会发生阻塞,直到接收到数据,但接收到的数据会被忽略。这个方式实际上只是通过通道在 goroutine 间阻塞收发实现并发同步。
使用通道做并发同步的写法,可以参考下面的例子:
package main
import (
"fmt"
)
func main() {
// 构建一个通道
ch := make(chan int)
// 开启一个并发匿名函数
go func() {
fmt.Println("start goroutine")
// 通过通道通知main的goroutine
ch <- 0
fmt.Println("exit goroutine")
}()
fmt.Println("wait goroutine")
// 阻塞 等待匿名goroutine
<-ch
fmt.Println("all done")
}
执行代码,输出如下:wait goroutine
start goroutine
exit goroutine
all done
代码说明如下:
通道的数据接收可以借用 for range 语句进行多个元素的接收操作,格式如下:
for data := range ch {
}
通道 ch 是可以进行遍历的,遍历的结果就是接收到的数据。数据类型就是通道的数据类型。通过 for 遍历获得的变量只有一个,即上面例子中的 data。
遍历通道数据的例子请参考下面的代码。
使用 for 从通道中接收数据:
package main
import (
"fmt"
"time"
)
func main() {
// 构建一个通道
ch := make(chan int)
// 开启一个并发匿名函数
go func() {
// 从3循环到0
for i := 3; i >= 0; i-- {
// 发送3到0之间的数值
ch <- i
// 每次发送完时等待
time.Sleep(time.Second)
}
}()
// 遍历接收通道数据
for data := range ch {
// 打印通道数据
fmt.Println(data)
// 当遇到数据0时, 退出接收循环
if data == 0 {
break
}
}
}
执行代码,输出如下:3
2
1
0
代码说明如下:
单向通道只能用来写入或者只能用于读取数据。
所谓的单项通道其实是一channel的一种使用限制。 如果只能读,则通道一定是空的,因为没有办法写。如果是只能写,也没有意义,因为无法取到里面的数据。
我们在将一个 channel 变量传递到一个函数时,可以通过将其指定为单向 channel 变量,从而限制该函数中可以对此 channel 的操作,比如只能往这个 channel 中写入数据,或者只能从这个 channel 读取数据。
单向 channel 变量的声明非常简单,只能写入数据的通道类型为 chan<-
,只能读取数据的通道类型为 <-chan
,格式如下:
var 通道实例 chan<- 元素类型 // 只能写入数据的通道
var 通道实例 <-chan 元素类型 // 只能读取数据的通道
例如:
代码如下:
ch := make(chan int)
// 声明一个只能写入数据的通道类型, 并赋值为ch
var chSendOnly chan<- int = ch
//声明一个只能读取数据的通道类型, 并赋值为ch
var chRecvOnly <-chan int = ch
上面的例子中,chSendOnly 只能写入数据,如果尝试读取数据,将会出现如下报错:invalid operation: <-chSendOnly (receive from send-only type chan<- int)
同理,chRecvOnly 也是不能写入数据的。
当然,使用 make 创建通道时,也可以创建一个只写入或只读取的通道:
ch := make(<-chan int)
var chReadOnly <-chan int = ch
<-chReadOnly
上面代码编译正常,运行也是正确的。但是,一个不能写入数据只能读取的通道是毫无意义的。
close(ch)
判断通道是否关闭
x, ok := <-ch
OK为false 则已经关闭
Go语言中无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。
如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。
阻塞指的是由于某种原因数据没有到达,当前协程(线程)持续处于等待状态,直到条件满足才解除阻塞。
同步指的是在两个或多个协程(线程)之间,保持数据内容一致性的机制
是一种在被接受前能存储一个或者多个值的通道。这种类型的通道并不强制要求goroutine之间必须同时完成发送和接受。
通道实例 := make(chan 通道类型, 缓冲大小)
例如:
// 创建一个3个元素缓冲大小的整型通道
ch := make(chan int, 3)
无缓冲通道可以看作是长度永远是0的带缓冲通道。因为根据这个特性,带缓冲通道在下面的这中情况下依然会发生阻塞:
** 为什么Go不提供无限长度的通道**
通道(channel)是在两个 goroutine 间通信的桥梁。使用 goroutine 的代码必然有一方提供数据,一方消费数据。当提供数据一方的数据供给速度大于消费方的数据处理速度时,如果通道不限制长度,那么内存将不断膨胀直到应用崩溃。因此,限制通道的长度有利于约束数据提供方的供给速度,供给数据量必须在消费方处理量+通道长度的范围内,才能正常地处理数据
使用select来设置超时。
select 机制不是专门为超时而设计的,却能很方便的解决超时问题,因为 select 的特点是只要其中有一个 case 已经完成,程序就会继续往下执行,而不会考虑其他 case 的情况
select 的用法与 switch 语言非常类似,由 select 开始一个新的选择块,每个选择条件由 case 语句来描述。
与 switch 语句相比,select 有比较多的限制,其中最大的一条限制就是每个 case 语句里必须是一个 IO 操作,大致的结构如下:
select {
case <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
在一个 select 语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。
如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。
如果没有任意一条语句可以执行(即所有的通道都被阻塞),那么有如下两种可能的情况:
表示在一个信道上传输多路信号或数据流的过程和技术
在使用通道时,想要同时接受多个通道的数据是一件困难的事情。通道在接收数据时,如果没有数据可以接收将会发生阻塞。虽然可以使用如下模式进行遍历,但运行性能会非常差。
for{
//尝试接受ch1通道
data,ok := <-ch1
//尝试接收ch2通道
data,ok := <-ch2
//接收后续通道
。。。。。
}
可以使用select来实现多路复用
操作 | 语句示例 |
---|---|
接受任意变量 | case<-ch: |
接收变量 | case d:=<-ch : |
发送数据 | case ch<-100 : |
Go语言包中的 sync 包提供了两种锁类型:sync.Mutex 和 sync.RWMutex。
Mutex 是最简单的一种锁类型,同时也比较暴力,当一个 goroutine 获得了 Mutex 后,其他 goroutine 就只能乖乖等到这个 goroutine 释放该 Mutex。
RWMutex 相对友好些,是经典的单写多读模型。在读锁占用的情况下,会阻止写,但不阻止读,也就是多个 goroutine 可同时获取读锁(调用 RLock() 方法;而写锁(调用 Lock() 方法)会阻止任何其他 goroutine(无论读和写)进来,整个锁相当于由该 goroutine 独占。从 RWMutex 的实现看,RWMutex 类型其实组合了Mutex:
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
对于这两种锁类型,任何一个 Lock() 或 RLock() 均需要保证对应有 Unlock() 或 RUnlock() 调用与之对应,否则可能导致等待该锁的所有 goroutine 处于饥饿状态,甚至可能导致死锁。锁的典型使用模式如下:
package main
import (
"fmt"
"sync"
)
var (
// 逻辑中使用的某个变量
count int
// 与变量对应的使用互斥锁
countGuard sync.Mutex
)
func GetCount() int {
// 锁定
countGuard.Lock()
// 在函数退出时解除锁定
defer countGuard.Unlock()
return count
}
func SetCount(c int) {
countGuard.Lock()
count = c
countGuard.Unlock()
}
func main() {
// 可以进行并发安全的设置
SetCount(1)
// 可以进行并发安全的获取
fmt.Println(GetCount())
}
代码说明如下:
在读多写少的环境中,可以优先使用读写互斥锁(sync.RWMutex),它比互斥锁更加高效。sync 包中的 RWMutex 提供了读写互斥锁的封装。
我们将互斥锁例子中的一部分代码修改为读写互斥锁,参见下面代码:
var (
// 逻辑中使用的某个变量
count int
// 与变量对应的使用互斥锁
countGuard sync.RWMutex
)
func GetCount() int {
// 锁定
countGuard.RLock()
// 在函数退出时解除锁定
defer countGuard.RUnlock()
return count
}
代码说明如下:
Go语言中除了可以使用通道(channel)和互斥锁进行两个并发程序间的同步外,还可以使用等待组进行多个任务的同步,等待组可以保证在并发环境中完成指定数量的任务
在 sync.WaitGroup(等待组)类型中,每个 sync.WaitGroup 值在内部维护着一个计数,此计数的初始默认值为零。
等待组有下面几个方法可用,如下表所示。
等待组的方法
方法名 | 功能 |
---|---|
(wg * WaitGroup) Add(delta int) | 等待组的计数器 +1 |
(wg * WaitGroup) Done() | 等待组的计数器 -1 |
(wg * WaitGroup) Wait() | 当等待组计数器不等于 0 时阻塞直到变 0。 |
对于一个可寻址的 sync.WaitGroup 值 wg:
等待组内部拥有一个计数器,计数器的值可以通过方法调用实现计数器的增加和减少。当我们添加了 N 个并发任务进行工作时,就将等待组的计数器值增加 N。每个任务完成时,这个值减 1。同时,在另外一个 goroutine 中等待这个等待组的计数器值为 0 时,表示所有任务已经完成。