Go 提及的设计模式就是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。
channel收/发操作都遵循了先进先出的设计,它一共使用了3个队列来实现:
hchan.sendq
)
hchan.recvq
)
并发控制可由2种方式实现:
乐观锁并没有锁这个变量,而是对原数据进行比较,所以乐观锁只是一种思想。无锁channel是使用了乐观锁思想实现的。
runtime.hchan
结构体
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex
}
队列中存的结构是runtime.sudog
:
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
hchan.buff
指向一个数组地址,能存放数据,尽量避免了所有协程有阻塞
hchan.qcount,将数据放入缓冲区,hchan.sendx
指向下一个数组下标,唤醒读队列hchan.recvq
头部的协程。当前协程不阻塞,继续向下执行代码。
hchan.qcount>=hchan.dataqsiz
,当前goroutine阻塞(被挂起_GWating
),新创建一个sudog
,sudog.g
指向当前goroutine,sudog
变量塞进写队列hchan.sendq
hchan.recvq
和hchan.recvx
hchan.buff
是个nil值,没有数据存储的区域,肯定会出现阻塞现象
hchan.recvq
队列中去获取一个正阻塞的协程sudog
结构变量
hchan.recvq
有数据,则根据sudog.g
变量去唤醒协程,并向这个协程发送数据hchan.recvq
有数据,则创建一个sudog
结构体变量,sudog.g
变量指向当前协程,放到hchan.sendq
队列,当前goroutine阻塞(被挂起_GWating
)hchan.recvq
唤醒阻塞协程对channel做操作,都是由当前协程通知g0协程做调度
问题:为什么多个case被阻塞,说明当前g被加到了多个hchan.recvq
或者hchan.sendq
中,为什么只会执行一个case。
func SendBlock2() {
c1 := make(chan int)
c2 := make(chan int)
// go不能放select后,因为执行顺序的问题,如果放后面在select就挂起了协程,导致没有创建这个协程,也就不可能唤醒当前协程,从而导致死锁
go func() {
time.Sleep(3 * time.Second)
a := <-c1
fmt.Println(a)
}()
select {
case c1 <- 2:
fmt.Println("case1")
case c2 <- 3:
fmt.Println("case2")
}
}
上例的现象是:打印了case1或者case2,并不会两个都打印。
执行步骤:
hchan.sendq
中,c1和c2解锁允许其他协程操作这个channel,g被挂起等待recvq
和sendq
遍历删除绑定了当前协程的sudog
,因为删除了队列中的等待g,所以g不会被重新唤醒,case2就再也不命中。// 方法1,没有分配地址,无法读写chan
var c chan Type
// 方法2,分配了地址,设置了size就是有缓冲channel,反之是无缓冲地址
c := make(chan Type [, size])
c := make(chan Type [, size])
// 方法1
c <- val
// 方法2
select {
case c<-2:
// 下一个
case c<-2:
//业务逻辑
default:
//可以避免阻塞
}
c := make(chan Type [, size])
// 方法1
t := <-c
// 方法2
t,ok := <-c // ok==false表明,chan被关闭
// 方法3
select {
case <-c:
//业务逻辑
default:
//可以避免阻塞
}
// 方法4
for element := range c {
fmt.Println("chan element:", element)
}
close(c)
func SendBlock1() {
// 创建缓冲区容量是3的通道
c := make(chan int, 3)
defer close(c)
// 创建4个协程往通道里写,会有一个协程阻塞等待
for i := 0; i < 4; i++ {
go func(i int) {
c <- i
fmt.Printf("i=%d成功插入chan\n", i)
}(i) // 如果i不使用传参方式,而是使用闭包函数,那么就会发生数据逃逸,i会被存到堆中,栈帧上的i变成指针指向堆,导致协程里的i不一定打印0,1,2,3
}
time.Sleep(3 * time.Second)
//打印,2协程阻塞等待
//i=3成功插入chan
//i=0成功插入chan
//i=1成功插入chan
}
func SendBlock() {
c := make(chan int)
defer close(c)
for i := 0; i < 4; i++ {
// 如果i不使用传参方式,而是使用闭包函数,那么就会发生数据逃逸,i会被存到堆中,栈帧上的i变成指针指向堆,导致协程里的i不一定打印0,1,2,3
go func(i int) {
c <- i
fmt.Printf("i=%d成功插入chan\n", i)
}(i)
}
time.Sleep(3 * time.Second)
//没有任何打印,因为hchan.recvq没有协程可以唤醒
}
死锁:
func f1(channel chan int) {
time.Sleep(6 * time.Second)
channel <- 20
//close(channel)
}
func main() {
channel := make(chan int)
//例1 不会死锁,因为读写都只进行了一次之后就结算了
go func() {
time.Sleep(6 * time.Second)
channel <- 20
}()
fmt.Println(<-channel) // 主协程会阻塞等待管道进入数据
//例2 不会死锁,因为读写都只进行了一次之后就结算了
go f1(channel)
fmt.Println(<-channel)
//例3 不会死锁,因为读写都只进行了一次之后就结算了
go func(channel chan int) {
channel <- 20
}(channel)
fmt.Println(<-channel)
//例4 会死锁,主协程会一直等待子进程写入,无法退出,此时需要在子协程加入close(channel),表明自己不会在对协程做操作了
go func(channel chan int) {
channel <- 20
// close(channel) 加上则不会死锁
}(channel)
for element := range channel {
fmt.Println(element)
}
//例子5,结果是等待3秒后,其中一个消费协程会被死锁,因为他一直在等待channel的数据进入
channel := make(chan int, 3)
wg := sync.WaitGroup{}
wg.Add(3)
go func() {
defer wg.Done()
//fmt.Println("子协程1")
fmt.Println("子协程1抢到的" + strconv.Itoa(<-channel))
}()
go func() {
defer wg.Done()
//fmt.Println("子协程2")
fmt.Println("子协程2抢到的" + strconv.Itoa(<-channel))
}()
go func() {
defer wg.Done()
channel <- 20
for i := 0; i < 3; i++ {
time.Sleep(time.Second)
}
}()
//channel <- 21
wg.Wait()
}