• Golang-channel合集——源码阅读、工作流程、实现原理、已关闭channel收发操作、优雅的关闭等面试常见问题。


    前言

    面试被问到好几次“channel是如何实现的”,我只会说“啊,就一块内存空间传递数据呗”…所以这篇文章来深入学习一下Channel相关。从源码开始学习其组成、工作流程及一些常见考点。

    NO!共享内存

    Golang的并发哲学是“要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。”

    共享内存会需要使用锁、信号量等方式去控制访问,保障内存的一致性。所以会导致性能损耗+同步问题复杂。

    “通过通信来共享内存”的理念强调在并发单元之间通过消息传递来交换数据,而不是直接共享内存。这样,每个并发单元都有自己的内存,不与其他并发单元共享。当它们需要共享数据时,它们会通过发送消息来完成。这种方法的优点是避免了使用锁和同步机制,从而降低了死锁和竞态条件的风险,简化了并发程序的设计和理解。

    Channel整体结构

    源码位置

    位于src/runtime下的chan.go中。

    在这里插入图片描述

    Channel整体结构图

    图源:https://i6448038.github.io/2019/04/11/go-channel/

    在这里插入图片描述

    Channel结构体

    type hchan struct {
    	qcount   uint           // total data in the queue
    	dataqsiz uint           // size of the circular queue
    	buf      unsafe.Pointer // points to an array of dataqsiz elements
    	elemsize uint16
    	closed   uint32
    	elemtype *_type // element type
    	sendx    uint   // send index
    	recvx    uint   // receive index
    	recvq    waitq  // list of recv waiters
    	sendq    waitq  // list of send waiters
    
    	// lock protects all fields in hchan, as well as several
    	// fields in sudogs blocked on this channel.
    	//
    	// Do not change another G's status while holding this lock
    	// (in particular, do not ready a G), as this can deadlock
    	// with stack shrinking.
    	lock mutex
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    我们可以看到,其中有一个buf空间,这个对应的是我们生成的有缓冲通道、无缓冲通道。recvqsendq对应的是waitq类型,其中主要存储的是发送、接受方的Goroutine。

    waitq&&sudog

    waitq

    
    type waitq struct {
    	first *sudog
    	last  *sudog
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    sudog

    // sudogs are allocated from a special pool. Use acquireSudog and
    // releaseSudog to allocate and free them.
    type sudog struct {
    	// The following fields are protected by the hchan.lock of the
    	// channel this sudog is blocking on. shrinkstack depends on
    	// this for sudogs involved in channel ops.
    
    	g *g
    
    	next *sudog
    	prev *sudog
    	elem unsafe.Pointer // data element (may point to stack)
    
    	// The following fields are never accessed concurrently.
    	// For channels, waitlink is only accessed by g.
    	// For semaphores, all fields (including the ones above)
    	// are only accessed when holding a semaRoot lock.
    
    	acquiretime int64
    	releasetime int64
    	ticket      uint32
    
    	// isSelect indicates g is participating in a select, so
    	// g.selectDone must be CAS'd to win the wake-up race.
    	isSelect bool
    
    	// success indicates whether communication over channel c
    	// succeeded. It is true if the goroutine was awoken because a
    	// value was delivered over channel c, and false if awoken
    	// because c was closed.
    	success bool
    
    	parent   *sudog // semaRoot binary tree
    	waitlink *sudog // g.waiting list or semaRoot
    	waittail *sudog // semaRoot
    	c        *hchan // channel
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    Channel工作流程

    创建管道

    先在创建阶段:会根据缓冲大小对buf进行初始化,无缓冲通道的buf为0。具体见

    发送数据

    发送数据前:
    首先会进行加锁(因此-“一个通道同时只能进行一个收/发操作”)。如果Channel已关闭,则会报panic。

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    	lock(&c.lock)
    
    	if c.closed != 0 {
    		unlock(&c.lock)
    		panic(plainError("send on closed channel"))
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    发送数据时,会分为多种情况:
    1、有等待的接收者——直接发给阻塞的接收者。
    2、无等待 但是缓冲区有空间——写入Channel的缓冲区。
    3、无等待 无空间——等待其他Goroutine接受数据。

    update:刚才脑子转不过来了一下。。疑惑:情况1的时候,那缓存内的东西先进去,不应该排队放后面吗?为什么直接丢给goroutine了。一下反应过来:如果缓存有内容,那接收者就直接拿了啊!!阻塞,就说明他已经缓存拿不到东西,才会去阻塞等待的。

    工作流程:(由于GPT4.0解读源码总结完成)

    1、检查通道是否为nil:如果尝试向一个nil的通道发送数据,如果是非阻塞的(block为false),则直接返回false;如果是阻塞的,则该goroutine会被挂起,直到被唤醒(实际上,向nil通道发送数据会导致永久阻塞,这里的唤醒仅是理论上的,因为后面紧接着会调用throw("unreachable")抛出异常,表示这个代码路径不应该被执行)。
    
    2、快速路径检查:在尝试获取锁之前,先检查通道是否已关闭并且是否已满,以避免在这些明显无法发送成功的情况下还获取锁,提高效率。
    
    3、获取锁:为了保证对通道状态的修改是安全的,需要先获取通道的锁。
    
    4、检查通道是否已关闭:如果通道已经关闭,则抛出“send on closed channel”的异常。
    
    5、尝试直接发送给等待接收的goroutine:如果有goroutine正在等待接收(即接收队列不为空),则直接将值传递给它,并唤醒该goroutine。
    
    6、检查通道缓冲区是否有空间:如果通道的缓冲区还有空间,则将值放入缓冲区,并更新相关指标。
    
    7、非阻塞发送失败:如果是非阻塞发送且到达这一步,说明无法立即发送,释放锁并返回false8、准备阻塞发送:如果是阻塞发送,则创建一个sudog对象表示当前goroutine,将其加入到发送队列中,并挂起当前goroutine等待被唤醒(通常是接收方接收到值或通道被关闭时唤醒)。
    
    9、唤醒后的处理:被唤醒后,检查发送是否成功(通过检查sudog的success字段)。如果通道在等待期间被关闭,则抛出“send on closed channel”的异常。
    
    10、资源清理和返回:最后,释放sudog资源,返回发送是否成功。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    详细源码工作流程,见此

    接收数据

    当已被关闭&&缓冲区没有数据,会返回。

    接收的三种情况:
    1、存在发送者时,直接从发送者或缓冲区数据。
    2、缓冲区存在数据,从缓冲区接收。
    3、都不存在时,等待其他Goroutine发送。

    源码阅读(chanrecv函数):

    1、检查通道是否为空:如果尝试从一个nil的通道接收数据,根据block参数的不同,可能会导致goroutine挂起或者直接返回。
    
    2、快速路径检查:在不阻塞的情况下,如果通道为空,则尝试检查通道是否关闭。如果通道已关闭且为空,则清空指针ep指向的内存(如果ep不为nil)并返回。
    
    3、加锁:为了修改通道状态,需要先获取通道的锁。
    
    4、通道已关闭且无数据:如果通道已关闭并且没有数据,清空ep指向的内存并返回。
    
    5、从等待发送的goroutine接收数据:如果通道未关闭且有等待发送的goroutine,直接从发送方接收数据。
    
    6、从通道缓冲区接收数据:如果通道有数据(qcount > 0),则从通道的缓冲区接收数据到ep指向的位置,并清空缓冲区中该数据的位置。
    
    7、非阻塞情况下无数据可接:如果是非阻塞接收且到达这一步,说明无法立即接收数据,释放锁并返回。
    
    8、准备阻塞接收:如果是阻塞接收,则挂起当前goroutine,直到有数据可接收或通道被关闭。
    
    9、唤醒后的处理:被唤醒后,检查接收是否成功。如果接收成功,则ep指向的位置已被填充。
    
    10、资源清理和返回:最后,释放相关资源,返回操作结果。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    关闭管道

    closechan函数:

    1、检查通道是否为nil:如果尝试关闭一个nil的通道,会引发panic2、加锁:为了保证对通道状态的修改是并发安全的,需要先获取通道的锁。
    
    3、检查通道是否已经关闭:如果通道已经被关闭(c.closed != 0),则释放锁并panic。这防止了通道被多次关闭导致的未定义行为。
    
    4、设置通道为关闭状态:将通道的closed标志设置为1,表示该通道已经关闭。
    
    5、处理等待接收的goroutine:遍历接收队列recvq,对于队列中的每个等待接收的goroutine(通过sudog表示),清空它们等待接收的元素指针(如果有),并将它们标记为操作未成功(success = false)。这些goroutine将会被唤醒,但是接收操作会因为通道已关闭而失败。
    
    6、处理等待发送的goroutine:遍历发送队列sendq,对于队列中的每个等待发送的goroutine,清空它们准备发送的元素指针(如果有),并将它们标记为操作未成功。这些goroutine在被唤醒后会感知到通道已经关闭,并可能引发panic7、释放锁:完成上述操作后,释放通道的锁。
    
    8、唤醒所有goroutine:最后,对于通过上述步骤收集到的所有goroutine(存储在glist中),将它们标记为就绪状态(goready),这样它们就可以被调度执行了。这确保了所有因为该通道操作而阻塞的goroutine都能继续执行,无论是因为等待发送还是接收。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    向已关闭的Channel收发,会如何?

    结论

    向已关闭的Channel发送,会报panic
    向已关闭的Channel关闭,会报panic
    从已关闭的Channel读数据,先读完缓冲区内容,之后会读出来0(各数据类型的默认值)

    测试

    省略测试代码,网上太多了。参考此文即可——https://segmentfault.com/a/1190000042297722

    源码

    关闭已关闭的/关闭nil的channel

    	if c == nil {
    		panic(plainError("close of nil channel"))
    	}
    
    	lock(&c.lock)
    	if c.closed != 0 {
    		unlock(&c.lock)
    		panic(plainError("close of closed channel"))
    	}
    //chan.go---358行
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    向已关闭的channel发送

    向nil的channel发送(nil通道和无缓冲通道还不是一回事,具体见我另一篇博文)

    	if c == nil {
    		if !block {
    			return false
    		}
    		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
    		throw("unreachable")
    	}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    已关闭的发送

    
    	if c.closed != 0 {
    		unlock(&c.lock)
    		panic(plainError("send on closed channel"))
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5

    从已关闭的channel接收

    依旧是先判断是否为无缓存、阻塞。

    
    	if c == nil {
    		if !block {
    			return
    		}
    		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
    		throw("unreachable")
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    if c.closed != 0 {
    		if c.qcount == 0 {
    			if raceenabled {
    				raceacquire(c.raceaddr())
    			}
    			unlock(&c.lock)
    			if ep != nil {
    				typedmemclr(c.elemtype, ep)
    			}
    			return true, false
    		}
    		// The channel has been closed, but the channel's buffer have data.
    	} 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    如何优雅的关闭Channel

    这部分主要参考自:https://qcrao91.gitbook.io/go/channel/ru-he-you-ya-di-guan-bi-channel

    直接关闭存在的问题

    主要就是上述“向已关闭的Channel收发,会如何?”中所提到的情况:
    1、向已关闭的channel中发送数据,会panic
    2、重复关闭已经关闭的channel,会panic。
    3、从已关闭的channel接收数据,收到的是0。

    一个比较粗糙的实现

    利用从读channel的,会返回bool的性质。

    func IsClosed(ch <-chan T) bool {
        select {
        case <-ch:
            return true
        default:
        }
    
        return false
    }
    
    func main() {
        c := make(chan T)
        fmt.Println(IsClosed(c)) // false
        close(c)
        fmt.Println(IsClosed(c)) // true
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    但这样比较粗糙:
    一来,对channel的状态进行了修改。
    二来,检测的瞬间和关闭瞬间有间隔。
    三来,多个同时调用的话,也可能重复关闭。

    合理的方案

    don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.

    即:不要从一个 receiver 侧关闭 channel,也不要在有多个 sender 时,关闭 channel。

    因此考虑从发送端进行关闭。

    单一sender的情况

    即:
    一个 sender,一个 receiver

    一个 sender, M 个 receiver

    对于这种情况,直接sender方,发完之后关了即可。

    多个sender的情况

    即:
    N 个 sender,一个 reciver

    N 个 sender, M 个 receiver

    针对一个reciver的情况:

    增加一个传递关闭信号的 channel,receiver 通过信号 channel 下达关闭数据 channel 指令。senders 监听到关闭信号后,停止接收数据。

    func main() {
        rand.Seed(time.Now().UnixNano())
    
        const Max = 100000
        const NumSenders = 1000
    
        dataCh := make(chan int, 100)
        stopCh := make(chan struct{})
    
        // senders
        for i := 0; i < NumSenders; i++ {
            go func() {
                for {
                    select {
                    case <- stopCh:
                        return
                    case dataCh <- rand.Intn(Max):
                    }
                }
            }()
        }
    
        // the receiver
        go func() {
            for value := range dataCh {
                if value == Max-1 {
                    fmt.Println("send stop signal to senders.")
                    close(stopCh)
                    return
                }
    
                fmt.Println(value)
            }
        }()
    
        select {
        case <- time.After(time.Hour):
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    注意:这个代码中,其实并没有关闭channel。这个优雅的处理就是指的:不用他了,将他交给Golang的GC机制去处理。

    针对多个reciver的情况:

    如果依旧采取上述方案,则可能遇到的情况是:下达了多个关闭命令,依旧造成“向已关闭的channel进行关闭”。因此使用一个“中间人”channel,reciver都向他发送stop,收到第一个后 就向sender发stop。reciver默认长度设置为:Num(senders) + Num(receivers),可以避免阻塞问题。

    此部分代码见参考链接的原文即可

    其他注意事项

    待补充

    参考资料

    小部分用到了GPT 4.0-0125进行辅助理解,已标注说明。

    原理

    https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/#64-channel

    https://i6448038.github.io/2019/04/11/go-channel/

    https://zhuanlan.zhihu.com/p/496004953

    关闭的收发

    https://segmentfault.com/a/1190000042297722

    优雅关闭

    https://qcrao91.gitbook.io/go/channel/ru-he-you-ya-di-guan-bi-channel

  • 相关阅读:
    女性相关新闻易语言代码
    回溯-数组总和II
    python脚本(渗透测试)
    MySQL单表操作&约束
    Ansys Zemax | 如何设计光谱仪——实际应用
    Mac入门 使用brew安装软件
    找不到x3daudio1_7.dll怎么解决?x3daudio1_7.dll的5个修复方法
    推荐系统实践读书笔记-04利用用户标签数据
    zookeeper3.8.0集群安装及基础命令
    达梦数据库报错:Invalid column name [PASSWORD]
  • 原文地址:https://blog.csdn.net/Ws_Te47/article/details/136510385