• The kube-scheduler scheduling context


    The Scheduler struct#

    Scheduler is the central struct of kube-scheduler; it holds the components that kube-scheduler needs in order to run.

    type Scheduler struct {
    	// Cache is an abstraction that caches pod information for the scheduler to look up; entries are maintained per Pod.
    	Cache internalcache.Cache
    	// Extenders act like scheduling plugins (they are external extensions called by the scheduler) and influence scheduling decisions in Kubernetes.
    	Extenders []framework.Extender
    
    	// NextPod is provided as a function; it blocks until the next schedulable pod is available.
    	NextPod func() *framework.QueuedPodInfo
    
    	// Error is called if there is an error. It is passed the pod in
    	// question, and the error
    	Error func(*framework.QueuedPodInfo, error)
    
    	// SchedulePod tries to schedule the given pod to one of the nodes.
    	SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)
    
    	// StopEverything is the signal that shuts down the scheduler.
    	StopEverything <-chan struct{}
    
    	// SchedulingQueue holds the pods to be scheduled.
    	SchedulingQueue internalqueue.SchedulingQueue
    
    	// Profiles holds the scheduling frameworks, one per profile.
    	Profiles profile.Map
    	client clientset.Interface
    	nodeInfoSnapshot *internalcache.Snapshot
    	percentageOfNodesToScore int32
    	nextStartNodeIndex int
    }
    

    The two pieces that do the actual work are SchedulingQueue and scheduleOne; both are analyzed below.

    SchedulingQueue#

    With the initialization of kube-scheduler understood, the next step is to analyze its overall structure and workflow.

    Run starts one SchedulingQueue and one scheduleOne loop; structurally, both belong to Scheduler.

    func (sched *Scheduler) Run(ctx context.Context) {
    	sched.SchedulingQueue.Run()
    
    	// We need to start scheduleOne loop in a dedicated goroutine,
    	// because scheduleOne function hangs on getting the next item
    	// from the SchedulingQueue.
    	// If there are no new pods to schedule, it will be hanging there
    	// and if done in this goroutine it will be blocking closing
    	// SchedulingQueue, in effect causing a deadlock on shutdown.
    	go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
    
    	<-ctx.Done()
    	sched.SchedulingQueue.Close()
    }
    
    

    SchedulingQueue is a queue abstraction that stores Pods waiting to be scheduled. The interface follows a pattern similar to cache.FIFO and cache.Heap.

    type SchedulingQueue interface {
    	framework.PodNominator
    	Add(pod *v1.Pod) error
    	// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
    	// The passed-in pods are originally compiled from plugins that want to activate Pods,
    	// by injecting the pods through a reserved CycleState struct (PodsToActivate).
    	Activate(pods map[string]*v1.Pod)
    	// AddUnschedulableIfNotPresent puts an unschedulable pod back into the scheduling queue.
    	AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
    	// SchedulingCycle returns the current number of scheduling cycle which is
    	// cached by scheduling queue. Normally, incrementing this number whenever
    	// a pod is popped (e.g. called Pop()) is enough.
    	SchedulingCycle() int64
    	// Pop removes the pod at the head of the priority queue and returns it. It blocks while the queue is empty.
    	Pop() (*framework.QueuedPodInfo, error)
    	Update(oldPod, newPod *v1.Pod) error
    	Delete(pod *v1.Pod) error
    	MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
    	AssignedPodAdded(pod *v1.Pod)
    	AssignedPodUpdated(pod *v1.Pod)
    	PendingPods() []*v1.Pod
    	// Close closes the SchedulingQueue so that the goroutine which is
    	// waiting to pop items can exit gracefully.
    	Close()
    	// Run starts the goroutines managing the queue.
    	Run()
    }
    

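    PriorityQueue is the implementation of SchedulingQueue. Its core consists of two sub-queues and one data structure: activeQ, backoffQ, and unschedulablePods.

    • activeQ: a heap-based priority queue; the scheduler takes the highest-priority Pod from it to schedule
    • backoffQ: also a heap-based priority queue; it holds Pods that could not be scheduled and are waiting out their backoff period
    • unschedulablePods: holds Pods that have been tried and determined to be unschedulable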
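
    To make the relationship between the three structures concrete, here is a minimal sketch. It is not the real PriorityQueue: queuedPod, miniQueue, markUnschedulable, moveAll, flushBackoff and demoQueue are names made up for this example, slices and a map stand in for the heaps, and locking is omitted. It only assumes the flow described above: failed Pods are parked, a cluster event moves them to backoffQ or activeQ, and expired backoffs return to activeQ.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// queuedPod is a hypothetical stand-in for framework.QueuedPodInfo.
type queuedPod struct {
	Name          string
	Priority      int32
	BackoffExpiry time.Time
}

// miniQueue borrows the three field names from PriorityQueue. Slices and
// a map replace the real heaps, and locking is left out.
type miniQueue struct {
	activeQ           []queuedPod          // ready to be scheduled
	backoffQ          []queuedPod          // waiting for backoff to expire
	unschedulablePods map[string]queuedPod // tried and found unschedulable
}

func newMiniQueue() *miniQueue {
	return &miniQueue{unschedulablePods: map[string]queuedPod{}}
}

// add puts a pod into activeQ and keeps the highest priority first.
func (q *miniQueue) add(p queuedPod) {
	q.activeQ = append(q.activeQ, p)
	sort.SliceStable(q.activeQ, func(i, j int) bool {
		return q.activeQ[i].Priority > q.activeQ[j].Priority
	})
}

// pop takes the highest-priority pod from activeQ.
func (q *miniQueue) pop() (queuedPod, bool) {
	if len(q.activeQ) == 0 {
		return queuedPod{}, false
	}
	p := q.activeQ[0]
	q.activeQ = q.activeQ[1:]
	return p, true
}

// markUnschedulable parks a pod that failed a scheduling attempt.
func (q *miniQueue) markUnschedulable(p queuedPod) {
	q.unschedulablePods[p.Name] = p
}

// moveAll reacts to a cluster event: parked pods go to backoffQ while
// they are still backing off, otherwise straight to activeQ.
func (q *miniQueue) moveAll(now time.Time) {
	for name, p := range q.unschedulablePods {
		delete(q.unschedulablePods, name)
		if p.BackoffExpiry.After(now) {
			q.backoffQ = append(q.backoffQ, p)
		} else {
			q.add(p)
		}
	}
}

// flushBackoff moves pods whose backoff has expired into activeQ.
func (q *miniQueue) flushBackoff(now time.Time) {
	remaining := q.backoffQ[:0]
	for _, p := range q.backoffQ {
		if p.BackoffExpiry.After(now) {
			remaining = append(remaining, p)
		} else {
			q.add(p)
		}
	}
	q.backoffQ = remaining
}

// demoQueue walks one pod through all three structures.
func demoQueue() (first string, inBackoff int, second string) {
	now := time.Unix(0, 0)
	q := newMiniQueue()
	q.add(queuedPod{Name: "low", Priority: 1})
	q.add(queuedPod{Name: "high", Priority: 10})
	p, _ := q.pop()
	first = p.Name
	p.BackoffExpiry = now.Add(time.Minute)
	q.markUnschedulable(p)
	q.moveAll(now)
	inBackoff = len(q.backoffQ)
	q.flushBackoff(now.Add(2 * time.Minute))
	p, _ = q.pop()
	return first, inBackoff, p.Name
}

func main() {
	fmt.Println(demoQueue())
}
```

    In demoQueue the pod "high" is popped first, parked as unschedulable, moved to backoffQ by moveAll because its backoff has not expired yet, and finally returned to activeQ by flushBackoff.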

    When the scheduler is created in New, this queue is initialized:

    podQueue := internalqueue.NewSchedulingQueue(
        // the function that compares two pods, i.e. less
        profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
        informerFactory,
        internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
        internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
        internalqueue.WithPodNominator(nominator),
        internalqueue.WithClusterEventMap(clusterEventMap),
        internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
    )
    

    NewSchedulingQueue in turn initializes the PriorityQueue:

    // NewSchedulingQueue initializes a priority queue as a new scheduling queue.
    func NewSchedulingQueue(
    	lessFn framework.LessFunc,
    	informerFactory informers.SharedInformerFactory,
    	opts ...Option) SchedulingQueue {
    	return NewPriorityQueue(lessFn, informerFactory, opts...)
    }
    
    // NewPriorityQueue creates a PriorityQueue object.
    func NewPriorityQueue(
    	lessFn framework.LessFunc,
    	informerFactory informers.SharedInformerFactory,
    	opts ...Option,
    ) *PriorityQueue {
    	options := defaultPriorityQueueOptions
    	for _, opt := range opts {
    		opt(&options)
    	}
    	// comp wraps the less function; it decides the order of pods in activeQ (queue sorting, not node scoring)
    	comp := func(podInfo1, podInfo2 interface{}) bool {
    		pInfo1 := podInfo1.(*framework.QueuedPodInfo)
    		pInfo2 := podInfo2.(*framework.QueuedPodInfo)
    		return lessFn(pInfo1, pInfo2)
    	}
    
    	if options.podNominator == nil {
    		options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
    	}
    
    	pq := &PriorityQueue{
    		PodNominator:                      options.podNominator,
    		clock:                             options.clock,
    		stop:                              make(chan struct{}),
    		podInitialBackoffDuration:         options.podInitialBackoffDuration,
    		podMaxBackoffDuration:             options.podMaxBackoffDuration,
    		podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
    		activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
    		unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()),
    		moveRequestCycle:                  -1,
    		clusterEventMap:                   options.clusterEventMap,
    	}
    	pq.cond.L = &pq.lock
    	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
    	pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
    
    	return pq
    }
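
    NewPriorityQueue starts from default options and then applies every opts entry, which is the functional options pattern. A minimal sketch of that pattern follows, assuming a made-up options struct: queueOptions, WithInitialBackoffSeconds, WithMaxBackoffSeconds, defaultOptions and newOptions are hypothetical names for this example, not the scheduler's own.

```go
package main

import "fmt"

// queueOptions is a hypothetical, trimmed-down options struct.
type queueOptions struct {
	initialBackoffSeconds int64
	maxBackoffSeconds     int64
}

// Option mutates queueOptions; each With... helper returns one.
type Option func(*queueOptions)

func WithInitialBackoffSeconds(s int64) Option {
	return func(o *queueOptions) { o.initialBackoffSeconds = s }
}

func WithMaxBackoffSeconds(s int64) Option {
	return func(o *queueOptions) { o.maxBackoffSeconds = s }
}

// defaultOptions holds illustrative defaults chosen for this sketch.
var defaultOptions = queueOptions{initialBackoffSeconds: 1, maxBackoffSeconds: 10}

// newOptions copies the defaults and applies the options in order, the
// same shape as the loop at the top of NewPriorityQueue.
func newOptions(opts ...Option) queueOptions {
	options := defaultOptions
	for _, opt := range opts {
		opt(&options)
	}
	return options
}

func main() {
	o := newOptions(WithMaxBackoffSeconds(30))
	fmt.Println(o.initialBackoffSeconds, o.maxBackoffSeconds)
}
```

    Because newOptions works on a copy, the defaults stay untouched and callers only name the settings they want to change.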
    

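    Now that the structure of the queue is clear, we need to know where enqueueing and dequeueing happen. During initialization, addEventHandlerFuncs is registered, which injects three action functions (the same concept as in a controller); AddFunc is where a Pod gets enqueued.

    The handlers are injected into the Pod informer, and the injected function addPodToSchedulingQueue performs the enqueue.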

    Handler: cache.ResourceEventHandlerFuncs{
        AddFunc:    sched.addPodToSchedulingQueue,
        UpdateFunc: sched.updatePodInSchedulingQueue,
        DeleteFunc: sched.deletePodFromSchedulingQueue,
    },
    
    func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
    	pod := obj.(*v1.Pod)
    	klog.V(3).InfoS("Add event for unscheduled pod", "pod", klog.KObj(pod))
    	if err := sched.SchedulingQueue.Add(pod); err != nil {
    		utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
    	}
    }
    

    The implementation of this SchedulingQueue is PriorityQueue, and Add operates on activeQ:

    func (p *PriorityQueue) Add(pod *v1.Pod) error {
    	p.lock.Lock()
    	defer p.lock.Unlock()
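        // Build the queue entry: a QueuedPodInfo wrapping the PodInfo (which holds the v1.Pod),
        // the initial-attempt time, the enqueue time, and the names of the plugins that made the pod unschedulable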
    	pInfo := p.newQueuedPodInfo(pod)
        // enqueue
    	if err := p.activeQ.Add(pInfo); err != nil {
    		klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pod))
    		return err
    	}
    	if p.unschedulablePods.get(pod) != nil {
    		klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
    		p.unschedulablePods.delete(pod)
    	}
    	// Delete pod from backoffQ if it is backing off
    	if err := p.podBackoffQ.Delete(pInfo); err == nil {
    		klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
    	}
    	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
    	p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
    	p.cond.Broadcast()
    
    	return nil
    }
    

    The Scheduler struct shown above has a NextPod field. NextPod pops one pod from the queue; when the scheduler is created, the function returned by MakeNextPodFunc is passed in as this NextPod.

    func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
    	return func() *framework.QueuedPodInfo {
    		podInfo, err := queue.Pop()
    		if err == nil {
    			klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
    			for plugin := range podInfo.UnschedulablePlugins {
    				metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()
    			}
    			return podInfo
    		}
    		klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
    		return nil
    	}
    }
    

    This queue.Pop() corresponds to PriorityQueue.Pop(), which acts as the consumer side of activeQ:

    func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
       p.lock.Lock()
       defer p.lock.Unlock()
       for p.activeQ.Len() == 0 {
          // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
          // When Close() is called, the p.closed is set and the condition is broadcast,
          // which causes this loop to continue and return from the Pop().
          if p.closed {
             return nil, fmt.Errorf(queueClosed)
          }
          p.cond.Wait()
       }
       obj, err := p.activeQ.Pop()
       if err != nil {
          return nil, err
       }
       pInfo := obj.(*framework.QueuedPodInfo)
       pInfo.Attempts++
       p.schedulingCycle++
       return pInfo, nil
    }
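
    The blocking behavior of Pop, together with the reason Run must call Close on shutdown, can be reproduced with a plain sync.Cond. The following is a minimal sketch under that assumption; blockingQueue, errQueueClosed and consume are hypothetical names, and a string slice stands in for activeQ.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errQueueClosed = errors.New("queue is closed")

// blockingQueue is a hypothetical FIFO that blocks in Pop the same way
// PriorityQueue.Pop does: it waits on a condition variable until an item
// arrives or the queue is closed.
type blockingQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []string
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add appends an item and wakes up any goroutine blocked in Pop.
func (q *blockingQueue) Add(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, item)
	q.cond.Broadcast()
}

// Pop blocks while the queue is empty and returns an error once the
// queue is both empty and closed.
func (q *blockingQueue) Pop() (string, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		if q.closed {
			return "", errQueueClosed
		}
		q.cond.Wait()
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, nil
}

// Close marks the queue closed and releases blocked consumers.
func (q *blockingQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// consume pops until the queue is closed and returns everything it saw.
func consume(q *blockingQueue) []string {
	var got []string
	for {
		item, err := q.Pop()
		if err != nil {
			return got
		}
		got = append(got, item)
	}
}

func main() {
	q := newBlockingQueue()
	done := make(chan []string)
	go func() { done <- consume(q) }()
	q.Add("pod-a")
	q.Add("pod-b")
	q.Close()
	fmt.Println(<-done)
}
```

    Without the Close call the consumer goroutine would stay parked in cond.Wait forever, which is the shutdown deadlock that the comment in Run warns about.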
    

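    As the entry point above showed, Run involves scheduleOne and the scheduler. scheduleOne consumes one Pod: it calls NextPod, which is the function produced by MakeNextPodFunc and passed in at initialization, and that in turn leads back to Pop to do the consuming.

    scheduleOne runs the whole scheduling workflow for a single Pod.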

    func (sched *Scheduler) scheduleOne(ctx context.Context) {
    	podInfo := sched.NextPod()
    	// pod could be nil when schedulerQueue is closed
    	if podInfo == nil || podInfo.Pod == nil {
    		return
    	}
    	pod := podInfo.Pod
    	fwk, err := sched.frameworkForPod(pod)
    	if err != nil {
    		// This shouldn't happen, because we only accept for scheduling the pods
    		// which specify a scheduler name that matches one of the profiles.
    		klog.ErrorS(err, "Error occurred")
    		return
    	}
    	if sched.skipPodSchedule(fwk, pod) {
    		return
    	}
    ...
    

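    Scheduling context#

    Figure 1: The scheduling context of a Pod
    Source: https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework

    With the scheduler's structure understood, we now walk through the scheduling context to see how the extension points work. For that we again need the scheduling-context diagram from the official documentation.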

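    Scheduling framework [2]#

    The scheduling framework (SF) is a pluggable architecture that Kubernetes designed for the scheduler. SF turns the scheduler into a set of Plugin-style APIs, and the scheduling policies mentioned in the previous chapter are implemented as Plugins against those APIs.

    SF defines a number of extension points (EP). A scheduling routine implemented as a Plugin is registered at one or more EPs; in other words, a plugin registered at several EPs is called at each of them as they execute.

    Every scheduling attempt is split into two phases, the scheduling cycle (Scheduling Cycle, SC) and the binding cycle (Binding Cycle, BC).

    • SC selects a node for the Pod; scheduling cycles run serially.
    • BC applies the decision made by the SC to the cluster; binding cycles may run concurrently.

    The scheduling cycle and the binding cycle together are called the scheduling context (Scheduling Context); Figure 1 shows its workflow.

    Note: if the Pod is determined to be unschedulable (no node is available) or an internal error occurs, the SC or BC is aborted, and the Pod goes back into the queue to be retried.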

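    Extension points [3]#

    Extension points are the extensible APIs in the scheduling context, as shown in [Figure 1]. Filter corresponds to the former Predicate, and Scoring corresponds to the former Priority.

    Scheduling passes through the following extension points: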

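    • Sort (queueSort in the configuration): this plugin provides the ordering function used to sort pending Pods in the scheduling queue. Only one queue sort plugin can be enabled at a time.

    • preFilter: this plugin pre-processes or checks information about the Pod or the cluster before filtering. If a preFilter plugin returns an error, the scheduling cycle is aborted.

    • filter: this plugin corresponds to Predicates in the scheduling context and is used to exclude nodes that cannot run the Pod. Filters are called in their configured order. If any filter marks a node as infeasible, the remaining filters are not called for that node; if no node is left, the Pod is marked unschedulable.

    • postFilter: this plugin is called, in configured order, when no feasible node (FN) was found for the Pod. If any postFilter plugin marks the Pod as schedulable, the remaining plugins are not called. In other words, this step is skipped when filter succeeds.

    • preScore: can be used for pre-scoring work (an informational extension point).

    • score: this plugin scores every Node that passed the filter phase. The scheduler then selects the Node with the highest weighted sum of scores.

    • reserve: because binding happens asynchronously, this plugin prevents new Pods from being scheduled onto the node before the current Pod is actually bound, which could otherwise push the node's resource usage beyond what is available. If a later phase returns an error or fails, Unreserve is triggered to roll back (an informational extension point). This is also the last state in the scheduling cycle: the Pod either succeeds and reaches postBind, or fails and triggers Unreserve.

    • permit: this plugin can prevent or delay the binding of a Pod. It does one of three things:

      • approve: the scheduler continues with the binding process
      • deny: if any Permit plugin rejects the binding of the Pod to the node, Unreserve is triggered and the Pod goes back into the queue
      • wait: if a Permit plugin returns wait, the Pod is kept in an internal list of waiting Pods until it is approved. If a timeout occurs, wait becomes deny, the Pod is returned to the scheduling queue, and Unreserve is triggered to roll back.
    • preBind: this plugin performs any work required before the Pod is bound. For example, a preBind plugin may provision a network volume and mount it on the target node. If any plugin in this step returns an error, the Pod is rejected and returned to the scheduling queue.

    • bind: after all preBind plugins have completed, this plugin binds the Pod to the Node. The plugins registered for this step are called in order; once one plugin handles the binding, all remaining plugins are skipped.

    • postBind: this plugin is called after the Pod has been bound and can be used to clean up associated resources (an informational extension point).

    • multiPoint: a configuration-only field that enables or disables a plugin for all of its applicable extension points at once.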
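
    The idea that one plugin can take part in several extension points can be sketched with Go interfaces. This is a simplified illustration, not the framework's real API: the interfaces below use plain strings instead of Pods and Nodes, and zonePlugin, onlyFilter and registry are hypothetical names. The real framework builds its per-extension-point plugin lists from the profile configuration.

```go
package main

import "fmt"

// Plugin is the base interface; the extension-point interfaces below are
// simplified, hypothetical versions of the ones in the framework package.
type Plugin interface{ Name() string }

type FilterPlugin interface {
	Plugin
	Filter(pod, node string) bool
}

type ScorePlugin interface {
	Plugin
	Score(pod, node string) int64
}

// zonePlugin implements both Filter and Score, so it takes part in two
// extension points.
type zonePlugin struct{}

func (zonePlugin) Name() string                 { return "Zone" }
func (zonePlugin) Filter(pod, node string) bool { return node != "node-down" }
func (zonePlugin) Score(pod, node string) int64 { return int64(len(node)) }

// onlyFilter implements a single extension point.
type onlyFilter struct{}

func (onlyFilter) Name() string                 { return "OnlyFilter" }
func (onlyFilter) Filter(pod, node string) bool { return true }

// registry groups plugins by the extension points they implement.
type registry struct {
	filters []FilterPlugin
	scorers []ScorePlugin
}

// register uses type assertions to discover which extension points a
// plugin implements and adds it to each matching list.
func (r *registry) register(p Plugin) {
	if f, ok := p.(FilterPlugin); ok {
		r.filters = append(r.filters, f)
	}
	if s, ok := p.(ScorePlugin); ok {
		r.scorers = append(r.scorers, s)
	}
}

func main() {
	r := &registry{}
	r.register(zonePlugin{})
	r.register(onlyFilter{})
	fmt.Println(len(r.filters), len(r.scorers))
}
```

    zonePlugin ends up in both lists and is therefore called at both extension points, while onlyFilter is called at one.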

    In code, the scheduler implements the scheduling context in scheduleOne, so the rest of this walkthrough follows that function.

    Sort#

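    The Sort plugin provides the ordering function used to sort pending Pods in the scheduling queue. Only one queue sort plugin can be enabled at a time.

    After entering scheduleOne, NextPod takes a Pod from the activeQ queue, and frameworkForPod then looks up the framework (profile) for that Pod. The ordering of activeQ itself, done with the profile's sort function, is the first extension point of the scheduling context: sort.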

    func (sched *Scheduler) scheduleOne(ctx context.Context) {
    	podInfo := sched.NextPod()
    	// pod could be nil when schedulerQueue is closed
    	if podInfo == nil || podInfo.Pod == nil {
    		return
    	}
    	pod := podInfo.Pod
    	fwk, err := sched.frameworkForPod(pod)
    ...
        
    func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
        // look up the profile named by the pod's schedulerName
    	fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
    	if !ok {
    		return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
    	}
    	return fwk, nil
    }
    

    Recall that this sort function is supplied when the scheduler is created in New, through QueueSortFunc():

    podQueue := internalqueue.NewSchedulingQueue(
        profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
        informerFactory,
        internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
        internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
        internalqueue.WithPodNominator(nominator),
        internalqueue.WithClusterEventMap(clusterEventMap),
        internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
    )
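
    How a less function drives the order in which Pods leave the queue can be shown with container/heap. The sketch below is an assumption-laden illustration: queuedPod, lessFunc, byPriorityThenAge, podHeap and popOrder are hypothetical names, Timestamp is a plain counter rather than a time, and the ordering rule (higher priority first, then the Pod that has waited longer) is the rule assumed here for a priority-based sort.

```go
package main

import (
	"container/heap"
	"fmt"
)

// queuedPod is a hypothetical stand-in for framework.QueuedPodInfo.
// Timestamp is a plain counter to keep the example deterministic.
type queuedPod struct {
	Name      string
	Priority  int32
	Timestamp int64
}

// lessFunc plays the same role as framework.LessFunc.
type lessFunc func(a, b *queuedPod) bool

// byPriorityThenAge orders higher priority first and, for equal priority,
// the pod that has waited longer first.
func byPriorityThenAge(a, b *queuedPod) bool {
	if a.Priority != b.Priority {
		return a.Priority > b.Priority
	}
	return a.Timestamp < b.Timestamp
}

// podHeap adapts a lessFunc to container/heap.
type podHeap struct {
	pods []*queuedPod
	less lessFunc
}

func (h *podHeap) Len() int            { return len(h.pods) }
func (h *podHeap) Less(i, j int) bool  { return h.less(h.pods[i], h.pods[j]) }
func (h *podHeap) Swap(i, j int)       { h.pods[i], h.pods[j] = h.pods[j], h.pods[i] }
func (h *podHeap) Push(x interface{})  { h.pods = append(h.pods, x.(*queuedPod)) }
func (h *podHeap) Pop() interface{} {
	last := h.pods[len(h.pods)-1]
	h.pods = h.pods[:len(h.pods)-1]
	return last
}

// popOrder pushes the pods and returns their names in pop order.
func popOrder(pods []*queuedPod, less lessFunc) []string {
	h := &podHeap{less: less}
	for _, p := range pods {
		heap.Push(h, p)
	}
	var names []string
	for h.Len() > 0 {
		names = append(names, heap.Pop(h).(*queuedPod).Name)
	}
	return names
}

func main() {
	pods := []*queuedPod{
		{Name: "batch", Priority: 1, Timestamp: 1},
		{Name: "web-new", Priority: 5, Timestamp: 3},
		{Name: "critical", Priority: 10, Timestamp: 4},
		{Name: "web-old", Priority: 5, Timestamp: 2},
	}
	fmt.Println(popOrder(pods, byPriorityThenAge))
}
```

    Swapping in a different lessFunc changes the pop order without touching the heap, which is why only one sort plugin can be active for a queue.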
    

    preFilter#

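    preFilter is the first extension point run inside schedulePod. It pre-processes or checks information about the Pod or the cluster before filtering, and a failure here aborts the scheduling cycle.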

    func (sched *Scheduler) scheduleOne(ctx context.Context) {
    	podInfo := sched.NextPod()
    	// pod could be nil when schedulerQueue is closed
    	if podInfo == nil || podInfo.Pod == nil {
    		return
    	}
    	pod := podInfo.Pod
    	fwk, err := sched.frameworkForPod(pod)
    	if err != nil {
    		// This shouldn't happen, because we only accept for scheduling the pods
    		// which specify a scheduler name that matches one of the profiles.
    		klog.ErrorS(err, "Error occurred")
    		return
    	}
    	if sched.skipPodSchedule(fwk, pod) {
    		return
    	}
    
    	klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))
    
    	// Synchronously attempt to find a fit for the pod.
    	start := time.Now()
    	state := framework.NewCycleState()
    	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
    	// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
    	podsToActivate := framework.NewPodsToActivate()
    	state.Write(framework.PodsToActivateKey, podsToActivate)
    
    	schedulingCycleCtx, cancel := context.WithCancel(ctx)
    	defer cancel()
        // SchedulePod is where preFilter is entered
    	scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
    

    schedulePod tries to schedule the given pod onto one of the nodes in the node list. On success it returns the name of the node.

    func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    	defer trace.LogIfLong(100 * time.Millisecond)
    	// refresh the node-info snapshot with the current contents of the cache
    	if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
    		return result, err
    	}
    	trace.Step("Snapshotting scheduler cache and node infos done")
    
    	if sched.nodeInfoSnapshot.NumNodes() == 0 {
    		return result, ErrNoNodesAvailable
    	}
    	// find the nodes that fit the pod; the extension points are executed in here
    	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
    	
        ...
    

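    findNodesThatFitPod runs the filtering plugins to find the Nodes that fit the Pod. Both the comments and the method names make it obvious which plugins run here 😁😁. The algorithms themselves are analyzed later; for now we only study the workflow.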

    func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
    	diagnosis := framework.Diagnosis{
    		NodeToStatusMap:      make(framework.NodeToStatusMap),
    		UnschedulablePlugins: sets.NewString(),
    	}
    
    	// Run "prefilter" plugins.
    	preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
    	allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
    	if err != nil {
    		return nil, diagnosis, err
    	}
    	if !s.IsSuccess() {
    		if !s.IsUnschedulable() {
    			return nil, diagnosis, s.AsError()
    		}
    		// All nodes will have the same status. Some non trivial refactoring is
    		// needed to avoid this copy.
    		for _, n := range allNodes {
    			diagnosis.NodeToStatusMap[n.Node().Name] = s
    		}
    		// Status satisfying IsUnschedulable() gets injected into diagnosis.UnschedulablePlugins.
    		if s.FailedPlugin() != "" {
    			diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
    		}
    		return nil, diagnosis, nil
    	}
    
    	// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
    	// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
    	if len(pod.Status.NominatedNodeName) > 0 {
    		feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
    		if err != nil {
    			klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
    		}
    		// Nominated node passes all the filters, scheduler is good to assign this node to the pod.
    		if len(feasibleNodes) != 0 {
    			return feasibleNodes, diagnosis, nil
    		}
    	}
    
    	nodes := allNodes
    	if !preRes.AllNodes() {
    		nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
    		for n := range preRes.NodeNames {
    			nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
    			if err != nil {
    				return nil, diagnosis, err
    			}
    			nodes = append(nodes, nInfo)
    		}
    	}
    	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
    	if err != nil {
    		return nil, diagnosis, err
    	}
    
    	feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
    	if err != nil {
    		return nil, diagnosis, err
    	}
    	return feasibleNodes, diagnosis, nil
    }
    

    filter#

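    The filter plugin corresponds to Predicates in the scheduling context and is used to exclude nodes that cannot run the Pod. Filters are called in their configured order. If any filter marks a node as infeasible, the remaining filters are not called for that node; if no node is left, the Pod is marked unschedulable.

    In the code, filter still lives inside findNodesThatFitPod: findNodesThatPassFilters obtains the FN, i.e. the feasible nodes, and that process is the filter extension point.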

    func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
    	...
        
    	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
    	if err != nil {
    		return nil, diagnosis, err
    	}
    
    	feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
    	if err != nil {
    		return nil, diagnosis, err
    	}
    	return feasibleNodes, diagnosis, nil
    }
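
    The per-node short-circuit of the filter phase can be sketched as follows. This is a simplified illustration with hypothetical names (filterFunc, findFeasible, rejectNode): it checks nodes one after another, whereas the real findNodesThatPassFilters is more involved, and the rejection map only imitates the role of diagnosis.NodeToStatusMap.

```go
package main

import "fmt"

// filterFunc is a hypothetical, simplified Filter plugin: it returns an
// empty string when the node is acceptable, or a reason when it is not.
type filterFunc func(pod, node string) string

// findFeasible runs the filters in order for every node. The first filter
// that rejects a node ends the evaluation of that node, and its reason is
// recorded.
func findFeasible(pod string, nodes []string, filters []filterFunc) ([]string, map[string]string) {
	var feasible []string
	rejected := map[string]string{}
	for _, node := range nodes {
		ok := true
		for _, f := range filters {
			if reason := f(pod, node); reason != "" {
				rejected[node] = reason
				ok = false
				break
			}
		}
		if ok {
			feasible = append(feasible, node)
		}
	}
	return feasible, rejected
}

// rejectNode builds a filter that rejects exactly one node.
func rejectNode(name, reason string) filterFunc {
	return func(pod, node string) string {
		if node == name {
			return reason
		}
		return ""
	}
}

func main() {
	filters := []filterFunc{
		rejectNode("node-2", "tainted"),
		rejectNode("node-3", "no capacity"),
	}
	feasible, rejected := findFeasible("web", []string{"node-1", "node-2", "node-3"}, filters)
	fmt.Println(feasible, rejected)
}
```

    An empty feasible list is the case in which schedulePod returns a FitError, which is what later lets postFilter run.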
    

    Postfilter#

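    When no FN is found for the pod, this plugin is called in configured order. If any postFilter plugin marks the Pod as schedulable, the remaining plugins are not called. In other words, this step is skipped when filter succeeds. Let's verify that here 😊

    We are still in scheduleOne. Once SchedulePod has finished (successfully or not) it returns an err, and whether postFilter runs depends on that err: it runs only when the error is a *framework.FitError, which matches the official description.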

    scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
    	if err != nil {
    		// SchedulePod() may have failed because the pod would not fit on any host, so we try to
    		// preempt, with the expectation that the next time the pod is tried for scheduling it
    		// will fit due to the preemption. It is also possible that a different pod will schedule
    		// into the resources that were preempted, but this is harmless.
    		var nominatingInfo *framework.NominatingInfo
    		if fitError, ok := err.(*framework.FitError); ok {
    			if !fwk.HasPostFilterPlugins() {
    				klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
    			} else {
    				// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
    				result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
    				if status.Code() == framework.Error {
    					klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
    				} else {
    					fitError.Diagnosis.PostFilterMsg = status.Message()
    					klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
    				}
    				if result != nil {
    					nominatingInfo = result.NominatingInfo
    				}
    			}
    			// Pod did not fit anywhere, so it is counted as a failure. If preemption
    			// succeeds, the pod should get counted as a success the next time we try to
    			// schedule it. (hopefully)
    			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
    		} else if err == ErrNoNodesAvailable {
    			nominatingInfo = clearNominatedNode
    			// No nodes available is counted as unschedulable rather than an error.
    			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
    		} else {
    			nominatingInfo = clearNominatedNode
    			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
    			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    		}
    		sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
    		return
    	}
    

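    PreScore, Score#

    preScore can be used for pre-scoring work and is an informational extension point. Once filtering is done, the code proceeds straight to the preScore plugins instead of returning. If any of the configured plugins returns a failure, the scheduling cycle is aborted and the Pod is rejected.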

    
    func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    	defer trace.LogIfLong(100 * time.Millisecond)
    
    	if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
    		return result, err
    	}
    	trace.Step("Snapshotting scheduler cache and node infos done")
    
    	if sched.nodeInfoSnapshot.NumNodes() == 0 {
    		return result, ErrNoNodesAvailable
    	}
    
    	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
    	if err != nil {
    		return result, err
    	}
    	trace.Step("Computing predicates done")
    
    	if len(feasibleNodes) == 0 {
    		return result, &framework.FitError{
    			Pod:         pod,
    			NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
    			Diagnosis:   diagnosis,
    		}
    	}
    
    	// When only one node after predicate, just use it.
    	if len(feasibleNodes) == 1 {
    		return ScheduleResult{
    			SuggestedHost:  feasibleNodes[0].Name,
    			EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
    			FeasibleNodes:  1,
    		}, nil
    	}
    	// preScore and score both run in here
    	priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
    	if err != nil {
    		return result, err
    	}
    
    	host, err := selectHost(priorityList)
    	trace.Step("Prioritizing done")
    
    	return ScheduleResult{
    		SuggestedHost:  host,
    		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
    		FeasibleNodes:  len(feasibleNodes),
    	}, err
    }
    

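    prioritizeNodes scores the Nodes with the configured plugins and returns a score for each Node: the results of all plugins are summed per Node, which yields the node's weighted total score.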

    func prioritizeNodes(
    	ctx context.Context,
    	extenders []framework.Extender,
    	fwk framework.Framework,
    	state *framework.CycleState,
    	pod *v1.Pod,
    	nodes []*v1.Node,
    ) (framework.NodeScoreList, error) {
    	// If no priority configs are provided, then all nodes will have a score of one.
    	// This is required to generate the priority list in the required format
    	if len(extenders) == 0 && !fwk.HasScorePlugins() {
    		result := make(framework.NodeScoreList, 0, len(nodes))
    		for i := range nodes {
    			result = append(result, framework.NodeScore{
    				Name:  nodes[i].Name,
    				Score: 1,
    			})
    		}
    		return result, nil
    	}
    
    	// Run PreScore plugins.
    	preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
    	if !preScoreStatus.IsSuccess() {
    		return nil, preScoreStatus.AsError()
    	}
    
    	// Run the Score plugins.
    	scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
    	if !scoreStatus.IsSuccess() {
    		return nil, scoreStatus.AsError()
    	}
    
    	// Additional details logged at level 10 if enabled.
    	klogV := klog.V(10)
    	if klogV.Enabled() {
    		for plugin, nodeScoreList := range scoresMap {
    			for _, nodeScore := range nodeScoreList {
    				klogV.InfoS("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", plugin, "node", nodeScore.Name, "score", nodeScore.Score)
    			}
    		}
    	}
    
    	// Summarize all scores.
    	result := make(framework.NodeScoreList, 0, len(nodes))
    
    	for i := range nodes {
    		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
    		for j := range scoresMap {
    			result[i].Score += scoresMap[j][i].Score
    		}
    	}
    
    	if len(extenders) != 0 && nodes != nil {
    		var mu sync.Mutex
    		var wg sync.WaitGroup
    		combinedScores := make(map[string]int64, len(nodes))
    		for i := range extenders {
    			if !extenders[i].IsInterested(pod) {
    				continue
    			}
    			wg.Add(1)
    			go func(extIndex int) {
    				metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
    				defer func() {
    					metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
    					wg.Done()
    				}()
    				prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
    				if err != nil {
    					// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
    					klog.V(5).InfoS("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name())
    					return
    				}
    				mu.Lock()
    				for i := range *prioritizedList {
    					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
    					if klogV.Enabled() {
    						klogV.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", host, "score", score)
    					}
    					combinedScores[host] += score * weight
    				}
    				mu.Unlock()
    			}(i)
    		}
    		// wait for all go routines to finish
    		wg.Wait()
    		for i := range result {
    			// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
    			// therefore we need to scale the score returned by extenders to the score range used by the scheduler.
    			result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
    		}
    	}
    
    	if klogV.Enabled() {
    		for i := range result {
    			klogV.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", result[i].Name, "score", result[i].Score)
    		}
    	}
    	return result, nil
    }
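
    The arithmetic behind "highest weighted sum of scores" is easy to check by hand. The sketch below applies the weights explicitly, which is an assumption of this example: in prioritizeNodes above the values in scoresMap are simply summed as they are. The names scorer, fromTable, prioritize and pickHost are hypothetical, and the tie-break in pickHost (the earlier node wins) is a simplification chosen for this sketch.

```go
package main

import "fmt"

// nodeScore mirrors the shape of framework.NodeScore.
type nodeScore struct {
	Name  string
	Score int64
}

// scorer is a hypothetical score plugin with a weight.
type scorer struct {
	Weight int64
	Score  func(node string) int64
}

// fromTable builds a score function from a fixed lookup table.
func fromTable(table map[string]int64) func(string) int64 {
	return func(node string) int64 { return table[node] }
}

// prioritize sums weight*score over all scorers for each node.
func prioritize(nodes []string, scorers []scorer) []nodeScore {
	result := make([]nodeScore, 0, len(nodes))
	for _, n := range nodes {
		total := int64(0)
		for _, s := range scorers {
			total += s.Weight * s.Score(n)
		}
		result = append(result, nodeScore{Name: n, Score: total})
	}
	return result
}

// pickHost returns the node with the highest total. On a tie the earlier
// node wins, which is a simplification made for this sketch.
func pickHost(list []nodeScore) (string, error) {
	if len(list) == 0 {
		return "", fmt.Errorf("empty priority list")
	}
	best := list[0]
	for _, ns := range list[1:] {
		if ns.Score > best.Score {
			best = ns
		}
	}
	return best.Name, nil
}

func main() {
	scorers := []scorer{
		{Weight: 1, Score: fromTable(map[string]int64{"node-a": 10, "node-b": 50})},
		{Weight: 2, Score: fromTable(map[string]int64{"node-a": 40, "node-b": 10})},
	}
	list := prioritize([]string{"node-a", "node-b"}, scorers)
	host, err := pickHost(list)
	fmt.Println(list, host, err)
}
```

    Here node-a totals 1*10 + 2*40 = 90 and node-b totals 1*50 + 2*10 = 70, so node-a is picked even though node-b wins on the first scorer alone.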
    

    Reserve#

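    Reserve exists because binding happens asynchronously: the plugin prevents new Pods from being scheduled onto the node before the current Pod is actually bound, which could otherwise push the node's resource usage beyond what is available. If a later phase returns an error or fails, Unreserve is triggered to roll back (an informational extension point). This is also the last state in the scheduling cycle: the Pod either succeeds and reaches postBind, or fails and triggers Unreserve.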

    // Run the Reserve method of reserve plugins.
    if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { // on failure
        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        // trigger un-reserve to clean up the state associated with the Pod
        fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
        }
        sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
        return
    }
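
    The reserve-then-roll-back shape of the snippet above can be sketched in isolation. This is a minimal illustration with hypothetical names (reservePlugin, reserveAll). It assumes that on a failure Unreserve is called for every plugin in reverse order, including plugins whose Reserve never ran, so Unreserve has to be safe to call in that situation.

```go
package main

import (
	"errors"
	"fmt"
)

// reservePlugin is a hypothetical, simplified Reserve plugin.
type reservePlugin struct {
	name string
	fail bool
}

// Reserve records the reservation or fails, depending on the fail flag.
func (p reservePlugin) Reserve(log *[]string) error {
	if p.fail {
		return errors.New(p.name + ": reserve failed")
	}
	*log = append(*log, "reserve "+p.name)
	return nil
}

// Unreserve records the rollback; it never fails.
func (p reservePlugin) Unreserve(log *[]string) {
	*log = append(*log, "unreserve "+p.name)
}

// reserveAll calls Reserve in order. If one call fails, Unreserve is
// called on every plugin in reverse order and the error is returned.
func reserveAll(plugins []reservePlugin) ([]string, error) {
	var log []string
	for _, p := range plugins {
		if err := p.Reserve(&log); err != nil {
			for i := len(plugins) - 1; i >= 0; i-- {
				plugins[i].Unreserve(&log)
			}
			return log, err
		}
	}
	return log, nil
}

func main() {
	log, err := reserveAll([]reservePlugin{
		{name: "volume"},
		{name: "gpu", fail: true},
		{name: "net"},
	})
	fmt.Println(log, err)
}
```

    In scheduleOne the rollback has one more step that this sketch leaves out: the assumed Pod is also removed from the cache with ForgetPod.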
    

    permit#

    The Permit plugin can prevent or delay the binding of a Pod.

    	// Run "permit" plugins.
    	runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    	if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
    		var reason string
    		if runPermitStatus.IsUnschedulable() {
    			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
    			reason = v1.PodReasonUnschedulable
    		} else {
    			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    			reason = SchedulerError
    		}
            // reached as soon as one plugin returns a status that is neither success nor wait
    		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            // forget the assumed pod in the cache
    		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
    			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
    		}
    		sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
    		return
    	}
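
    The three Permit outcomes (approve, deny, and wait with a timeout that turns into deny) map naturally onto a channel and a timer. The sketch below is only an illustration of that idea: decision, waitingPod, waitOnPermit and demoPermit are hypothetical names and do not reproduce the framework's own waiting-pod bookkeeping.

```go
package main

import (
	"fmt"
	"time"
)

type decision int

const (
	approve decision = iota
	deny
)

// waitingPod is a hypothetical stand-in for a Pod parked by a Permit
// plugin that returned wait.
type waitingPod struct {
	decided chan decision
}

func newWaitingPod() *waitingPod {
	// Buffered so that Allow or Reject never blocks the caller.
	return &waitingPod{decided: make(chan decision, 1)}
}

func (w *waitingPod) Allow()  { w.send(approve) }
func (w *waitingPod) Reject() { w.send(deny) }

// send keeps only the first decision; later ones are dropped.
func (w *waitingPod) send(d decision) {
	select {
	case w.decided <- d:
	default:
	}
}

// waitOnPermit blocks until the pod is allowed or rejected, or until the
// timeout expires, in which case the wait turns into a deny.
func waitOnPermit(w *waitingPod, timeout time.Duration) decision {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case d := <-w.decided:
		return d
	case <-timer.C:
		return deny
	}
}

// demoPermit exercises the three outcomes: allowed, rejected, timed out.
func demoPermit() (allowed, rejected, timedOut decision) {
	a := newWaitingPod()
	a.Allow()
	r := newWaitingPod()
	r.Reject()
	t := newWaitingPod() // nobody decides, so the timeout applies
	return waitOnPermit(a, time.Second),
		waitOnPermit(r, time.Second),
		waitOnPermit(t, 10*time.Millisecond)
}

func main() {
	fmt.Println(demoPermit())
}
```

    This is also why the wait itself is deferred to the binding cycle (WaitOnPermit, shown in the next section): blocking here would stall the serial scheduling cycle.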
    
    

    Binding Cycle#

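    Once an FN has been selected, the Pod is assumed onto that node (an optimistic binding) and the cache is updated; after that the real bind operation is executed, which is the binding cycle.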

    func (sched *Scheduler) scheduleOne(ctx context.Context) {
    	...
        ...
    	// the binding cycle is asynchronous; here that shows up as a goroutine
    	go func() {
    		bindingCycleCtx, cancel := context.WithCancel(ctx)
    		defer cancel()
    		metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
    		defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
    		// Wait on the Permit result; if it fails, roll back with un-reserve.
    		waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
    		if !waitOnPermitStatus.IsSuccess() {
    			var reason string
    			if waitOnPermitStatus.IsUnschedulable() {
    				metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
    				reason = v1.PodReasonUnschedulable
    			} else {
    				metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    				reason = SchedulerError
    			}
    			// trigger un-reserve plugins to clean up state associated with the reserved Pod
    			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    			if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
    				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
    			} else {
    				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
    				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
    				// TODO(#103853): de-duplicate the logic.
    				// Avoid moving the assumed Pod itself as it's always Unschedulable.
    				// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
    				// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
    				defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
    					return assumedPod.UID != pod.UID
    				})
    			}
    			sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
    			return
    		}
    
    		// Run "prebind" plugins.
    		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    		if !preBindStatus.IsSuccess() {
    			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    			// trigger un-reserve plugins to clean up state associated with the reserved Pod
    			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    			if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
    				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
    			} else {
    				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
    				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
    				// TODO(#103853): de-duplicate the logic.
    				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
    			}
    			sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
    			return
    		}
    		// bind performs the actual binding.
    		err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
    		if err != nil {
    			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    			// On failure, trigger the un-reserve plugins.
    			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    			if err := sched.Cache.ForgetPod(assumedPod); err != nil {
    				klog.ErrorS(err, "scheduler cache ForgetPod failed")
    			} else {
    				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
    				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
    				// TODO(#103853): de-duplicate the logic.
    				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
    			}
    			sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
    			return
    		}
    		// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
    		klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
    		metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
    		metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
    		metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
    
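    		// Run "postbind" plugins.
    		// This is an informational extension point: the plugins are called after
    		// the Pod has been bound and can be used to clean up associated resources.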
    		fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    
    		// At the end of a successful binding cycle, move up Pods if needed.
    		if len(podsToActivate.Map) != 0 {
    			sched.SchedulingQueue.Activate(podsToActivate.Map)
    			// Unlike the logic in scheduling cycle, we don't bother deleting the entries
    			// as `podsToActivate.Map` is no longer consumed.
    		}
    	}()
    }
    

    Failure flow in the scheduling context#

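    Everything above covers the successful path. This part analyzes how failed scheduling attempts are retried. The state involved in failure handling lives in two fields, backoffQ and unschedulablePods, which belong to the scheduling queue held by the Scheduler structure shown earlier.

    • backoffQ: also a heap-based priority queue. It holds Pods that failed scheduling and are waiting for their backoff period to complete, ordered by backoff completion time.
    • unschedulablePods: a map that holds Pods that have been determined to be unschedulable.

    Both backoffQ and unschedulablePods are created when the scheduler is initialized, in NewPriorityQueue: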

    func NewPriorityQueue(
    	lessFn framework.LessFunc,
    	informerFactory informers.SharedInformerFactory,
    	opts ...Option,
    ) *PriorityQueue {
    	options := defaultPriorityQueueOptions
    	for _, opt := range opts {
    		opt(&options)
    	}
    
    	comp := func(podInfo1, podInfo2 interface{}) bool {
    		pInfo1 := podInfo1.(*framework.QueuedPodInfo)
    		pInfo2 := podInfo2.(*framework.QueuedPodInfo)
    		return lessFn(pInfo1, pInfo2)
    	}
    
    	if options.podNominator == nil {
    		options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
    	}
    
    	pq := &PriorityQueue{
    		PodNominator:                      options.podNominator,
    		clock:                             options.clock,
    		stop:                              make(chan struct{}),
    		podInitialBackoffDuration:         options.podInitialBackoffDuration,
    		podMaxBackoffDuration:             options.podMaxBackoffDuration,
    		podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
    		activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
    		unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()),
    		moveRequestCycle:                  -1,
    		clusterEventMap:                   options.clusterEventMap,
    	}
    	pq.cond.L = &pq.lock
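    	// Initialize backoffQ.
    	// NewWithRecorder returns a Heap that wraps an optional metric recorder.
    	// podInfoKeyFunc is a function that returns a string key and an error.
    	// pq.podsCompareBackoffCompleted compares the backoff completion times of
    	// two pods and returns true if the first completes before the second,
    	// false otherwise.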
    	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
    	pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
    
    	return pq
    }
    

    The backoffQ ordering relies on two functions, getBackoffTime and calculateBackoffDuration:

    // getBackoffTime returns the time that podInfo completes backoff
    func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
    	duration := p.calculateBackoffDuration(podInfo)
    	backoffTime := podInfo.Timestamp.Add(duration)
    	return backoffTime
    }
    
    // calculateBackoffDuration is a helper function for calculating the backoffDuration
    // based on the number of attempts the pod has made.
    func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
    	duration := p.podInitialBackoffDuration
    	for i := 1; i < podInfo.Attempts; i++ {
    		// Use subtraction instead of addition or multiplication to avoid overflow.
    		if duration > p.podMaxBackoffDuration-duration {
    			return p.podMaxBackoffDuration
    		}
    		duration += duration
    	}
    	return duration
    }
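
    To make the doubling and the cap concrete, here is a small standalone sketch of the same loop. It takes plain arguments instead of a PriorityQueue; the function name `backoffDuration` and the 1s/10s values are assumptions chosen for illustration, not values read from the configuration.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDuration doubles the initial duration once per previous attempt and
// caps the result at max. It follows the same loop shape as
// calculateBackoffDuration above, with plain arguments.
func backoffDuration(initial, max time.Duration, attempts int) time.Duration {
	d := initial
	for i := 1; i < attempts; i++ {
		// Compare via subtraction so that d+d can never overflow.
		if d > max-d {
			return max
		}
		d += d
	}
	return d
}

func main() {
	// Illustrative values: 1s initial backoff, 10s maximum backoff.
	for attempts := 1; attempts <= 6; attempts++ {
		fmt.Println(attempts, backoffDuration(time.Second, 10*time.Second, attempts))
	}
	// Prints 1s, 2s, 4s, 8s, 10s, 10s for attempts 1 through 6.
}
```

    getBackoffTime then simply adds this duration to the Pod's last timestamp, so a Pod that keeps failing waits longer each time, up to the maximum.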
    

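    Failure handling as a whole proceeds as follows. When the scheduler is initialized, an Error function is registered to handle Pods that could not be scheduled. The function actually registered is the one returned by MakeDefaultErrorFunc, and it is what gets called as Error.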

    sched := newScheduler(
        schedulerCache,
        extenders,
        internalqueue.MakeNextPodFunc(podQueue),
        MakeDefaultErrorFunc(client, podLister, podQueue, schedulerCache),
        stopEverything,
        podQueue,
        profiles,
        client,
        snapshot,
        options.percentageOfNodesToScore,
    )
    

    In the scheduling cycle, that is in scheduleOne, every extension point that fails calls handleSchedulingFailure, and that function uses the registered Error function to handle the Pod.

    func (sched *Scheduler) scheduleOne(ctx context.Context) {
    	...
    	defer cancel()
    	scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
    	if err != nil {
    
    		var nominatingInfo *framework.NominatingInfo
    		if fitError, ok := err.(*framework.FitError); ok {
    			if !fwk.HasPostFilterPlugins() {
    				klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
    			} else {
    			
    				result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
    				if status.Code() == framework.Error {
    					klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
    				} else {
    					fitError.Diagnosis.PostFilterMsg = status.Message()
    					klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
    				}
    				if result != nil {
    					nominatingInfo = result.NominatingInfo
    				}
    			}
    	
    			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
    		} else if err == ErrNoNodesAvailable {
    			nominatingInfo = clearNominatedNode
    			// No nodes available is counted as unschedulable rather than an error.
    			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
    		} else {
    			nominatingInfo = clearNominatedNode
    			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
    			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    		}
    		// Handle the Pod that could not be scheduled.
    		sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
    		return
    	}
    
    

    This leads to the registered Error function, built by MakeDefaultErrorFunc:

    func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
    	return func(podInfo *framework.QueuedPodInfo, err error) {
    		pod := podInfo.Pod
    		if err == ErrNoNodesAvailable {
    			klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
    		} else if fitError, ok := err.(*framework.FitError); ok {
    			// Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
    			podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
    			klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
    		} else if apierrors.IsNotFound(err) {
    			klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)
    			if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
    				nodeName := errStatus.Status().Details.Name
    				// when node is not found, We do not remove the node right away. Trying again to get
    				// the node and if the node is still not found, then remove it from the scheduler cache.
    				_, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
    				if err != nil && apierrors.IsNotFound(err) {
    					node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
    					if err := schedulerCache.RemoveNode(&node); err != nil {
    						klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name)
    					}
    				}
    			}
    		} else {
    			klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
    		}
    
    		// Check if the Pod exists in informer cache.
    		cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
    		if err != nil {
    			klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", err)
    			return
    		}
    
    		// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
    		// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
    		if len(cachedPod.Spec.NodeName) != 0 {
    			klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
    			return
    		}
    
    		// As cachedPod is from SharedInformer, we need to do a DeepCopy() here.
    		podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy())
    		// Add the pod to the unschedulable queue.
    		if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
    			klog.ErrorS(err, "Error occurred")
    		}
    	}
    }
    

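    Next comes AddUnschedulableIfNotPresent, which is where backoffQ and unschedulablePods are actually modified.

    AddUnschedulableIfNotPresent inserts a Pod that could not be scheduled into the queue, unless it is already queued. Normally the PriorityQueue puts unschedulable Pods into unschedulablePods. If a move request was received recently, however, the Pod is put into podBackoffQ instead.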

    func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
    	p.lock.Lock()
    	defer p.lock.Unlock()
    	pod := pInfo.Pod
    	// Do not add the pod if it is already present.
    	if p.unschedulablePods.get(pod) != nil {
    		return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
    	}
    	// Check whether the pod is already in activeQ.
    	if _, exists, _ := p.activeQ.Get(pInfo); exists {
    		return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
    	}
    	// Check whether the pod is already in podBackoffQ.
    	if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
    		return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
    	}
    
    	// Refresh the timestamp since the pod is being re-added.
    	pInfo.Timestamp = p.clock.Now()
    
    	for plugin := range pInfo.UnschedulablePlugins {
    		metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
    	}
    	// If a move request has been received, put the pod into podBackoffQ.
    	if p.moveRequestCycle >= podSchedulingCycle {
    		if err := p.podBackoffQ.Add(pInfo); err != nil {
    			return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
    		}
    		metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
    	} else {
    		// Otherwise put it into unschedulablePods.
    		p.unschedulablePods.addOrUpdate(pInfo)
    		metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
    
    	}
    
    	p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
    	return nil
    }
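
    The routing decision at the end of the function can be isolated in a toy model. This is a sketch under simplifying assumptions: Pods are plain names, the `queue` type and `addUnschedulable` method are hypothetical, and the duplicate checks, metrics, and locking from the real function are left out.

```go
package main

import "fmt"

// queue is a toy model of the two failure queues.
type queue struct {
	moveRequestCycle int64           // scheduling cycle of the last move request
	backoff          []string        // stands in for podBackoffQ
	unschedulable    map[string]bool // stands in for unschedulablePods
}

// addUnschedulable routes a failed pod. If a move request arrived at or after
// the cycle in which the pod was attempted, the pod goes to backoff so that it
// is retried soon; otherwise it is parked in the unschedulable set.
func (q *queue) addUnschedulable(pod string, podSchedulingCycle int64) string {
	if q.moveRequestCycle >= podSchedulingCycle {
		q.backoff = append(q.backoff, pod)
		return "backoff"
	}
	q.unschedulable[pod] = true
	return "unschedulable"
}

func main() {
	// moveRequestCycle starts at -1, as in NewPriorityQueue.
	q := &queue{moveRequestCycle: -1, unschedulable: map[string]bool{}}
	fmt.Println(q.addUnschedulable("pod-a", 3)) // no move request yet: unschedulable

	q.moveRequestCycle = 5
	fmt.Println(q.addUnschedulable("pod-b", 4)) // move request after cycle 4: backoff
}
```

    A plausible reading of the comparison is that a move request arriving during or after the Pod's scheduling attempt signals a cluster change the attempt may have missed, so the Pod is worth retrying after a short backoff rather than being parked.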
    

    When the scheduler starts, two loops are launched asynchronously to work on these two queues. This happens in Run():

    func (p *PriorityQueue) Run() {
    	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
    	go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
    }
    

    flushBackoffQCompleted drives backoffQ and runs every second, while flushUnschedulablePodsLeftover drives unschedulablePods and runs every 30 seconds.

    flushBackoffQCompleted moves every Pod that has completed its backoff from backoffQ to activeQ.

    func (p *PriorityQueue) flushBackoffQCompleted() {
    	p.lock.Lock()
    	defer p.lock.Unlock()
    	broadcast := false
    	for { // Peek is provided by the heap: look at the head without popping it
    		rawPodInfo := p.podBackoffQ.Peek()
    		if rawPodInfo == nil {
    			break
    		}
    		pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
    		boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
    		if boTime.After(p.clock.Now()) {
    			break
    		}
    		_, err := p.podBackoffQ.Pop() // pop one pod
    		if err != nil {
    			klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
    			break
    		}
    		p.activeQ.Add(rawPodInfo) // put it into the active queue
    		metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
    		broadcast = true
    	}
    
    	if broadcast {
    		p.cond.Broadcast()
    	}
    }
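
    The peek-then-pop pattern works because the heap is ordered by backoff completion time: once the head is still backing off, everything behind it is too. Below is a self-contained sketch of that idea using the standard `container/heap` package rather than the scheduler's internal heap; `item`, `backoffHeap`, `flushCompleted`, and `demoFlushBackoff` are hypothetical names.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// item is a toy queued pod with the time at which its backoff completes.
type item struct {
	name    string
	readyAt time.Time
}

// backoffHeap orders items by readyAt, earliest first.
type backoffHeap []item

func (h backoffHeap) Len() int           { return len(h) }
func (h backoffHeap) Less(i, j int) bool { return h[i].readyAt.Before(h[j].readyAt) }
func (h backoffHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *backoffHeap) Push(x any)        { *h = append(*h, x.(item)) }
func (h *backoffHeap) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// flushCompleted pops every item whose backoff has completed by now and
// returns the names in completion order. It stops at the first item that is
// still backing off.
func flushCompleted(h *backoffHeap, now time.Time) []string {
	var moved []string
	for h.Len() > 0 {
		if (*h)[0].readyAt.After(now) { // peek at the heap root without popping
			break
		}
		moved = append(moved, heap.Pop(h).(item).name)
	}
	return moved
}

// demoFlushBackoff builds a small heap around a fixed clock and flushes it.
func demoFlushBackoff() ([]string, int) {
	now := time.Unix(1_000_000, 0)
	h := &backoffHeap{}
	heap.Push(h, item{name: "late", readyAt: now.Add(5 * time.Second)})
	heap.Push(h, item{name: "early", readyAt: now.Add(-2 * time.Second)})
	heap.Push(h, item{name: "earlier", readyAt: now.Add(-4 * time.Second)})
	return flushCompleted(h, now), h.Len()
}

func main() {
	moved, left := demoFlushBackoff()
	fmt.Println(moved, left) // [earlier early] 1
}
```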
    

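    flushUnschedulablePodsLeftover moves Pods that have stayed in unschedulablePods longer than podMaxInUnschedulablePodsDuration to backoffQ or activeQ.

    podMaxInUnschedulablePodsDuration comes from the configuration. When it is not set explicitly, the default from the Deprecated options applies, which is 5 minutes.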

    func NewOptions() *Options {
    	o := &Options{
    		SecureServing:  apiserveroptions.NewSecureServingOptions().WithLoopback(),
    		Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
    		Authorization:  apiserveroptions.NewDelegatingAuthorizationOptions(),
    		Deprecated: &DeprecatedOptions{
    			PodMaxInUnschedulablePodsDuration: 5 * time.Minute,
    		},
    

    flushUnschedulablePodsLeftover itself just compares timestamps and then moves the matching Pods to the appropriate queue:

    func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
    	p.lock.Lock()
    	defer p.lock.Unlock()
    
    	var podsToMove []*framework.QueuedPodInfo
    	currentTime := p.clock.Now()
    	for _, pInfo := range p.unschedulablePods.podInfoMap {
    		lastScheduleTime := pInfo.Timestamp
    		if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
    			podsToMove = append(podsToMove, pInfo)
    		}
    	}
    
    	if len(podsToMove) > 0 {
    		p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
    	}
    }
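
    The time comparison can be tried out in isolation. This sketch assumes a plain map from Pod name to last attempt time; `stalePods` and `demoLeftover` are hypothetical names, and the 5-minute limit matches the default shown above.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// stalePods returns the names of pods whose last scheduling attempt is older
// than maxStay. The result is sorted because map iteration order is random.
func stalePods(lastAttempt map[string]time.Time, now time.Time, maxStay time.Duration) []string {
	var out []string
	for name, ts := range lastAttempt {
		if now.Sub(ts) > maxStay {
			out = append(out, name)
		}
	}
	sort.Strings(out)
	return out
}

// demoLeftover evaluates three pods against a fixed clock.
func demoLeftover() []string {
	now := time.Unix(1_000_000, 0)
	pods := map[string]time.Time{
		"fresh": now.Add(-1 * time.Minute),
		"stale": now.Add(-6 * time.Minute),
		"edge":  now.Add(-5 * time.Minute), // exactly at the limit: kept, the check is a strict >
	}
	return stalePods(pods, now, 5*time.Minute)
}

func main() {
	fmt.Println(demoLeftover()) // [stale]
}
```

    Because the loop in Run fires every 30 seconds, a Pod can in practice stay somewhat longer than the configured limit before it is moved.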
    

    Summary of the scheduling context flow#

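    • Building a scheduler goes through these steps:
      • Prepare the cache, informers, queue, error-handling function, and so on
      • Register event handlers that watch resources (such as Pods); a change triggers the corresponding handler. This is the inbound side of activeQ
    • Once built, the scheduler runs. Run starts the SchedulingQueue, which maintains the queues for Pods that failed scheduling:
      • BackoffQ
      • UnschedulablePods
      • These queues are consumed periodically, at the intervals set in Run, and their Pods are moved back to activeQ
    • A scheduleOne loop is started. It executes every extension point of the scheduling context and is the consumer of activeQ
      • scheduleOne fetches a Pod
      • It runs each extension point; on failure, the Error function built by MakeDefaultErrorFunc adds the Pod to the failure queues
      • The flow then returns to the periodic consumption of those queues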

    Reference

    [1] kubernetes scheduler extender
    [2] scheduling framework
    [3] Extension points

  • Original article: https://www.cnblogs.com/Cylon/p/16504077.html