• golang的channel实现原理


    golang的channel实现原理

    chan结构

    src/runtime/chan.go

    type hchan struct {
        qcount   uint           // 当前队列中剩余元素个数
        dataqsiz uint           // 环形队列长度,即可以存放的元素个数
        buf      unsafe.Pointer // 环形队列指针,也就是指向底层数组的指针
        elemsize uint16         // 每个元素的大小
        elemtype *_type         // 元素类型
        closed   uint32         // 标识关闭状态
        sendx    uint           // 队列下标,指示元素写入时存放到队列中的位置,环形队列的尾部下标
        recvx    uint           // 队列下标,指示元素从队列的该位置读出,环形队列的头部下标
        recvq    waitq          // 等待读消息的goroutine队列
        sendq    waitq          // 等待写消息的goroutine队列
        lock mutex              // 互斥锁,chan不允许并发读写
    }
    
    type waitq struct {
    	first *sudog
    	last  *sudog
    }
    
    type sudog struct {
    	// The following fields are protected by the hchan.lock of the
    	// channel this sudog is blocking on. shrinkstack depends on
    	// this for sudogs involved in channel ops.
    
    	g *g
    
    	next *sudog
    	prev *sudog
    	elem unsafe.Pointer // data element (may point to stack)
    
    	// The following fields are never accessed concurrently.
    	// For channels, waitlink is only accessed by g.
    	// For semaphores, all fields (including the ones above)
    	// are only accessed when holding a semaRoot lock.
    
    	acquiretime int64
    	releasetime int64
    	ticket      uint32
    
    	// isSelect indicates g is participating in a select, so
    	// g.selectDone must be CAS'd to win the wake-up race.
    	isSelect bool
    
    	// success indicates whether communication over channel c
    	// succeeded. It is true if the goroutine was awoken because a
    	// value was delivered over channel c, and false if awoken
    	// because c was closed.
    	success bool
    
    	parent   *sudog // semaRoot binary tree
    	waitlink *sudog // g.waiting list or semaRoot
    	waittail *sudog // semaRoot
    	c        *hchan // channel
    }
    
    func (q *waitq) enqueue(sgp *sudog) {
    	sgp.next = nil
    	x := q.last
    	if x == nil {
    		sgp.prev = nil
    		q.first = sgp
    		q.last = sgp
    		return
    	}
    	sgp.prev = x
    	x.next = sgp
    	q.last = sgp
    }
    
    func (q *waitq) dequeue() *sudog {
    	for {
    		sgp := q.first
    		if sgp == nil {
    			return nil
    		}
    		y := sgp.next
    		if y == nil {
    			q.first = nil
    			q.last = nil
    		} else {
    			y.prev = nil
    			q.first = y
    			sgp.next = nil // mark as removed (see dequeueSudog)
    		}
    
    		// if a goroutine was put on this queue because of a
    		// select, there is a small window between the goroutine
    		// being woken up by a different case and it grabbing the
    		// channel locks. Once it has the lock
    		// it removes itself from the queue, so we won't see it after that.
    		// We use a flag in the G struct to tell us when someone
    		// else has won the race to signal this goroutine but the goroutine
    		// hasn't removed itself from the queue yet.
    		if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
    			continue
    		}
    
    		return sgp
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    关于 环形队列

    waitq只记录了第一个和最后一个G的信息。

    这个sudug非常重要,源码中到处可以看到有使用。此处通过sudug的 prev 和 next 构造了一个双向的链表。sudug 中包含了G,以及其他参数。

    channel的一些特性

    • 从channel读数据,如果channel缓冲区为空或者没有缓冲区,当前G会被阻塞。

    • 向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前G会被阻塞。

    • 被阻塞的G将会挂在channel的等待队列中:

      因读阻塞的G会被向channel写入数据的G唤醒;

      因写阻塞的G会被从channel读数据的G唤醒;

    • 对于buffered channel:

      c.qcount > 0 意味着c.recvq为空;

      c.qcount < c.dataqsiz 意味着c.sendq为空;

    • 一般情况下recvqsendq至少有一个为空。只有一个例外,那就是只有一个 G 使用 select 语句向一个无缓冲的channel一边写数据,一边读数据。

    • elemtype代表类型,用于数据传递过程中的赋值。

    • elemsize代表类型大小,用于在buf中定位元素位置。

    • 一个channel同时仅允许被一个G读或写。

    创建channel

    创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区长度由make语句传入,buf的大小则与元素大小和缓冲区长度共同决定。

    func makechan(t *chantype, size int) *hchan {
    	// 元素类型type
        elem := t.elem
    
    	// compiler checks this but be safe.
    	if elem.size >= 1<<16 {
    		throw("makechan: invalid channel element type")
    	}
    	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    		throw("makechan: bad alignment")
    	}
    	// 返回 elem.size * size,并判断结果是否超出了平台的上限
    	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    	if overflow || mem > maxAlloc-hchanSize || size < 0 {
    		panic(plainError("makechan: size out of range"))
    	}
    
    	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    	// buf points into the same allocation, elemtype is persistent.
    	// SudoG's are referenced from their owning thread so they can't be collected.
    	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    	var c *hchan
    	switch {
    	case mem == 0:
             // 有可能size为0,或者元素的类型大小为0(空结构体)
    		// Queue or element size is zero.
    		c = (*hchan)(mallocgc(hchanSize, nil, true))
    		// Race detector uses this location for synchronization.
    		c.buf = c.raceaddr()
    	case elem.ptrdata == 0:
             // 元素的类型不包含指针
    		// Elements do not contain pointers.
    		// Allocate hchan and buf in one call.
    		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    		c.buf = add(unsafe.Pointer(c), hchanSize)
    	default:
             // 元素的类型包含指针
    		// Elements contain pointers.
    		c = new(hchan)
    		c.buf = mallocgc(mem, elem, true)
    	}
    
    	c.elemsize = uint16(elem.size)
    	c.elemtype = elem
        
         // 环形队列大小
    	c.dataqsiz = uint(size)
        
         // 初始化锁
    	lockInit(&c.lock, lockRankHchan)
    
    	if debugChan {
    		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    	}
    	return c
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    // full reports whether a send on c would block (that is, the channel is full).
    // It uses a single word-sized read of mutable state, so although
    // the answer is instantaneously true, the correct answer may have changed
    // by the time the calling function receives the return value.
    // 判断一个G向一个chan发送数据时是否应该被阻塞,也就是是否chan已满
    // 对于无缓冲chan,如果已经存在读的G,那么写G就不用阻塞,这是一个优化的点
    // 对于有缓冲chan,只需要判断缓冲区是否已满即可
    func full(c *hchan) bool {
    	// c.dataqsiz is immutable (never written after the channel is created)
    	// so it is safe to read at any time during channel operation.
    	if c.dataqsiz == 0 {
    		// Assumes that a pointer read is relaxed-atomic.
    		return c.recvq.first == nil
    	}
    	// Assumes that a uint read is relaxed-atomic.
    	return c.qcount == c.dataqsiz
    }
    
    // empty reports whether a read from c would block (that is, the channel is
    // empty).  It uses a single atomic read of mutable state.
    func empty(c *hchan) bool {
    	// c.dataqsiz is immutable.
    	if c.dataqsiz == 0 {
    		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
    	}
    	return atomic.Loaduint(&c.qcount) == 0
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    发送数据

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    	if c == nil {
    		if !block {
    			return false
    		}
    		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    		throw("unreachable")
    	}
    
    	if debugChan {
    		print("chansend: chan=", c, "\n")
    	}
    
    	if raceenabled {
    		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
    	}
    
    	// Fast path: check for failed non-blocking operation without acquiring the lock.
    	//
    	// After observing that the channel is not closed, we observe that the channel is
    	// not ready for sending. Each of these observations is a single word-sized read
    	// (first c.closed and second full()).
    	// Because a closed channel cannot transition from 'ready for sending' to
    	// 'not ready for sending', even if the channel is closed between the two observations,
    	// they imply a moment between the two when the channel was both not yet closed
    	// and not ready for sending. We behave as if we observed the channel at that moment,
    	// and report that the send cannot proceed.
    	//
    	// It is okay if the reads are reordered here: if we observe that the channel is not
    	// ready for sending and then observe that it is not closed, that implies that the
    	// channel wasn't closed during the first observation. However, nothing here
    	// guarantees forward progress. We rely on the side effects of lock release in
    	// chanrecv() and closechan() to update this thread's view of c.closed and full().
    	if !block && c.closed == 0 && full(c) {
    		return false
    	}
    
    	var t0 int64
    	if blockprofilerate > 0 {
    		t0 = cputicks()
    	}
    
    	lock(&c.lock)
    
    	if c.closed != 0 {
    		unlock(&c.lock)
    		panic(plainError("send on closed channel"))
    	}
    
    	if sg := c.recvq.dequeue(); sg != nil {
    		// Found a waiting receiver. We pass the value we want to send
    		// directly to the receiver, bypassing the channel buffer (if any).
    		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    		return true
    	}
    
    	if c.qcount < c.dataqsiz {
    		// Space is available in the channel buffer. Enqueue the element to send.
    		qp := chanbuf(c, c.sendx)
    		if raceenabled {
    			racenotify(c, c.sendx, nil)
    		}
    		typedmemmove(c.elemtype, qp, ep)
    		c.sendx++
    		if c.sendx == c.dataqsiz {
    			c.sendx = 0
    		}
    		c.qcount++
    		unlock(&c.lock)
    		return true
    	}
    
    	if !block {
    		unlock(&c.lock)
    		return false
    	}
    
    	// Block on the channel. Some receiver will complete our operation for us.
    	gp := getg()
    	mysg := acquireSudog()
    	mysg.releasetime = 0
    	if t0 != 0 {
    		mysg.releasetime = -1
    	}
    	// No stack splits between assigning elem and enqueuing mysg
    	// on gp.waiting where copystack can find it.
    	mysg.elem = ep
    	mysg.waitlink = nil
    	mysg.g = gp
    	mysg.isSelect = false
    	mysg.c = c
    	gp.waiting = mysg
    	gp.param = nil
    	c.sendq.enqueue(mysg)
    	// Signal to anyone trying to shrink our stack that we're about
    	// to park on a channel. The window between when this G's status
    	// changes and when we set gp.activeStackChans is not safe for
    	// stack shrinking.
    	atomic.Store8(&gp.parkingOnChan, 1)
    	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    	// Ensure the value being sent is kept alive until the
    	// receiver copies it out. The sudog has a pointer to the
    	// stack object, but sudogs aren't considered as roots of the
    	// stack tracer.
    	KeepAlive(ep)
    
    	// someone woke us up.
    	if mysg != gp.waiting {
    		throw("G waiting list is corrupted")
    	}
    	gp.waiting = nil
    	gp.activeStackChans = false
    	closed := !mysg.success
    	gp.param = nil
    	if mysg.releasetime > 0 {
    		blockevent(mysg.releasetime-t0, 2)
    	}
    	mysg.c = nil
    	releaseSudog(mysg)
    	if closed {
    		if c.closed == 0 {
    			throw("chansend: spurious wakeup")
    		}
    		panic(plainError("send on closed channel"))
    	}
    	return true
    }
    
    // send processes a send operation on an empty channel c.
    // The value ep sent by the sender is copied to the receiver sg.
    // The receiver is then woken up to go on its merry way.
    // Channel c must be empty and locked.  send unlocks c with unlockf.
    // sg must already be dequeued from c.
    // ep must be non-nil and point to the heap or the caller's stack.
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    	if raceenabled {
    		if c.dataqsiz == 0 {
    			racesync(c, sg)
    		} else {
    			// Pretend we go through the buffer, even though
    			// we copy directly. Note that we need to increment
    			// the head/tail locations only when raceenabled.
    			racenotify(c, c.recvx, nil)
    			racenotify(c, c.recvx, sg)
    			c.recvx++
    			if c.recvx == c.dataqsiz {
    				c.recvx = 0
    			}
    			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    		}
    	}
    	if sg.elem != nil {
    		sendDirect(c.elemtype, sg, ep)
    		sg.elem = nil
    	}
    	gp := sg.g
    	unlockf()
    	gp.param = unsafe.Pointer(sg)
    	sg.success = true
    	if sg.releasetime != 0 {
    		sg.releasetime = cputicks()
    	}
    	goready(gp, skip+1)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164

    向一个channel中写数据简单过程如下:

    • 如果等待接收队列 recvq 不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从 recvq 取出G,并把数据直接写入到G中,最后把该G唤醒,结束发送过程;
    • 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
    • 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入 sendq,进入睡眠,等待被读G唤醒;

    在这里插入图片描述

    此处说的把数据直接写入到G中是个什么意思呢?这个还需要借助接收数据的源码来一起看,后面再说。

    接收数据

    // chanrecv receives on channel c and writes the received data to ep.
    // ep may be nil, in which case received data is ignored.
    // If block == false and no elements are available, returns (false, false).
    // Otherwise, if c is closed, zeros *ep and returns (true, false).
    // Otherwise, fills in *ep with an element and returns (true, true).
    // A non-nil ep must point to the heap or the caller's stack.
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    	// raceenabled: don't need to check ep, as it is always on the stack
    	// or is new memory allocated by reflect.
    
    	if debugChan {
    		print("chanrecv: chan=", c, "\n")
    	}
    
    	if c == nil {
    		if !block {
    			return
    		}
    		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    		throw("unreachable")
    	}
    
    	// Fast path: check for failed non-blocking operation without acquiring the lock.
    	if !block && empty(c) {
    		// After observing that the channel is not ready for receiving, we observe whether the
    		// channel is closed.
    		//
    		// Reordering of these checks could lead to incorrect behavior when racing with a close.
    		// For example, if the channel was open and not empty, was closed, and then drained,
    		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
    		// we use atomic loads for both checks, and rely on emptying and closing to happen in
    		// separate critical sections under the same lock.  This assumption fails when closing
    		// an unbuffered channel with a blocked send, but that is an error condition anyway.
    		if atomic.Load(&c.closed) == 0 {
    			// Because a channel cannot be reopened, the later observation of the channel
    			// being not closed implies that it was also not closed at the moment of the
    			// first observation. We behave as if we observed the channel at that moment
    			// and report that the receive cannot proceed.
    			return
    		}
    		// The channel is irreversibly closed. Re-check whether the channel has any pending data
    		// to receive, which could have arrived between the empty and closed checks above.
    		// Sequential consistency is also required here, when racing with such a send.
    		if empty(c) {
    			// The channel is irreversibly closed and empty.
    			if raceenabled {
    				raceacquire(c.raceaddr())
    			}
    			if ep != nil {
    				typedmemclr(c.elemtype, ep)
    			}
    			return true, false
    		}
    	}
    
    	var t0 int64
    	if blockprofilerate > 0 {
    		t0 = cputicks()
    	}
    
    	lock(&c.lock)
    
    	if c.closed != 0 && c.qcount == 0 {
    		if raceenabled {
    			raceacquire(c.raceaddr())
    		}
    		unlock(&c.lock)
    		if ep != nil {
    			typedmemclr(c.elemtype, ep)
    		}
    		return true, false
    	}
    
    	if sg := c.sendq.dequeue(); sg != nil {
    		// Found a waiting sender. If buffer is size 0, receive value
    		// directly from sender. Otherwise, receive from head of queue
    		// and add sender's value to the tail of the queue (both map to
    		// the same buffer slot because the queue is full).
    		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    		return true, true
    	}
    
    	if c.qcount > 0 {
    		// Receive directly from queue
    		qp := chanbuf(c, c.recvx)
    		if raceenabled {
    			racenotify(c, c.recvx, nil)
    		}
    		if ep != nil {
    			typedmemmove(c.elemtype, ep, qp)
    		}
    		typedmemclr(c.elemtype, qp)
    		c.recvx++
    		if c.recvx == c.dataqsiz {
    			c.recvx = 0
    		}
    		c.qcount--
    		unlock(&c.lock)
    		return true, true
    	}
    
    	if !block {
    		unlock(&c.lock)
    		return false, false
    	}
    
    	// no sender available: block on this channel.
    	gp := getg()
    	mysg := acquireSudog()
    	mysg.releasetime = 0
    	if t0 != 0 {
    		mysg.releasetime = -1
    	}
    	// No stack splits between assigning elem and enqueuing mysg
    	// on gp.waiting where copystack can find it.
    	mysg.elem = ep
    	mysg.waitlink = nil
    	gp.waiting = mysg
    	mysg.g = gp
    	mysg.isSelect = false
    	mysg.c = c
    	gp.param = nil
    	c.recvq.enqueue(mysg)
    	// Signal to anyone trying to shrink our stack that we're about
    	// to park on a channel. The window between when this G's status
    	// changes and when we set gp.activeStackChans is not safe for
    	// stack shrinking.
    	atomic.Store8(&gp.parkingOnChan, 1)
    	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    
    	// someone woke us up
    	if mysg != gp.waiting {
    		throw("G waiting list is corrupted")
    	}
    	gp.waiting = nil
    	gp.activeStackChans = false
    	if mysg.releasetime > 0 {
    		blockevent(mysg.releasetime-t0, 2)
    	}
    	success := mysg.success
    	gp.param = nil
    	mysg.c = nil
    	releaseSudog(mysg)
    	return true, success
    }
    
    // recv processes a receive operation on a full channel c.
    // There are 2 parts:
    // 1) The value sent by the sender sg is put into the channel
    //    and the sender is woken up to go on its merry way.
    // 2) The value received by the receiver (the current G) is
    //    written to ep.
    // For synchronous channels, both values are the same.
    // For asynchronous channels, the receiver gets its data from
    // the channel buffer and the sender's data is put in the
    // channel buffer.
    // Channel c must be full and locked. recv unlocks c with unlockf.
    // sg must already be dequeued from c.
    // A non-nil ep must point to the heap or the caller's stack.
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    	if c.dataqsiz == 0 {
    		if raceenabled {
    			racesync(c, sg)
    		}
    		if ep != nil {
    			// copy data from sender
    			recvDirect(c.elemtype, sg, ep)
    		}
    	} else {
    		// Queue is full. Take the item at the
    		// head of the queue. Make the sender enqueue
    		// its item at the tail of the queue. Since the
    		// queue is full, those are both the same slot.
    		qp := chanbuf(c, c.recvx)
    		if raceenabled {
    			racenotify(c, c.recvx, nil)
    			racenotify(c, c.recvx, sg)
    		}
    		// copy data from queue to receiver
    		if ep != nil {
    			typedmemmove(c.elemtype, ep, qp)
    		}
    		// copy data from sender to queue
    		typedmemmove(c.elemtype, qp, sg.elem)
    		c.recvx++
    		if c.recvx == c.dataqsiz {
    			c.recvx = 0
    		}
    		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    	}
    	sg.elem = nil
    	gp := sg.g
    	unlockf()
    	gp.param = unsafe.Pointer(sg)
    	sg.success = true
    	if sg.releasetime != 0 {
    		sg.releasetime = cputicks()
    	}
    	goready(gp, skip+1)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 如果等待发送队列 sendq 不为空,且没有缓冲区,直接从 sendq 中取出G,把G中数据读出,最后把G唤醒,结束读取过程;
    • 如果等待发送队列 sendq 不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;
    • 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
    • 如果缓冲区为空或没有缓冲区,然后 sendq 也为空,则将当前G加入 recvq,进入睡眠,等待被写G唤醒;

    在这里插入图片描述

    发送和接收过程

    1、发送数据
    c <- d
    
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
    
    ep 指向要发送的数据 d
    
    如果从recvq中取到了接收者,那么将 ep 赋值给 接收者sudug.elem,直接返回,接收者也被唤醒。
    如果能进缓冲区,那么发送成功,直接返回。
    如果要进入sendq,那么将 ep 赋值给自己的 sudug.elem,挂起。
    
    2、接收数据
    d <- c
    d, ok <- c
    
    ep 指向要接收的数据 d
    received 返回给 ok
    
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
    
    如果能从sendq取到发送者,将发送者sudug.elem赋值给 ep,返回,发送者也被唤醒。
    如果能从缓冲区拿到数据,那么直接赋值给 ep,返回。
    如果要进入recvq,将ep赋值给自己的sudug.elem,挂起。
    
    chanrecv的参数block如果传入false,意味着如果没有数据是不会被挂起的,比如我们在使用select
    语句读取多个chan的时候。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    关闭chan

    func closechan(c *hchan) {
    	if c == nil {
    		panic(plainError("close of nil channel"))
    	}
    
    	lock(&c.lock)
    	if c.closed != 0 {
    		unlock(&c.lock)
    		panic(plainError("close of closed channel"))
    	}
    
    	if raceenabled {
    		callerpc := getcallerpc()
    		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
    		racerelease(c.raceaddr())
    	}
    
    	c.closed = 1
    
    	var glist gList
    
    	// release all readers
    	for {
    		sg := c.recvq.dequeue()
    		if sg == nil {
    			break
    		}
    		if sg.elem != nil {
    			typedmemclr(c.elemtype, sg.elem)
    			sg.elem = nil
    		}
    		if sg.releasetime != 0 {
    			sg.releasetime = cputicks()
    		}
    		gp := sg.g
    		gp.param = unsafe.Pointer(sg)
    		sg.success = false
    		if raceenabled {
    			raceacquireg(gp, c.raceaddr())
    		}
    		glist.push(gp)
    	}
    
    	// release all writers (they will panic)
    	for {
    		sg := c.sendq.dequeue()
    		if sg == nil {
    			break
    		}
    		sg.elem = nil
    		if sg.releasetime != 0 {
    			sg.releasetime = cputicks()
    		}
    		gp := sg.g
    		gp.param = unsafe.Pointer(sg)
    		sg.success = false
    		if raceenabled {
    			raceacquireg(gp, c.raceaddr())
    		}
    		glist.push(gp)
    	}
    	unlock(&c.lock)
    
    	// Ready all Gs now that we've dropped the channel lock.
    	for !glist.empty() {
    		gp := glist.pop()
    		gp.schedlink = 0
    		goready(gp, 3)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 关闭channel时会把 recvq 中的G全部唤醒,本该写入G的数据位置为nil,也就是零值。但是,关闭通道后,如果又有G来读取通道数据,如果 channel 中有值,这里特指带 buffer 的 channel,那么就从 channel 中取,如果没有值,那么会返回 channel 元素的零值。

    • 关闭channel时会把 sendq 中的G全部唤醒,但这些G会panic,因此在不确定是否还有 goroutine 需要向 channel 发送数据时,请勿贸然关闭 channel。

    • 已关闭的 channel,再次关闭会 panic。

    • 已关闭的 channel 不能再向其中发送内容,否则会 panic。

  • 相关阅读:
    PMP考试是什么?适合哪些人学?
    【Verilog基础】Verilog中不可综合语句及可综合模型原则
    Kafka之Producer网络传输
    APP开发的方式
    MV*结构的发展
    ESDA in PySal (1) 利用 A-DBSCAN 聚类点并探索边界模糊性
    C#,图论与图算法,图着色问题(Graph Coloring)的威尔士-鲍威尔(Welch Powell Algorithm)算法与源代码
    六、kotlin的函数式编程
    通用工具类
    经典动态规划问题的递归实现方法——LeetCode39 组合总和
  • 原文地址:https://blog.csdn.net/raoxiaoya/article/details/126050296