• 二刷 K8s 源码 - workqueue 的所有细节


    1. 概述 - 何来此文

    有些天没有更新文章了,大约两三周。

    为什么今天会再写一次源码分析相关的文章呢?

    在几年前我写过一篇:《Kubernetes client-go 源码分析 - workqueue》,不过过去2年我没有搞 K8s,忘得差不多了……

    这几周我接了个活,写一个 vgpu 调度器,我以为是三天的活,结果搞了三周,笑死。

    这个项目涉及到 K8s 自定义控制器调度器拓展webhook 等,还是非常有趣的。虽然写起来有点费头发,但是成就感满满。可能你的想法和我一开始一样,感觉就是500行代码的事情,三天绰绰有余!哎,都是泪,5000行都写不完……

    过去2年时间我在 DevOps 领域干活,K8s 相关的知识点很多已经记不清了。这会 vgpu 调度器也实现完了,忙绿告一段落,同时心里的疑惑也攒满了,各种我知其然(会用)但是不知其所以然(不记得细节,原理)的点,趁着过年期间,补一补。

    今天从 workqueue 开始。

    写完本文,我又回过头瞟了下几年前写的 workqueue 源码分析文章,略有感慨。当年更加关注的是如何快速看懂源码,更加关注“workqueue 的实现”本身;而今天再看 workqueue,我则更加关注“为什么要看 workqueue”,在自定义控制器里用到了 workqueue,这些用法的道理何在,原理何在。

    同时这次刷 workqueue 源码,我能够深切感受到这段源码的优雅,不止是看懂逻辑,更多是一种享受。就像看《Harry Potter》原著,除了沉浸于作者构造的真切魔法场景,体会看小说的乐趣外,还能被字里行间溢出的那种才华而感动,感慨“原来英文也可以写的文采斐然”,感慨“原来代码还能写的这么漂亮”。

    2. Queue 的实现

    Queue 对应的接口定义如下:

    • util/workqueue/queue.go:26
    type Interface interface {
    	Add(item interface{})
    	Len() int
    	Get() (item interface{}, shutdown bool)
    	Done(item interface{})
    	ShutDown()
    	ShutDownWithDrain()
    	ShuttingDown() bool
    }
    

    这里重点关注 AddGetDone 的实现。Queue 的实现类型如下:

    • util/workqueue/queue.go:115
    type Type struct {
    	// queue defines the order in which we will work on items. Every
    	// element of queue should be in the dirty set and not in the
    	// processing set.
    	queue []t
    
    	// dirty defines all of the items that need to be processed.
    	dirty set
    
    	// Things that are currently being processed are in the processing set.
    	// These things may be simultaneously in the dirty set. When we finish
    	// processing something and remove it from this set, we'll check if
    	// it's in the dirty set, and if so, add it to the queue.
    	processing set
    
    	cond *sync.Cond
        // ......
    }
    

    来看这三个字段:queue、dirty 和 processing:

    • queue:queue 定义了这个队列中 items 的顺序,这是一个 []interface{} 类型的切片,可以保存任意类型的元素。
    • dirty:dirty 的类型是 set 的底层类型是 map[interface{}]struct{},也就是 queue 中的元素集合会存到这个 dirty set 中,也就是“待处理的 items”。
    • processing:processing 的类型也是 set,保存的是正在被处理的 items。

    也就是说 Queue 会将待处理的 items 全部放到 queue 中,这个 queue 是有序的;同时为了让 queue 中不存在重复的 items,所以加了一个 dirty set,毕竟判断 map 中是否存在某个 key 比判断 slice 中是否存在某个 item 要快得多。

    另外当一个 item 从 queue 中被取出时,也就是出队后,这个 item 会被加到 processing set 中,同时被从 dirty set 中移除,也就是 processing set 保存了目前正在被处理,但是没有处理完的 items。换言之,会存在某个操作来表示“一个 item 已经被处理完成”,也就是 Done(item interface{}) 方法。

    下一步我们具体来看 AddGetDone 的实现。

    2.1 Queue.Add(item interface{}) 方法

    • util/workqueue/queue.go:163
    func (q *Type) Add(item interface{}) {
    	q.cond.L.Lock()
    	defer q.cond.L.Unlock()
    	if q.shuttingDown {
    		return
    	}
    	if q.dirty.has(item) {
    		return
    	}
    
    	q.metrics.add(item)
    
    	q.dirty.insert(item)
    	if q.processing.has(item) {
    		return
    	}
    
    	q.queue = append(q.queue, item)
    	q.cond.Signal()
    }
    

    这里用到了条件变量 sync.Cond(我在前面贴 Type 结构体代码时特地保留了 cond 字段),不难想到当队列为空的时候,和 Add 相对应的 Get() 方法会 Wait,等到 Add 完成 item 的添加,调用 Signal() 来唤醒。

    Add() 方法的实现主要就两个逻辑:

    1. 如果 item 在 dirty set 中不存在,就往 dirty set 中插入。这样也就解决了 queue 中重复放入某个 item 的问题。这个特性在 workqueue 包的 doc.go 中被描述为“Stingy”,也就是 Queue 的实现不允许同一个 item 被并发处理。再说的简单一点,就是加入 queue 中放进去了几个一模一样的 item,这时候如果多个 goroutine 去并发 Get() 后其实会拿到一样的 item(比如是一个 pod),那不就乱套了。
    2. 如果 item 在 processing set 中不存在,那就加入到 queue 中。也就是说如果一个 item “正在被处理”,那么它不会被重复加到 queue 里去排队。显然这也是为了“Stingy”。但是这个 item 会被放到 dirty set 里,如果 processing set 中也存在这个 item,那么当这个 item “Done”之后,这次加进来的 item 会从 dirty set 中被提到 queue 里。这个逻辑后面可以看到。

    总结一句话:Add() 会以“吝啬”的方式将 item 入队,这个过程会保证一个 Queue 中同一时刻不存在重复的 item。

    2.2 Queue.Get() 方法

    • util/workqueue/queue.go:196
    func (q *Type) Get() (item interface{}, shutdown bool) {
    	q.cond.L.Lock()
    	defer q.cond.L.Unlock()
    	for len(q.queue) == 0 && !q.shuttingDown {
    		q.cond.Wait()
    	}
    	if len(q.queue) == 0 {
    		// We must be shutting down.
    		return nil, true
    	}
    
    	item = q.queue[0]
    	// The underlying array still exists and reference this object, so the object will not be garbage collected.
    	q.queue[0] = nil
    	q.queue = q.queue[1:]
    
    	q.metrics.get(item)
    
    	q.processing.insert(item)
    	q.dirty.delete(item)
    
    	return item, false
    }
    

    Get() 方法尝试从 queue 中获取第一个 item,同时将其加入到 processing set 中,并且从 dirty set 中删除。

    2.3 Queue.Done(item interface{}) 方法

    • util/workqueue/queue.go:223
    func (q *Type) Done(item interface{}) {
    	q.cond.L.Lock()
    	defer q.cond.L.Unlock()
    
    	q.metrics.done(item)
    
    	q.processing.delete(item)
    	if q.dirty.has(item) {
    		q.queue = append(q.queue, item)
    		q.cond.Signal()
    	} else if q.processing.len() == 0 {
    		q.cond.Signal()
    	}
    }
    

    Done() 方法用来标记一个 item 被处理完成了。调用 Done() 方法的时候,这个 item 被从 processing set 中删除。另外前面提到 Add 的过程中如果发现 item 在 processing set 中存在,那么这个 item 会被暂存到 dirty set 中。这里在处理 item 的 Done 逻辑的时候也就顺带把暂存到 dirty set 中的 item 取出来,加入到 queue 里去了。这个行为同样是有意义的,在“Stingy”的同时允许一个 item 在处理过程中被重新入队,等待下一次重新处理。

    3. DelayingQueue 的实现

    继续来看 DelayingQueue 的实现。

    • util/workqueue/delaying_queue.go:30
    type DelayingInterface interface {
    	Interface
    	// AddAfter adds an item to the workqueue after the indicated duration has passed
    	AddAfter(item interface{}, duration time.Duration)
    }
    

    这个 Interface 也就是前文提到的 QueueDelayingQueueQueue 的基础上加了一个 AddAfter() 接口,实现了“过一会再加入一个 item”的功能。这样也就使得一个 item 处理失败之后,能够在指定延时之后再重新入队。

    行,直接来看 AddAfter() 接口是怎么实现的:

    • util/workqueue/delaying_queue.go:205
    func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
    	// don't add if we're already shutting down
    	if q.ShuttingDown() {
    		return
    	}
    
    	q.metrics.retry()
    
    	// immediately add things with no delay
    	if duration <= 0 {
    		q.Add(item)
    		return
    	}
    
    	select {
    	case <-q.stopCh:
    		// unblock if ShutDown() is called
    	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
    	}
    }
    

    可以看到当 duration > 0 的时候,AddAfter() 方法只是简单地构造一个 waitFor 对象,然后将其加入 chan *waitFor 类型的 waitingForAddCh 中。所以后面应该关注的核心逻辑是如何消费这个 waitingForAddCh。

    跟一下 waitingForAddCh 的实现,可以找到在 newDelayingQueue() 函数中初始化了 waitingForAddCh,然后调用了一个 delayingType 的 waitingLoop() 方法:

    • util/workqueue/delaying_queue.go:103
    func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {
    	ret := &delayingType{
    		Interface:       q,
    		clock:           clock,
    		heartbeat:       clock.NewTicker(maxWait),
    		stopCh:          make(chan struct{}),
    		waitingForAddCh: make(chan *waitFor, 1000),
    		metrics:         newRetryMetrics(name, provider),
    	}
    
    	go ret.waitingLoop()
    	return ret
    }
    

    可以看到 waitingForAddCh 的容量是 1000,类型是 chan *waitFor,而 waitFor 的类型是:

    type waitFor struct {
    	data    t
    	readyAt time.Time
    	// index in the priority queue (heap)
    	index int
    }
    

    下一步继续看 waitingLoop() 方法的实现:

    • util/workqueue/delaying_queue.go:232
    func (q *delayingType) waitingLoop() {
    	defer utilruntime.HandleCrash()
    
    	// Make a placeholder channel to use when there are no items in our list
    	never := make(<-chan time.Time)
    
    	// Make a timer that expires when the item at the head of the waiting queue is ready
    	var nextReadyAtTimer clock.Timer
    
    	// 这是一个优先级队列,用一个最小堆保存了所有 waitFor(waitForPriorityQueue 的类型是 []*waitFor)
    	waitingForQueue := &waitForPriorityQueue{}
    	heap.Init(waitingForQueue)
    
    	waitingEntryByData := map[t]*waitFor{}
    
    	// 后面的所有逻辑都在这个 for 循环里
    	for {
    		if q.Interface.ShuttingDown() {
    			return
    		}
    
    		now := q.clock.Now()
    
    		// Add ready entries
    		// 一直判断 waitingForQueue 这个堆里有没有元素,如果有,就 Peek 出来第一个元素看是不是 ready,如果 ready
    		// 那就通过 q.Add() 方法将其数据放入队列(也就是前文那个 Queue 实现的 Add 逻辑)
    		for waitingForQueue.Len() > 0 {
    			entry := waitingForQueue.Peek().(*waitFor)
    			if entry.readyAt.After(now) {
    				break
    			}
    
    			entry = heap.Pop(waitingForQueue).(*waitFor)
    			q.Add(entry.data)
    			delete(waitingEntryByData, entry.data)
    		}
    
    		// Set up a wait for the first item's readyAt (if one exists)
    		// 代码到这里也就是说前面一个 for 里的 break 被执行了,换言之最小堆中的最小元素,也就是最快 ready 的一个都还没有 ready;
    		// 这里将 nextReadyAtTimer 计时器的时间设置为(最快 ready 的第一个元素的 ready 时间 - 当前时间),
    		// 这样当 nextReadyAt 这个 channel 有数据的时候,最小堆里的第一个元素也就 ready 了。
    		nextReadyAt := never
    		if waitingForQueue.Len() > 0 {
    			if nextReadyAtTimer != nil {
    				nextReadyAtTimer.Stop()
    			}
    			entry := waitingForQueue.Peek().(*waitFor)
    			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
    			nextReadyAt = nextReadyAtTimer.C()
    		}
    
    		// 刚才设置好了一个合适的 nextReadyAt,现在开始 select 等待某个 channel 有反应
    		select {
    		case <-q.stopCh:
    			return
    		// 心跳时间是10s
    		case <-q.heartbeat.C():
    			// continue the loop, which will add ready items
    		// 执行到这里也就是第一个 item ready 了
    		case <-nextReadyAt:
    			// continue the loop, which will add ready items
    
    		// 当有元素被通过 AddAfter() 方法加进来时,waitingForAddCh 就会有内容,这时候会被取出来;
    		// 如果没有 ready,那就调用 insert(大概率主要就是插入最小堆的过程);如果 ready 了那就直接 Add 到 Queue 里。
    		case waitEntry := <-q.waitingForAddCh:
    			if waitEntry.readyAt.After(q.clock.Now()) {
    				insert(waitingForQueue, waitingEntryByData, waitEntry)
    			} else {
    				q.Add(waitEntry.data)
    			}
    			// 通过一个循环将 waitingForAddCh 中的所有元素都消费掉,根据 ready 情况要么插入最小堆(优先级队列),要么直接入队。
    			drained := false
    			for !drained {
    				select {
    				case waitEntry := <-q.waitingForAddCh:
    					if waitEntry.readyAt.After(q.clock.Now()) {
    						insert(waitingForQueue, waitingEntryByData, waitEntry)
    					} else {
    						q.Add(waitEntry.data)
    					}
    				default:
    					drained = true
    				}
    			}
    		}
    	}
    }
    

    我继续看了一眼 insert() 方法的实现,里面就2个逻辑:如果元素不存在,就插入 waitingForQueue 这个优先级队列;如果存在,就更新其 readAt 时间;代码就不具体贴了。

    这个 waitingLoop() 写的还是很漂亮,滴水不漏,逻辑严谨,相当优雅。waitingLoop 通过优先级队列实现元素按照时间顺序从近到远排序,这样就能最高效地获取到最快 ready 的元素。然后又根据优先级队列中最快能 ready 的元素的 ready 剩余时间来构造等待计时器,等待的过程中又监测着 waitingForAddCh 这个 channel 中是否有新元素…… 这个过程很完美地体现了 Golang 特色的 Channel 机制的优雅与强大。

    4. RateLimitingQueue 的实现

    看完了延时队列,继续来看限速队列。

    先贴接口:

    • util/workqueue/rate_limiting_queue.go:22
    // RateLimitingInterface is an interface that rate limits items being added to the queue.
    type RateLimitingInterface interface {
    	DelayingInterface
    
    	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
    	AddRateLimited(item interface{})
    
    	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
    	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
    	// still have to call `Done` on the queue.
    	Forget(item interface{})
    
    	// NumRequeues returns back how many times the item was requeued
    	NumRequeues(item interface{}) int
    }
    

    层层递进,限速队列建立在延时队列之上,RateLimitingInterface 接口包含了 DelayingInterface 接口的所有方法。显然这里我们主要得关注 AddRateLimited(item interface{}) 方法和 Forget(item interface{}) 方法的意图与实现。Forget 方法在我们开发自定义控制器的时候其实会用到,故需知其然,能知其所以然则更佳。

    接着找到 AddRateLimited(item interface{}) 方法的实现:

    // AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
    func (q *rateLimitingType) AddRateLimited(item interface{}) {
    	q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
    }
    

    是不是很酷,通过一个 rateLimiter 确定一个 item 的 ready 时间,然后通过延时队列完成延时入队的逻辑。这里就只剩下 rateLimiter 这个限速器的实现需要研究了。

    如果看 Forget(item interface{}) 方法:

    func (q *rateLimitingType) Forget(item interface{}) {
    	q.rateLimiter.Forget(item)
    }
    

    同样,内容都在 rateLimiter 里。行,接着来看 rateLimiter 的实现。

    5. rateLimiter 限速器的实现

    rateLimiter 对应的接口定义如下:

    • util/workqueue/default_rate_limiters.go:27
    type RateLimiter interface {
    	// When gets an item and gets to decide how long that item should wait
    	When(item interface{}) time.Duration
    	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for failing
    	// or for success, we'll stop tracking it
    	Forget(item interface{})
    	// NumRequeues returns back how many failures the item has had
    	NumRequeues(item interface{}) int
    }
    

    实现可就有意思了,变着花样限速:

    1. BucketRateLimiter
    2. ItemExponentialFailureRateLimiter
    3. ItemFastSlowRateLimiter
    4. MaxOfRateLimiter
    5. StepRateLimiter
    6. WithMaxWaitRateLimiter

    六种实现,对应不同的限速思路,体现算法之美的地方。

    6. 控制器里用的默认限速器

    在 sample-controller 里有这样一行代码:

    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    我们先看下默认 workqueue 里用的限速器是哪一个。

    • util/workqueue/default_rate_limiters.go:39
    func DefaultControllerRateLimiter() RateLimiter {
    	return NewMaxOfRateLimiter(
    		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
    		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
    		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    	)
    }
    

    这里调用了 NewMaxOfRateLimiter() 函数,这个函数会返回一个 MaxOfRateLimiter

    func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
    	return &MaxOfRateLimiter{limiters: limiters}
    }
    

    MaxOfRateLimiter 内部保存多个其他限速器的实现,然后返回一个“限速最久”的结果:

    // MaxOfRateLimiter calls every RateLimiter and returns the worst case response
    // When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
    // were separately delayed a longer time.
    type MaxOfRateLimiter struct {
    	limiters []RateLimiter
    }
    
    func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
    	ret := time.Duration(0)
    	for _, limiter := range r.limiters {
    		curr := limiter.When(item)
    		// 拿到最久的限速时长
    		if curr > ret {
    			ret = curr
    		}
    	}
    
    	return ret
    }
    

    你们前面贴的这个 DefaultControllerRateLimiter() 函数的实现就很容易理解了:

    func DefaultControllerRateLimiter() RateLimiter {
    	return NewMaxOfRateLimiter(
    		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
    		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
    		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    	)
    }
    

    这里接收两个限速器,然后看谁凶就用谁的限速结果。这两个限速器分别是:

    • ItemExponentialFailureRateLimiter
    • BucketRateLimiter

    ItemExponentialFailureRateLimiter 限速器会指数级增加限速时长,也就是先延迟5毫秒,如果失败就变成10毫秒、20毫秒、40毫秒…… 最大不超过 1000秒。

    BucketRateLimiter 是一个比较基础的令牌桶实现,在这里的2个参数10和100的含义是令牌桶里最多有100个令牌,每秒发放10个。

    到这里控制器用的限速器逻辑也就完全能理解了。每个限速器的具体实现代码就不具体贴了,到这里我想要掌握的信息已经足够了。不过在早几年的我的一篇博客《Kubernetes client-go 源码分析 - workqueue》里有具体的每个限速器的实现逻辑分析,如果你感兴趣也可以跳转过去阅读。

    7. 总结

    在编写自定义控制器的时候我们会用到这样的代码:

    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    这里实例化的队列是一个限速队列。

    ResourceEventHandlerFuncs 里我们会写 queue.Add(key) 这样的代码,将一个 key(item/obj)加入到 workqueue 里。

    控制器里还会写到这样的代码:

    • obj, shutdown := c.workqueue.Get()
    • c.workqueue.Done(obj)
    • c.workqueue.Forget(obj)
    • c.workqueue.AddRateLimited(obj)

    这些方法也就对应着一个 obj 从 workqueue 中取出来(Get()),如果处理完成,就调用 Done(obj),从而从队列中彻底移除,同时调用 Forget(obj) 告诉记速器可以忽略这个 obj 了(可别下次同名 obj 来的时候,误判了,让人家白等半天)。最后如果处理失败,遇到点啥临时异常情况,那就放回到 workqueue 里去,用 AddRateLimited(obj) 方法。

    至此,workqueue 相关的用法我们就知其然,亦知其所以然了。

  • 相关阅读:
    msvcr80.dll丢失修复指南:如何轻松解决msvcr80.dll丢失的问题
    C语言-杨辉三角的三种解法-简单易懂篇
    KVM虚拟化技术学习-基础入门
    力扣:167. 两数之和 II - 输入有序数组(Python3)
    CMake入门(一)Ubuntu下使用和Window下使用
    黑客(网络安全)技术速成自学
    Python Flask框架(二)Flask与HTTP
    一文带你认识AscendCL
    matlab|计及源荷不确定性的综合能源生产单元运行调度与容量配置随机优化模型
    bugreport中查看开发者选项动画时长缩放日志
  • 原文地址:https://www.cnblogs.com/daniel-hutao/p/18010835/k8s_clientgo_workqueue