• context源码学习


    type Context interface {
    	// Deadline 如果context 被设置了超时,Deadline 将会返回超时时限
    	Deadline() (deadline time.Time, ok bool)
    	// Done 返回一个只读的channel, 这个channel只能读取,没有地方写入,一旦将这个channel close,可以根据读取出来的nil做通信处理
    	Done() <-chan struct{}
    	// Err 返回context结束时的error信息
    	Err() error
    	// Value 类似于map,根据key返回对应的value值
    	Value(key any) any
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    注意**<-chan struct{}**这是golang中的一个语法糖,表示这个channel只读,不能写入。
    Context本质上是一个interface,他提供了四个接口,具体的注释很长,可以看源码的英文注释。
    其中Value,为了防止 Key 冲突,最好将 Key 的类型定义为非导出类型,然后为其定义访问器。看一个通过 Context 共享用户信息的例子:

    package user
    
    import "context"
    
    // User 是要存于 Context 中的 Value 类型.
    type User struct {...}
    
    // key 定义为了非导出类型,以避免和其他 package 中的 key 冲突
    type key int
    
    // userKey 是 Context 中用来关联 user.User 的 key,是非导出变量
    // 客户端需要用 user.NewContext 和 user.FromContext 构建包含
    // user 的 Context 和从 Context 中提取相应 user 
    var userKey key
    
    // NewContext 返回一个带有用户值 u 的 Context.
    func NewContext(ctx context.Context, u *User) context.Context {
      return context.WithValue(ctx, userKey, u)
    }
    
    // FromContext 从 Context 中提取 user,如果有的话.
    func FromContext(ctx context.Context) (*User, bool) {
      u, ok := ctx.Value(userKey).(*User)
      return u, ok
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    emptyCtx

    空的context

    type emptyCtx int
    
    func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    	return
    }
    
    func (*emptyCtx) Done() <-chan struct{} {
    	return nil
    }
    
    func (*emptyCtx) Err() error {
    	return nil
    }
    
    func (*emptyCtx) Value(key any) any {
    	return nil
    }
    
    func (e *emptyCtx) String() string {
    	switch e {
    	case background:
    		return "context.Background"
    	case todo:
    		return "context.TODO"
    	}
    	return "unknown empty Context"
    }
    
    var (
    	background = new(emptyCtx)
    	todo       = new(emptyCtx)
    )
    
    func Background() Context {
    	return background
    }
    
    func TODO() Context {
    	return todo
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    emptyCtx一般作为context的根节点,我们可以看到empty只是实现了context的四个函数,但是没有做相应的逻辑处理,返回nil值。我们可以通过Background跟TODO构建相应的context。当我们不知道context用来干什么的时候可以用TODO,预留一下context。

    valueCtx

    // A valueCtx carries a key-value pair. It implements Value for that key and
    // delegates all other calls to the embedded Context.
    type valueCtx struct {
    	Context
    	key, val any
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    valueCtx用来存储相应的key跟value。我们可以用WithValue函数来创建相应的value context。

    func WithValue(parent Context, key, val any) Context {
    	if parent == nil {
    		panic("cannot create context from nil parent")
    	}
    	if key == nil {
    		panic("nil key")
    	}
    	if !reflectlite.TypeOf(key).Comparable() {
    		panic("key is not comparable")
    	}
    	// 再包一层key跟value值
    	return &valueCtx{parent, key, val}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    我们可以看到WithValue其实就是将传进来的context再包一层key跟value,返回。

    func (c *valueCtx) Value(key any) any {
    	if c.key == key {
    		return c.val
    	}
    	return value(c.Context, key)
    }
    
    func value(c Context, key any) any {
    	for {
    		switch ctx := c.(type) {
    		case *valueCtx:
    			if key == ctx.key {
    				return ctx.val
    			}
    			c = ctx.Context
    		case *cancelCtx:
    			if key == &cancelCtxKey {
    				return c
    			}
    			c = ctx.Context
    		case *timerCtx:
    			if key == &cancelCtxKey {
    				return &ctx.cancelCtx
    			}
    			c = ctx.Context
    		case *emptyCtx:
    			return nil
    		default:
    			return c.Value(key)
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    我们可以通过调用Value函数获取key对应的value值。整个过程调用的是value函数,这个函数的过程我们可以理解为剥洋葱,一层一层获取父级context。其中的cancelCtxKey比较特别,是用来查找从当前的context出发最近的祖先cancel context节点。后面的cancelCtx会用到。

    cancelCtx

    首先看下cancelCtx的结构体

    type cancelCtx struct {
    	// 父级context
    	Context
    	// mutex锁
    	mu       sync.Mutex            // protects following fields
    	// 采用atomic.Value来存储close channel
    	done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
    	// 儿子节点
    	children map[canceler]struct{} // set to nil by the first cancel call
    	// 错误信息
    	err      error                 // set to non-nil by the first cancel call
    }
    
    type canceler interface {
    	// cancel函数,将所有的child接口cancel
    	cancel(removeFromParent bool, err error)
    	Done() <-chan struct{}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    同样的也提供了WithCancel函数用来获得一个cancelCtx

    func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    	// 先做参数校验
    	if parent == nil {
    		panic("cannot create context from nil parent")
    	}
    	// new一个cancelCtx,主要就是绑定父级context
    	c := newCancelCtx(parent)
    	// propagateCancel比较关键, propagateCancel可以理解为挂载,将child节点挂在离child节点最近的祖先cancelCtx节点
    	propagateCancel(parent, &c)
    	// 返回cancelCtx 以及 相应的cancel函数,如果调用cancel函数会将所有的child context cancel掉,注意cancel传递的第一个参数是true,表示要将该节点从父亲节点中摘除
    	return &c, func() { c.cancel(true, Canceled) }
    }
    
    func newCancelCtx(parent Context) cancelCtx {
    	return cancelCtx{Context: parent}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    接下来重点看下propagateCancel

    // propagateCancel主要是将child挂载在离child节点最近的祖先cancelCtx节点
    func propagateCancel(parent Context, child canceler) {
    	// 首先做参数校验
    	done := parent.Done()
    	if done == nil {
    		return // parent is never canceled
    	}
    	// 判断父亲节点是否已经被cancel,如果被cancel就要将child也关闭,因为既然父亲节点已经cancel,那么在父亲节点下的所有子节点都要cancel
    	select {
    	case <-done:
    		// parent is already canceled
    		child.cancel(false, parent.Err())
    		return
    	default:
    	}
    	//找到第一个离parent最近的祖先cancelCtx节点
    	if p, ok := parentCancelCtx(parent); ok {
    		p.mu.Lock()
    		if p.err != nil {
    			// parent has already been canceled
    			child.cancel(false, p.err)
    		} else {
    			if p.children == nil {
    				p.children = make(map[canceler]struct{})
    			}
    			// 找到了就将child节点挂载在p节点下
    			p.children[child] = struct{}{}
    		}
    		p.mu.Unlock()
    	} else {
    		// 提供对第三方context支持,如果是第三方的context就开goroutine做监听
    		atomic.AddInt32(&goroutines, +1)
    		go func() {
    			select {
    			case <-parent.Done():
    				child.cancel(false, parent.Err())
    			case <-child.Done():
    			}
    		}()
    	}
    }
    
    func parentCancelCtx(parent Context) (*cancelCtx, bool) {
    	done := parent.Done()
    	if done == closedchan || done == nil {
    		return nil, false
    	}
    	p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
    	if !ok {
    		return nil, false
    	}
    	pdone, _ := p.done.Load().(chan struct{})
    	if pdone != done {
    		return nil, false
    	}
    	return p, true
    }
    
    func (c *cancelCtx) Value(key any) any {
    	if key == &cancelCtxKey {
    		return c
    	}
    	return value(c.Context, key)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    接下来看下另一个关键的函数cancel

    
    var Canceled = errors.New("context canceled")
    var DeadlineExceeded error = deadlineExceededError{}
    
    // 第一参数表示是否将当前的节点从父亲节点中摘除,第二参数返回因为什么而cancel,只能是Canceled or DeadlineExceeded
    func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    	if err == nil {
    		panic("context: internal error: missing cancel error")
    	}
    	c.mu.Lock()
    	// 如果err非nil说明已经被其他goroutine cancel了
    	if c.err != nil {
    		c.mu.Unlock()
    		return // already canceled
    	}
    	c.err = err
    	// 原子加载channel
    	d, _ := c.done.Load().(chan struct{})
    	if d == nil {
    		// 如果为nil那么就原子存储一下一个关闭的channel
    		c.done.Store(closedchan)
    	} else {
    		// 关闭channel
    		close(d)
    	}
    	// 递归便利所有挂载的child节点,将当前节点下的所有子节点全部cancel
    	for child := range c.children {
    		// NOTE: acquiring the child's lock while holding parent's lock.
    		child.cancel(false, err)
    	}
    	// 置空child节点
    	c.children = nil
    	c.mu.Unlock()
    	
    	// 将节点从父亲节点中摘除
    	if removeFromParent {
    		removeChild(c.Context, c)
    	}
    }
    
    func removeChild(parent Context, child canceler) {
    	// 找到第一个离parent节点最近的祖先cancelCtx节点
    	p, ok := parentCancelCtx(parent)
    	if !ok {
    		return
    	}
    	p.mu.Lock()
    	if p.children != nil {
    		// 从祖先节点的map中将child删除
    		delete(p.children, child)
    	}
    	p.mu.Unlock()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    cancelCtx的主要操作就是创建一个cancelCtx的时候会用(剥洋葱)找到第一个离当前的cancelCtx最近的祖先cancelCtx,并将当前新建的cancelCtx挂载当祖先节点中。cancel的时候就是递归遍历自己的map,将自己所有子孙cancelCtx的channel都close掉,再将自己从父亲节点中摘除。摘除之后就交给GC做回收了。

    timerCtx

    type timerCtx struct {
    	cancelCtx
    	timer *time.Timer // Under cancelCtx.mu.
    
    	deadline time.Time
    }
    
    func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    	if parent == nil {
    		panic("cannot create context from nil parent")
    	}
    	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
    		// The current deadline is already sooner than the new one.
    		return WithCancel(parent)
    	}
    	c := &timerCtx{
    		cancelCtx: newCancelCtx(parent),
    		deadline:  d,
    	}
    	propagateCancel(parent, c)
    	dur := time.Until(d)
    	if dur <= 0 {
    		c.cancel(true, DeadlineExceeded) // deadline has already passed
    		return c, func() { c.cancel(false, Canceled) }
    	}
    	c.mu.Lock()
    	defer c.mu.Unlock()
    	if c.err == nil {
    		c.timer = time.AfterFunc(dur, func() {
    			c.cancel(true, DeadlineExceeded)
    		})
    	}
    	return c, func() { c.cancel(true, Canceled) }
    }
    
    func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    	return WithDeadline(parent, time.Now().Add(timeout))
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    timerCtx只是在cancelCtx的基础上再加了一个计时器,时间到了的时候就调用cancel函数。

    推荐阅读

    https://www.qtmuniao.com/2020/07/12/go-context/
    https://www.cnblogs.com/qcrao-2018/p/11007503.html

  • 相关阅读:
    ChatGPT总结(持续更新)
    AQS核心原理分析《下》
    谈谈 Redis 过期删除策略
    C++11之防止类型收窄(列表初始化)
    【CV】可变形卷积:用于目标检测和语义分割的卷积层
    【linux进程(三)】进程有哪些状态?--Linux下常见的三种进程状态
    安装Hadoop下hive的问题
    ensp中查看带宽信息
    ReentrantLock源码分析
    linux网络编程之tcp
  • 原文地址:https://blog.csdn.net/leekerian/article/details/126810483