• go channel实践与源码探索(初始化、发送消息、接收消息、关闭)


    概要

    通道(Channel)是Go语言提供的协程之间通信与同步的方式。我们知道在并发编程(多进程、多线程)中,进程or线程间的通信和同步是基础,不然就无法解决五位哲学家就餐之类的问题,但是传统的通信同步方式太过复杂,需要开发者了解互斥锁、临界区管理、条件变量控制等,这些东西在Go语言中Channel都帮你做了(屏蔽了),简而言之,你可以放心大胆的用channel去进行不同Goroutine之间的通信与同步,来完成对Goroutine的竞争关系,协作关系的实现。

    Go语言是遵循CSP编程模型的,其核心是通过通道来实现通信与同步。在Go语言中借助其调度器,可以高效的实现通道的阻塞与唤醒,进而实现通道的多路复用(Select 操作)。因此Go的并发编程强调不要利用共享内存来进行Goroutine通讯,而应该依靠通讯来共享数据(Do not communicate by sharing memory; instead, share memory by communicating),尽量避免锁和协程争用。

    CSP模型示意图

    PS:go V1.20.6

    一、并发编程

    在聊Go channel之前,这里先问一句,Go Channel是用来干什么的?其实在摘要中已经说了,处理并发下多个Goroutine之间的竞争与协作关系,进程线程在并发中同样面临该问题。
    进程间通信在学习操作系统一书中就了解到,同一服务器内的进程之间主要有五种:信号、管道、消息传递、信号量、共享内存(当然了,不同服务器进程之间通过网络,比如TCP,本文只聊同一服务器)。
    线程之间主要是共享内存+锁、消息传递等。

    消息传递和共享内存模式是使用比较广泛的,其中消息传递在实践中主要分为Actor和CSP两种模型,Actor模型重点在于参与交流的实体,而CSP模型重点在于交流的通道。

    1.1、Actor模型

    Actor模型示意图
    Actor模型首先是由Carl Hewitt在1973年提出定义, 随后由Erlang OTP (Open Telecom Platform) 推广开来。Actor属于并发组件模型, 通过组件方式定义并发编程范式的高级原语,避免使开发者直接接触多线程并发或线程池等基础概念,其消息传递更加符合面向对象的原始意图。
    传统多数流行的语言并发是基于多线程之间的共享内存,使用同步机制(互斥锁)来防止写竞争。而在Actor消息模型中,每个Actor在任何时间都是串行执行的,即同一时间处理最多一个消息,最多可以发送一个消息给其他Actor,保证了单独写原则,从而巧妙避免了多线程的写竞争。

    在Actor模型中,主角是Actor,类似Go语言中的goroutine,Actor彼此之间可以直接发送消息,不需要经过中介,消息是异步发送和处理的。
    每个 Actor 可以认为是一个基本的计算单元,它能接收消息并基于其进行运算,也可以发送消息给其他 Actor。Actor之间相互隔离,它们之间并不共享内存。

    Actor 是由状态(State)、行为(Behavior)和邮箱(MailBox,可以认为是一个消息队列)三部分组成:

    • 状态:Actor 中的状态指 Actor 对象的变量信息,状态由 Actor 自己管理,避免了并发环境下的锁和内存原子性等问题;
    • 行为:Actor 中的计算逻辑,通过 Actor 接收到的消息来改变 Actor 的状态;
    • 邮箱:邮箱是 Actor 和 Actor 之间的通信桥梁,邮箱内部通过 FIFO消息队列来存储发送方 Actor 消息,接受方 Actor 从邮箱队列中获取消息。

    除了Erlang语言,目前akka库是比较流行的Actor并发编程模型的实现,支持Scala和Java语言

    1.2、CSP模型

    CSP模型示意图
    CSP(Communicating Sequential Processes)是由Tony Hoare在1978的论文上首次提出的。 它是处理并发编程的一种设计模式或者模型,指导并发程序的设计,提供了一种并发程序可实践的组织方法或者设计范式。通过此方法,可以减少并发程序引入的其它缺点,减少和规避并发程序的常见缺点和bug,并且可以被数学理论所论证。

    CSP将程序分成两种模块,Processor 与 命名Channel:

    • Processor 代表了执行任务的顺序单元,它们内部没有并发;
    • Channel代表了并发流之间的信息交互,如共享数据的交换、修改、消息传递,状态同步等等。

    除了Channel,Processor之间再无联系,这样就将并发同步作用转移至Channel,使得问题得到了约束、集中。同步操作与竞争操作并没有消失,只是聚焦于Channel之上。Processor之间的协作,由Channel提供并发原语支持,最初Channel是无缓冲的,因此发送操作会阻塞,直到接收端接收后才能继续发送,从而提供了一种同步机制。

    在CSP模型中,Processor 之间不直接彼此联系,而是通过不同Channel进行消息发布和同步。消息的发送者和接收者之间通过Channel松耦合,发送者不知道自己消息被哪个接收者消费了,接收者也不知道是哪个发送者发送的消息。所以CSP 的好处是使得系统较为清晰,Processor 之间是解耦合的,职责也非常清楚,容易理解和维护。

    Go语言的并发编程模型参考了 CSP 理论,其中执行单元Processor对应的是Goroutine, 消息通道Channel对应的就是channel,一起通过后续章节,结合通道(Channel)的使用和原理来体会CSP思想的精髓

    二、Go Channel实践

    1. 初始化
    var cname chan int //声明一个chan int 类型的变量,变量名称叫name,默认值为nil
    bname := make(chan int)//make函数初始化channel,无缓冲通道
    cname = make(chan int,2)//make函数初始化channel,有缓冲通道,缓冲大小为2
    
    • 1
    • 2
    • 3

    注意:一个只声明但未初始化的chan 变量不能进行读,写,关闭、for遍历操作,否则会panic

    1. 操作
    cname<-1 //写入,如果没有缓冲区或缓冲区已满会阻塞
    v,ok:=<-cname //读取,如果没有可读数据会阻塞
    //或者 v:=<-cname
    close(cname)//关闭通道,如果重复关闭会panic
    for c:=range cname { //遍历,如果通道没有关闭,在遍历完既有数据后,会一直阻塞
      fmt.Println("v:%d,v addr:%p\n", v, &v)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    for…range是一个语法糖,在遍历通道时,其本质如下:

    for{
        v,ok := <-ch
        if!ok{
            break
        }
        original body
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1. 多路复用select
      一个select管理多个channel(类似linux网路操作select,poll,epoll,可以同时管理多个socket)
    var dname chan int
    select {
    case <-cname:
    	fmt.Println("rand one")
    case c:<-cname:
    	fmt.Println("rand two:", c)	
    case <-dname: //通过select 读取未声明的chan 是不会panic的
    	fmt.Println("read nil chan")
    case dname<-1: //通过select 写入未声明的chan 是不会panic的,其本质就是占据一次select机会,并不会真的写入数据
    	fmt.Println("write nil chan")	
    default:
    	fmt.Println("rand one")//其他分支都被阻塞时才会被执行
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    很多时候我们并不希望select执行一个分支,就退出,这时可以去for + select

    cname<-1
    for {
    	select {//注意,每一次select,如果多个分支的channel都是可读或写的,那么选择哪一个分支完全是随机的
    	case <-cname:
    		fmt.Println("rand one")
    	case <-cname:
    		fmt.Println("rand two")	
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    三、源码分析

    go channel的结构体如下:

    type hchan struct {
    	qcount   uint     //  缓冲区中的元素个数
    	dataqsiz uint     // 缓冲区大小
    	buf      unsafe.Pointer // 缓冲区,一个循环队列(数组)
    	sendx    uint   // 队尾,记录发送者在 buf 中索引位置
    	recvx    uint   // 队头,记录接收者在 buf 中索引位置
    	//通道元素相关
    	elemsize uint16  //元素大小
    	elemtype *_type // 元素类型
    	//
    	closed   uint32 //通道关闭标记
    	recvq    waitq  // 阻塞的接收者队列 list of recv waiters
    	sendq    waitq  // 阻塞的发送者队列 list of send waiters
    	lock mutex //互斥锁,并发保护。也协助实现channel的同步作用,即当缓冲区耗尽时阻塞接收者或发送者
    }
    
    type waitq struct {
    	first *sudog   //记录双向链表的头部
    	last  *sudog   //记录双向链表的尾部
    }
    //其中sudog表示一个在等待链表中goroutine,比如channel中的recvq和sendq。sudog与goroutine是多对1的,如下:
    // sudog is necessary because the g ↔ synchronization object relation
    // is many-to-many. A g can be on many wait lists, so there may be
    // many sudogs for one g; and many gs may be waiting on the same
    // synchronization object, so there may be many sudogs for one object.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    根据Go channel结构体可得其组成示意图如下:
    go channel结构示意图

    ... 代表代码省略

    3.1、初始化
    func makechan(t *chantype, size int) *hchan {
    	elem := t.elem
    	// compiler checks this but be safe.
    	if elem.size >= 1<<16 {
    		throw("makechan: invalid channel element type")
    	}
    	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    		throw("makechan: bad alignment")
    	}
    	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    	if overflow || mem > maxAlloc-hchanSize || size < 0 {
    		panic(plainError("makechan: size out of range"))
    	}
    	var c *hchan
    	switch {
    	case mem == 0: //当缓冲区设置为0
    		// Queue or element size is zero.
    		c = (*hchan)(mallocgc(hchanSize, nil, true))
    		// Race detector uses this location for synchronization.
    		c.buf = c.raceaddr()
    	case elem.ptrdata == 0: //当通道元素非指针
    		// Elements do not contain pointers.
    		// Allocate hchan and buf in one call.
    		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    		c.buf = add(unsafe.Pointer(c), hchanSize)
    	default: //当通道元素是指针
    		// Elements contain pointers.
    		c = new(hchan)
    		c.buf = mallocgc(mem, elem, true)
    	}
    	c.elemsize = uint16(elem.size)
    	c.elemtype = elem
    	c.dataqsiz = uint(size)
    	lockInit(&c.lock, lockRankHchan) //初始化互斥锁
      ...
    	return c
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    1. 当缓冲区设置为0时,只需申请hchan结构体大小的内存即可;
    2. 当通道元素非指针时,需要申请hchanSize+mem大小的连续内存,前hchanSize大小用来存储hchan结构体,剩余的用来表示缓冲区;
    3. 当通道元素是指针时,hchan结构体和缓冲区的内存需要分开申请,因为当元素包含指针时,需要单独申请独立内存才能进行正常的垃圾回收。
    3.2、发送消息
    func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    	return chansend(c, elem, false, getcallerpc()) //select 方法发送channel消息是非阻塞的
    }
    func chansend1(c *hchan, elem unsafe.Pointer) {
    	chansend(c, elem, true, getcallerpc()) //正常的发送channel消息走阻塞逻辑,即没有缓冲区可用时就阻塞当前goroutine
    }
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    	if c == nil {
    		if !block {
    			return false//走非阻塞模式,向nil chan发消息,返回false表示不可写
    		}
    		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)//走阻塞模式发消息,就会panic,提示【chan send (nil chan)】
    		throw("unreachable")
    	}
    
    	if !block && c.closed == 0 && full(c) {
    		return false //走非阻塞模式,向已关闭或缓冲区耗尽的通道发消息,返回false表示不可写
    	}
    	var t0 int64
    	if blockprofilerate > 0 {
    		t0 = cputicks()
    	}
    	lock(&c.lock)    //上锁
    	if c.closed != 0 {
    		unlock(&c.lock)
    		panic(plainError("send on closed channel"))//走阻塞模式,向已关闭的通道发消息,就会panic
    	}
      //下面就是核心了
    	if sg := c.recvq.dequeue(); sg != nil {//如果接收者队列中有等待读取的goroutine,直接将消息复制给队头协程,并唤醒它
    		// Found a waiting receiver. We pass the value we want to send
    		// directly to the receiver, bypassing the channel buffer (if any).
    		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    		return true
    	}
      //如果缓冲区未被耗尽,则将消息放到缓冲区队尾
    	if c.qcount < c.dataqsiz { 
    		// Space is available in the channel buffer. Enqueue the element to send.
    		qp := chanbuf(c, c.sendx)
    		if raceenabled {
    			racenotify(c, c.sendx, nil)
    		}
    		typedmemmove(c.elemtype, qp, ep)
    		c.sendx++
    		if c.sendx == c.dataqsiz {
    			c.sendx = 0
    		}
    		c.qcount++
    		unlock(&c.lock)
    		return true
    	}
    
    	if !block {//走非阻塞模式,返回false表示不可写
    		unlock(&c.lock)
    		return false
    	}
      //如果缓冲区被耗尽,走同步模式,则将当前goroutine放入sudog结构中,并放入发送者队列末尾,最终进入休眠状态等待被唤醒
    	// Block on the channel. Some receiver will complete our operation for us.
    	gp := getg()//获取当前协程
    	mysg := acquireSudog()
    	mysg.releasetime = 0
    	if t0 != 0 {
    		mysg.releasetime = -1
    	}
    	// No stack splits between assigning elem and enqueuing mysg
    	// on gp.waiting where copystack can find it.
    	mysg.elem = ep
    	mysg.waitlink = nil
    	mysg.g = gp    //将当前goroutine放入sudog结构中
    	mysg.isSelect = false
    	mysg.c = c
    	gp.waiting = mysg
    	gp.param = nil
    	c.sendq.enqueue(mysg)//将sudog放入发送者队列末尾
    	gp.parkingOnChan.Store(true)
    	//进行协程切换,去执行别的goroutine,当前goroutine进入休眠状态
    	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
       //等待被唤醒时继续执行后面的代码
    	// Ensure the value being sent is kept alive until the
    	// receiver copies it out. The sudog has a pointer to the
    	// stack object, but sudogs aren't considered as roots of the
    	// stack tracer.
    	KeepAlive(ep)
    
    	// someone woke us up.
    	if mysg != gp.waiting {
    		throw("G waiting list is corrupted")
    	}
    	gp.waiting = nil
    	gp.activeStackChans = false
    	closed := !mysg.success//channel被关闭后该字段值会被设为false
    	gp.param = nil
    	if mysg.releasetime > 0 {
    		blockevent(mysg.releasetime-t0, 2)
    	}
    	mysg.c = nil
    	releaseSudog(mysg)
    	if closed { 
    		if c.closed == 0 {
    			throw("chansend: spurious wakeup")
    		}
    		panic(plainError("send on closed channel"))//在睡眠中,该channel被关闭了,再被唤醒,也会panic,一般不会触发,因为close是会先唤醒等待的读写队列后才会设置closed
    	}
    	return true
    }
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ...
    goready(gp, skip+1) //将sg中的goroutine放到协程的运行队列中,优先放到当前P的本地运行队列,满了在放到全局运行队列,详情见Go GMP
    ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109

    对源码分析后,可知发送消息分为阻塞模式和非阻塞模式,我们最常用的是阻塞模式,核心逻辑如下:

    1. 当接收者队列中有等待读取的goroutine时,直接将消息复制给队头协程,并唤醒它。注意这里的唤醒并不是直接切换到队头协程,而是将其放到运行队列中,等待被调度;
    2. 不满足第一步,且当缓冲区未耗尽时,则将消息写入缓冲区队尾;
    3. 不满足第二步,且当缓冲区耗尽时,则将当前goroutine放入sudog结构中,并放入发送者队列sendq末尾,进行协程切换,令当前goroutine进入休眠状态等待被唤醒。
    3.3、接收消息
    func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
    	return chanrecv(c, elem, false) //select 方法接收channel消息是非阻塞的
    }
    func chanrecv1(c *hchan, elem unsafe.Pointer) {
    	chanrecv(c, elem, true)//正常的接收channel消息走阻塞逻辑,即缓冲区没有消息时就阻塞当前goroutine
    }
    //参数解析
    //c  hchan结构体
    //ep 接收消息的指针,v <- c,那么ep就是v的指针
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    	if c == nil {
    		if !block {//走非阻塞模式,向nil chan发消息,返回false表示不可读
    			return
    		}
    		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)//走阻塞模式发消息,就会panic,提示【chan receive (nil chan)】
    		throw("unreachable")
    	}
    	// Fast path: check for failed non-blocking operation without acquiring the lock.
    	if !block && empty(c) {//非阻塞模式且缓冲区大小为0
    		if atomic.Load(&c.closed) == 0 {//已关闭的通道直接返回
    			return
    		}
    		// The channel is irreversibly closed. Re-check whether the channel has any pending data
    		// to receive, which could have arrived between the empty and closed checks above.
    		// Sequential consistency is also required here, when racing with such a send.
    		if empty(c) {
    			// The channel is irreversibly closed and empty.
    			if raceenabled {
    				raceacquire(c.raceaddr())
    			}
    			if ep != nil {
    				typedmemclr(c.elemtype, ep)
    			}
    			return true, false
    		}
    	}
    
    
    	lock(&c.lock)//上锁
    	if c.closed != 0 {
    		if c.qcount == 0 {
    			if raceenabled {
    				raceacquire(c.raceaddr())
    			}
    			unlock(&c.lock)
    			if ep != nil {
    				typedmemclr(c.elemtype, ep)
    			}
    			return true, false //已关闭且缓冲区没有消息的直接返回,所以一个已关闭的channel也是可以读取消息的,但是已关闭的channel是无法写入消息的。
    		}
    		// The channel has been closed, but the channel's buffer have data.
    	} else {
    		// Just found waiting sender with not closed.
    		if sg := c.sendq.dequeue(); sg != nil {  //如果发送者队列中有等待写入的goroutine,获取发送者队列中的队头goroutine,直接将其写入的消息复制给当前协程,并唤醒它
    			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    			return true, true
    		}
    	}
    
    	if c.qcount > 0 {//如果缓冲区有消息,则直接获取缓冲区队头的消息,并复制到当前的读取协程中
    		// Receive directly from queue
    		qp := chanbuf(c, c.recvx)
    		if raceenabled {
    			racenotify(c, c.recvx, nil)
    		}
    		if ep != nil {
    			typedmemmove(c.elemtype, ep, qp)//将缓冲区的消息内容复制给承接消息内容的指针
    		}
    		typedmemclr(c.elemtype, qp)
    		c.recvx++
    		if c.recvx == c.dataqsiz {
    			c.recvx = 0
    		}
    		c.qcount--
    		unlock(&c.lock)
    		return true, true
    	}
    
    	if !block {
    		unlock(&c.lock)
    		return false, false
    	}
      //如果缓冲区没有消息,走同步模式,则将当前goroutine放入sudog结构中,并放入接收者队列末尾,最终进入休眠状态等待被唤醒
    	// no sender available: block on this channel.
    	gp := getg()//获取当前goroutine
    	mysg := acquireSudog()
    	mysg.releasetime = 0
    	if t0 != 0 {
    		mysg.releasetime = -1
    	}
    	// No stack splits between assigning elem and enqueuing mysg
    	// on gp.waiting where copystack can find it.
    	mysg.elem = ep
    	mysg.waitlink = nil
    	gp.waiting = mysg
    	mysg.g = gp //将当前goroutine放入sudog结构中
    	mysg.isSelect = false
    	mysg.c = c
    	gp.param = nil
    	c.recvq.enqueue(mysg)//将sudog放入接收者队尾
    	// Signal to anyone trying to shrink our stack that we're about
    	// to park on a channel. The window between when this G's status
    	// changes and when we set gp.activeStackChans is not safe for
    	// stack shrinking.
    	gp.parkingOnChan.Store(true)
    	//进行协程切换,去执行别的goroutine,当前goroutine进入休眠状态
    	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
      //当协程被唤醒,就继续执行后面的代码
    	// someone woke us up
    	if mysg != gp.waiting {
    		throw("G waiting list is corrupted")
    	}
    	gp.waiting = nil
    	gp.activeStackChans = false
    	if mysg.releasetime > 0 {
    		blockevent(mysg.releasetime-t0, 2)
    	}
    	success := mysg.success
    	gp.param = nil
    	mysg.c = nil
    	releaseSudog(mysg)
    	return true, success
    }
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    	...
    	goready(gp, skip+1)//将sg中的goroutine放到协程的运行队列中,优先放到当前P的本地运行队列,满了在放到全局运行队列,详情见Go GMP
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127

    对源码分析后,可知接收消息分为阻塞模式和非阻塞模式,我们最常用的是阻塞模式,核心逻辑如下:

    1. 当发送者队列有等待写入的goroutine时,获取发送者队列中的队头goroutine,直接将其写入的消息复制给当前协程,并唤醒它。注意这里的唤醒并不是直接切换到队头goroutine,而是将其放到运行队列中,等待被调度,这样就不会影响当前协程的运行;
    2. 不满足第一步,且当缓冲区有消息时,则将缓冲区队头消息复制给当前协程;
    3. 不满足第二步,且当缓冲区满或缓冲区大小为0时,则将当前goroutine放入sudog结构中,并放入接收者队列recvq末尾,进行协程切换,令当前goroutine进入休眠状态等待被唤醒。
    3.4、关闭通道
    func closechan(c *hchan) {
    	if c == nil {//关闭未初始化的通道会panic
    		panic(plainError("close of nil channel"))
    	}
    	lock(&c.lock)//上锁
    	if c.closed != 0 {//重复关闭通道会panic
    		unlock(&c.lock)
    		panic(plainError("close of closed channel"))
    	}
    	...
    	c.closed = 1 //设置closed标志位为1
    	var glist gList
    	// release all readers
    	for {
    		sg := c.recvq.dequeue()
    		if sg == nil {
    			break
    		}
    		...
    		gp := sg.g
    		...
    		glist.push(gp)//收集所有接收者队列中被阻塞的goroutine
    	}
    
    	// release all writers (they will panic)
    	for {
    		sg := c.sendq.dequeue()
    		if sg == nil {
    			break
    		}
    		...
    		gp := sg.g
    		...
    		glist.push(gp)//收集所有发送者队列中被阻塞的goroutine
    	}
    	unlock(&c.lock)
    
    	// Ready all Gs now that we've dropped the channel lock.
    	for !glist.empty() {
    		gp := glist.pop()
    		gp.schedlink = 0
    		goready(gp, 3) //统一唤醒接收者队列和发送者队列中被阻塞的goroutine
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    总结

    经过第二、三章节的阅读几乎涵盖Go channel的所有知识点了,这里再多说一下chan是否必须close,其实是没必要的,当没有goroutine持有这个chan时其就会被gc回收,并不会泄漏。close最大的作用是唤醒所有接收者队列和发送者队列中被阻塞的goroutine。

  • 相关阅读:
    【细读Spring Boot源码】重中之重refresh()
    java全内容收集
    企业宣传为何要重视领军人物包装?领军人物对企业营销的价值和作用分析
    Swift - 泛型
    Win11怎么修改关机界面颜色?Win11修改关机界面颜色的方法
    数据库基础与MySQL入门
    【课上笔记】第八章 图
    深入了解快速排序:原理、性能分析与 Java 实现
    数据文件采用错误方式删除后的解决办法
    DbSchema导出HTML/PDF版表结构
  • 原文地址:https://blog.csdn.net/weixin_38597669/article/details/132647180