• kube-scheduler scheduling flow


    Creating the scheduler

    Creating the scheduler mainly means constructing the scheduling queue, the scheduler cache, the scheduling framework and the scheduling algorithm, plus two functions: one that fetches the next pod to schedule, and one that puts pods that failed scheduling back into the unschedulable queue.

    //pkg/scheduler/factory.go
    func (c *Configurator) create() (*Scheduler, error) {
    	...
    	return &Scheduler{
    		// scheduler cache
    		SchedulerCache:  c.schedulerCache,
    		// scheduling algorithm
    		Algorithm:       algo,
    		Extenders:       extenders,
    		// scheduling framework profiles
    		Profiles:        profiles,
    		// Pops the next pod to schedule from the scheduling queue; blocks if the queue is empty
    		NextPod:         internalqueue.MakeNextPodFunc(podQueue),
    		// On scheduling failure, puts the pod back into the unschedulable queue
    		Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
    		StopEverything:  c.StopEverything,
    		// scheduling queue
    		SchedulingQueue: podQueue,
    	}, nil
    }
    

    The scheduler cache, scheduling queue and scheduling framework have been analyzed in earlier articles. The remaining core piece is the scheduling algorithm, whose implementation is fairly simple: it exposes a single interface method, shown below.

    type ScheduleAlgorithm interface {
    	Schedule(context.Context, []framework.Extender, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
    }
    

    genericScheduler implements the ScheduleAlgorithm interface; its structure is as follows.

    type genericScheduler struct {
    	// used to call the cache's UpdateSnapshot to take a snapshot
    	cache                    internalcache.Cache
    	// the snapshot is stored in nodeInfoSnapshot
    	nodeInfoSnapshot         *internalcache.Snapshot
    	// Not every node is examined on each scheduling attempt; this field controls the percentage of nodes to score
    	percentageOfNodesToScore int32
    	// as noted above, records which node index the next scheduling cycle should start from
    	nextStartNodeIndex       int
    }
    
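
    As a reference for how percentageOfNodesToScore turns into an actual node count, here is a small self-contained sketch modeled on the upstream numFeasibleNodesToFind logic (the 100-node floor, the adaptive "50% minus cluster size / 125" default and the 5% lower bound are taken from the upstream defaults; treat the exact constants as assumptions for illustration):

    package main

    import "fmt"

    const (
    	minFeasibleNodesToFind           = 100
    	minFeasibleNodesPercentageToFind = 5
    )

    // numFeasibleNodesToFind mirrors the rule described later in findNodesThatPassFilters:
    // scan every node when the cluster is small or the percentage is >= 100, otherwise
    // take a (possibly adaptive) percentage of the nodes, never dropping below the floor.
    func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
    	if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore >= 100 {
    		return numAllNodes
    	}
    	adaptivePercentage := percentageOfNodesToScore
    	if adaptivePercentage <= 0 {
    		// default: start at 50% and shrink as the cluster grows, bottoming out at 5%
    		adaptivePercentage = 50 - numAllNodes/125
    		if adaptivePercentage < minFeasibleNodesPercentageToFind {
    			adaptivePercentage = minFeasibleNodesPercentageToFind
    		}
    	}
    	numNodes := numAllNodes * adaptivePercentage / 100
    	if numNodes < minFeasibleNodesToFind {
    		return minFeasibleNodesToFind
    	}
    	return numNodes
    }

    func main() {
    	fmt.Println(numFeasibleNodesToFind(50, 0))   // 50: small cluster, every node is checked
    	fmt.Println(numFeasibleNodesToFind(5000, 0)) // 500: adaptive 10% of 5000
    }
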

    MakeNextPodFunc returns a function that pops the next pending pod from the scheduling queue, blocking if the queue is empty.

    // MakeNextPodFunc returns a function to retrieve the next pod from a given
    // scheduling queue
    func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
    	return func() *framework.QueuedPodInfo {
    		podInfo, err := queue.Pop()
    		if err == nil {
    			klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
    			return podInfo
    		}
    		klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
    		return nil
    	}
    }
    

    MakeDefaultErrorFunc returns a function that puts a pod that failed scheduling back into the unschedulable queue to wait for rescheduling.

    // MakeDefaultErrorFunc constructs a function to handle pod scheduler errors
    func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
    	return func(podInfo *framework.QueuedPodInfo, err error) {
    		pod := podInfo.Pod
    		if err == ErrNoNodesAvailable {
    			klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
    		} else if fitError, ok := err.(*framework.FitError); ok {
    			// Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
    			podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
    			klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
    		} else if apierrors.IsNotFound(err) {
    			klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)
    			if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
    				nodeName := errStatus.Status().Details.Name
    				// when node is not found, We do not remove the node right away. Trying again to get
    				// the node and if the node is still not found, then remove it from the scheduler cache.
    				_, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
    				if err != nil && apierrors.IsNotFound(err) {
    					node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
    					if err := schedulerCache.RemoveNode(&node); err != nil {
    						klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name)
    					}
    				}
    			}
    		} else {
    			klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
    		}
    
    		// If the pod has already been deleted, there is no need to add it back to the unschedulable queue
    		// Check if the Pod exists in informer cache.
    		cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
    		if err != nil {
    			klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", err)
    			return
    		}
    
    		// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
    		// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
    		if len(cachedPod.Spec.NodeName) != 0 {
    			klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
    			return
    		}
    
    		// As cachedPod is from SharedInformer, we need to do a DeepCopy() here.
    		podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy())
    		// add the pod to the unschedulable queue
    		if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
    			klog.ErrorS(err, "Error occurred")
    		}
    	}
    }
    

    Running the scheduler

    After the scheduler is created, its Run function is executed to start scheduling.

    // Run begins watching and scheduling. It starts scheduling and blocks until the context is done.
    func (sched *Scheduler) Run(ctx context.Context) {
    	sched.SchedulingQueue.Run()
    	// call scheduleOne in a loop to schedule pods one at a time
    	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
    	sched.SchedulingQueue.Close()
    }
    
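
    Note that the period passed to wait.UntilWithContext is 0, so the next scheduleOne call starts as soon as the previous one returns; the loop only stops when the context is cancelled. A minimal standalone illustration of this behaviour (not scheduler code; it assumes the k8s.io/apimachinery module is on the go.mod path):

    package main

    import (
    	"context"
    	"fmt"
    	"time"

    	"k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
    	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    	defer cancel()

    	i := 0
    	// With period 0, each iteration starts right after the previous one returns,
    	// which is how Run drives scheduleOne: one pod per iteration, back to back.
    	wait.UntilWithContext(ctx, func(ctx context.Context) {
    		i++
    		fmt.Println("iteration", i)
    		time.Sleep(10 * time.Millisecond) // stand-in for scheduling one pod
    	}, 0)
    }
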

    scheduleOne takes a pod from the scheduling queue, runs the scheduling algorithm (Schedule) on it and binds it to the chosen node. If scheduling fails, it also runs the preemption flow and puts the pod back into the unschedulable queue so it can be rescheduled.

    // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
    func (sched *Scheduler) scheduleOne(ctx context.Context) {
    	// NextPod() is the function returned by MakeNextPodFunc; it pops a pending pod from the scheduling queue and blocks if none is available
    	podInfo := sched.NextPod()
    	// pod could be nil when schedulerQueue is closed
    	if podInfo == nil || podInfo.Pod == nil {
    		return
    	}
    	pod := podInfo.Pod
    	fwk, err := sched.frameworkForPod(pod)
    	if err != nil {
    		// This shouldn't happen, because we only accept for scheduling the pods
    		// which specify a scheduler name that matches one of the profiles.
    		klog.ErrorS(err, "Error occurred")
    		return
    	}
    	// The following two special cases do not need to go through scheduling:
    	// a. the pod's DeletionTimestamp is set, i.e. the pod has been deleted
    	// b. the pod is in the assumed state
    	if sched.skipPodSchedule(fwk, pod) {
    		return
    	}
    
    	klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))
    
    	// Synchronously attempt to find a fit for the pod.
    	start := time.Now()
    	// create the CycleState
    	state := framework.NewCycleState()
    	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
    	// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
    	podsToActivate := framework.NewPodsToActivate()
    	state.Write(framework.PodsToActivateKey, podsToActivate)
    
    	schedulingCycleCtx, cancel := context.WithCancel(ctx)
    	defer cancel()
    	// Run the scheduling algorithm to pick a suitable node for the pod; this function is analyzed in detail below
    	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)
    	if err != nil {
    		// Schedule() may have failed because the pod would not fit on any host, so we try to
    		// preempt, with the expectation that the next time the pod is tried for scheduling it
    		// will fit due to the preemption. It is also possible that a different pod will schedule
    		// into the resources that were preempted, but this is harmless.
    		nominatedNode := ""
    		// Scheduling failed with a FitError; for this kind of error, preemption may succeed
    		if fitError, ok := err.(*framework.FitError); ok {
    			// no PostFilter plugins are registered
    			if !fwk.HasPostFilterPlugins() {
    				klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
    			} else {
    				// Run the plugins at the PostFilter extension point; currently the only in-tree one is the preemption plugin DefaultPreemption: pkg/scheduler/framework/plugins/defaultpreemption/defaultpreemption.go
    				// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
    				result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
    				if status.Code() == framework.Error {
    					klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
    				} else {
    					klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
    				}
    				// Preemption succeeded: NominatedNodeName is the nominated node, i.e. the pod will most likely run there. Why only likely? Even after a successful preemption, the pod can still be preempted by another higher-priority pod.
    				if status.IsSuccess() && result != nil {
    					nominatedNode = result.NominatedNodeName
    				}
    			}
    			// Pod did not fit anywhere, so it is counted as a failure. If preemption
    			// succeeds, the pod should get counted as a success the next time we try to
    			// schedule it. (hopefully)
    			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
    		// no nodes are available at all, so preemption would not help either
    		} else if err == ErrNoNodesAvailable {
    			// No nodes available is counted as unschedulable rather than an error.
    			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
    		} else {
    			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
    			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    		}
    		// handle the pod that failed scheduling; this function is examined in detail below
    		sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
    		// return and move on to scheduling the next pod
    		return
    	}
    	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
    	// Scheduling succeeded. Tell the cache to assume the pod is running on the chosen node; it has not been bound yet, but this lets us move on to the next pod without waiting for the bind to finish.
    	assumedPodInfo := podInfo.DeepCopy()
    	assumedPod := assumedPodInfo.Pod
    	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
    	// assume the pod
    	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    	if err != nil {
    		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    		// This is most probably result of a BUG in retrying logic.
    		// We report an error here so that pod scheduling can be retried.
    		// This relies on the fact that Error will check if the pod has been bound
    		// to a node and if so will not add it back to the unscheduled pods queue
    		// (otherwise this would cause an infinite loop).
    		sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "")
    		return
    	}
    
    	// Run the plugins at the Reserve extension point, e.g. to reserve a PV
    	if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
    		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    		// trigger un-reserve to clean up state associated with the reserved Pod
    		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    		if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
    			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
    		}
    		sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "")
    		return
    	}
    
    	// Run the plugins at the Permit extension point; no in-tree plugin is registered here at the moment
    	runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    	if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
    		var reason string
    		if runPermitStatus.IsUnschedulable() {
    			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
    			reason = v1.PodReasonUnschedulable
    		} else {
    			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    			reason = SchedulerError
    		}
    		// One of the plugins returned status different than success or wait.
    		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    		if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
    			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
    		}
    		sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, "")
    		return
    	}
    
    	// At the end of a successful scheduling cycle, pop and move up Pods if needed.
    	if len(podsToActivate.Map) != 0 {
    		sched.SchedulingQueue.Activate(podsToActivate.Map)
    		// Clear the entries after activation.
    		podsToActivate.Map = make(map[string]*v1.Pod)
    	}
    
    	// run the bind flow asynchronously
    	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    	go func() {
    		bindingCycleCtx, cancel := context.WithCancel(ctx)
    		defer cancel()
    		metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
    		defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
    
    		waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
    		if !waitOnPermitStatus.IsSuccess() {
    			...
    		}
    
    		// Run "prebind" plugins.
    		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    		if !preBindStatus.IsSuccess() {
    			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    			// trigger un-reserve plugins to clean up state associated with the reserved Pod
    			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    			if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
    				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
    			} else {
    				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
    				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
    				// TODO(#103853): de-duplicate the logic.
    				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
    			}
    			sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
    			return
    		}
    
    		// run the Bind plugins
    		err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
    		if err != nil {
    			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    			// trigger un-reserve plugins to clean up state associated with the reserved Pod
    			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    			if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
    				klog.ErrorS(err, "scheduler cache ForgetPod failed")
    			} else {
    				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
    				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
    				// TODO(#103853): de-duplicate the logic.
    				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
    			}
    			sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "")
    		} else {
    			// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
    			if klog.V(2).Enabled() {
    				klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
    			}
    			metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
    			metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
    			metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
    
    			// Run "postbind" plugins.
    			fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    
    			// At the end of a successful binding cycle, move up Pods if needed.
    			if len(podsToActivate.Map) != 0 {
    				sched.SchedulingQueue.Activate(podsToActivate.Map)
    				// Unlike the logic in scheduling cycle, we don't bother deleting the entries
    				// as `podsToActivate.Map` is no longer consumed.
    			}
    		}
    	}()
    }
    

    CycleState
    CycleState is used within a scheduling cycle to let plugins store and retrieve arbitrary data. For example, a PreFilter plugin can save some data into the CycleState, and the corresponding Filter plugin can read it back later in the same cycle.

    type CycleState struct {
    	mx      sync.RWMutex
    	storage map[StateKey]StateData
    	// if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
    	recordPluginMetrics bool
    }
    
    // an interface type; any data can be stored as long as it implements Clone
    type StateData interface {
    	// Clone is an interface to make a copy of StateData. For performance reasons,
    	// clone should make shallow copies for members (e.g., slices or maps) that are not
    	// impacted by PreFilter's optional AddPod/RemovePod methods.
    	Clone() StateData
    }
    
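
    To make the Write/Read pattern concrete, here is a small runnable sketch. The CycleState, StateKey and StateData types below are simplified stand-ins for the real ones in pkg/scheduler/framework, and the plugin state and key are hypothetical:

    package main

    import (
    	"fmt"
    	"sync"
    )

    type StateKey string

    type StateData interface{ Clone() StateData }

    // CycleState: a thread-safe map keyed by StateKey, like the real one.
    type CycleState struct {
    	mx      sync.RWMutex
    	storage map[StateKey]StateData
    }

    func NewCycleState() *CycleState {
    	return &CycleState{storage: map[StateKey]StateData{}}
    }

    func (c *CycleState) Write(k StateKey, d StateData) {
    	c.mx.Lock()
    	defer c.mx.Unlock()
    	c.storage[k] = d
    }

    func (c *CycleState) Read(k StateKey) (StateData, error) {
    	c.mx.RLock()
    	defer c.mx.RUnlock()
    	if d, ok := c.storage[k]; ok {
    		return d, nil
    	}
    	return nil, fmt.Errorf("%q not found", k)
    }

    // preFilterState is what a hypothetical plugin computes once in PreFilter
    // and reuses in Filter for every node.
    type preFilterState struct{ podRequestMilliCPU int64 }

    func (s *preFilterState) Clone() StateData { c := *s; return &c }

    const stateKey StateKey = "example.com/pre-filter"

    func main() {
    	state := NewCycleState()
    	// "PreFilter": compute once and stash the result in the CycleState.
    	state.Write(stateKey, &preFilterState{podRequestMilliCPU: 500})
    	// "Filter": read it back when evaluating each node.
    	if d, err := state.Read(stateKey); err == nil {
    		fmt.Println(d.(*preFilterState).podRequestMilliCPU) // 500
    	}
    }
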

    recordSchedulingFailure
    recordSchedulingFailure puts the failed pod back into the unschedulable queue to wait for another scheduling attempt. If the pod won a preemption, it also updates the scheduling queue and sends a request to the apiserver to set pod.Status.NominatedNodeName to nominatedNode.

    // recordSchedulingFailure records an event for the pod that indicates the
    // pod has failed to schedule. Also, update the pod condition and nominated node name if set.
    func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
    	// Error is the function returned by MakeDefaultErrorFunc; it puts the pod into the unschedulable queue to wait for rescheduling
    	sched.Error(podInfo, err)
    
    	// Update the scheduling queue with the nominated pod information. Without
    	// this, there would be a race condition between the next scheduling cycle
    	// and the time the scheduler receives a Pod Update for the nominated pod.
    	// Here we check for nil only for tests.
    	if sched.SchedulingQueue != nil {
    		// If nominatedNode is not empty, preemption succeeded, so record this pod as a nominated pod of node nominatedNode
    		sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatedNode)
    	}
    
    	pod := podInfo.Pod
    	msg := truncateMessage(err.Error())
    	fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
    	// If nominatedNode is not empty (preemption succeeded), set pod.Status.NominatedNodeName to nominatedNode and send a request to the apiserver to update this field
    	if err := updatePod(sched.client, pod, &v1.PodCondition{
    		Type:    v1.PodScheduled,
    		Status:  v1.ConditionFalse,
    		Reason:  reason,
    		Message: err.Error(),
    	}, nominatedNode); err != nil {
    		klog.ErrorS(err, "Error updating pod", "pod", klog.KObj(pod))
    	}
    }
    

    assume
    assume adds the assumed pod to the scheduler cache and aggregates its resource usage onto the target node.
    If the pod is a nominated (preempting) pod, its nomination info is also removed from the scheduling queue.

    // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
    // assume modifies `assumed`.
    func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    	// Optimistically assume that the binding will succeed and send it to apiserver
    	// in the background.
    	// If the binding fails, scheduler will release resources allocated to assumed pod
    	// immediately.
    	assumed.Spec.NodeName = host
    
    	if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
    		klog.ErrorS(err, "Scheduler cache AssumePod failed")
    		return err
    	}
    	// if "assumed" is a nominated pod, we should remove it from internal cache
    	if sched.SchedulingQueue != nil {
    		sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
    	}
    
    	return nil
    }
    

    Schedule
    Schedule takes a fresh snapshot of the node information in the cache and schedules the pod.

    // Schedule tries to schedule the given pod to one of the nodes in the node list.
    // If it succeeds, it will return the name of the node.
    // If it fails, it will return a FitError error with reasons.
    func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    	defer trace.LogIfLong(100 * time.Millisecond)
    
    	// g.snapshot() calls g.cache.UpdateSnapshot(g.nodeInfoSnapshot) to snapshot the node info held in the cache
    	if err := g.snapshot(); err != nil {
    		return result, err
    	}
    	trace.Step("Snapshotting scheduler cache and node infos done")
    
    	// If there are no nodes at all, scheduling fails with ErrNoNodesAvailable
    	if g.nodeInfoSnapshot.NumNodes() == 0 {
    		return result, ErrNoNodesAvailable
    	}
    
    	// Get the list of nodes that pass the plugins at the Filter extension point
    	feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, extenders, fwk, state, pod)
    	if err != nil {
    		return result, err
    	}
    	trace.Step("Computing predicates done")
    
    	// Every node was filtered out: return a FitError, which makes the pod eligible for preemption
    	if len(feasibleNodes) == 0 {
    		return result, &framework.FitError{
    			Pod:         pod,
    			NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
    			Diagnosis:   diagnosis,
    		}
    	}
    
    	// Only one node passed the filters: return it directly
    	if len(feasibleNodes) == 1 {
    		return ScheduleResult{
    			SuggestedHost:  feasibleNodes[0].Name,
    			EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
    			FeasibleNodes:  1,
    		}, nil
    	}
    
    	// Multiple nodes passed the filters: run the scoring phase by calling prioritizeNodes, which runs the Score plugins on them
    	priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes)
    	if err != nil {
    		return result, err
    	}
    	// the node with the highest score is the one chosen
    	host, err := g.selectHost(priorityList)
    	trace.Step("Prioritizing done")
    
    	return ScheduleResult{
    		SuggestedHost:  host,
    		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
    		FeasibleNodes:  len(feasibleNodes),
    	}, err
    }
    

    findNodesThatFitPod returns the list of nodes that fit the pod.

    func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
    	diagnosis := framework.Diagnosis{
    		NodeToStatusMap:      make(framework.NodeToStatusMap),
    		UnschedulablePlugins: sets.NewString(),
    	}
    
    	// Run the plugins at the PreFilter extension point
    	s := fwk.RunPreFilterPlugins(ctx, state, pod)
    	// get all nodes from the snapshot
    	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
    	...
    	// If pod.Status.NominatedNodeName is not empty, this pod failed a previous scheduling attempt and then succeeded in preemption.
    	// Run the Filter plugins against the nominated node first; if it passes, skip the rest of the flow and return it directly.
    	if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {
    		// evaluateNominatedNode calls findNodesThatPassFilters to check whether the nominated node passes the filters
    		feasibleNodes, err := g.evaluateNominatedNode(ctx, extenders, pod, fwk, state, diagnosis)
    		if err != nil {
    			klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
    		}
    		// Nominated node passes all the filters, scheduler is good to assign this node to the pod.
    		if len(feasibleNodes) != 0 {
    			return feasibleNodes, diagnosis, nil
    		}
    	}
    	// return the nodes that pass the filters
    	feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
    	if err != nil {
    		return nil, diagnosis, err
    	}
    	...
    	return feasibleNodes, diagnosis, nil
    }
    

    findNodesThatPassFilters runs the Filter plugins on a selected number of nodes and returns those that pass.

    // findNodesThatPassFilters finds the nodes that fit the filter plugins.
    func (g *genericScheduler) findNodesThatPassFilters(
    	ctx context.Context,
    	fwk framework.Framework,
    	state *framework.CycleState,
    	pod *v1.Pod,
    	diagnosis framework.Diagnosis,
    	nodes []*framework.NodeInfo) ([]*v1.Node, error) {
    	// Determine how many nodes take part in this scheduling attempt:
    	// if the total number of nodes is below 100 or percentageOfNodesToScore is 100, return the total, i.e. all nodes are considered;
    	// otherwise select the number of nodes given by the percentageOfNodesToScore percentage.
    	numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))
    
    	// Create feasible list with enough space to avoid growing it
    	// and allow assigning.
    	feasibleNodes := make([]*v1.Node, numNodesToFind)
    
    	// No Filter plugins are configured: treat the selected nodes as feasible without filtering
    	if !fwk.HasFilterPlugins() {
    		length := len(nodes)
    		for i := range feasibleNodes {
    			feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node()
    		}
    		g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
    		return feasibleNodes, nil
    	}
    
    	errCh := parallelize.NewErrorChannel()
    	var statusesLock sync.Mutex
    	var feasibleNodesLen int32
    	ctx, cancel := context.WithCancel(ctx)
    	// The parameter i is the index of the work piece to process.
    	// For example, with 160 nodes and 16 worker goroutines, the pieces are shared
    	// among the workers, so each goroutine ends up checking roughly 10 nodes.
    	checkNode := func(i int) {
    		// We check the nodes starting from where we left off in the previous scheduling cycle,
    		// this is to make sure all nodes have the same chance of being examined across pods.
    		nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)]
    		// run the Filter plugins
    		status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
    		if status.Code() == framework.Error {
    			errCh.SendErrorWithCancel(status.AsError(), cancel)
    			return
    		}
    		if status.IsSuccess() {
    			length := atomic.AddInt32(&feasibleNodesLen, 1)
    			if length > numNodesToFind {
    				cancel()
    				atomic.AddInt32(&feasibleNodesLen, -1)
    			} else {
    				feasibleNodes[length-1] = nodeInfo.Node()
    			}
    		} else {
    			statusesLock.Lock()
    			diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = status
    			diagnosis.UnschedulablePlugins.Insert(status.FailedPlugin())
    			statusesLock.Unlock()
    		}
    	}
    
    	beginCheckNode := time.Now()
    	statusCode := framework.Success
    	defer func() {
    		// We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
    		// function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
    		// Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
    		metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
    	}()
    
    	// Stops searching for more nodes once the configured number of feasible nodes
    	// are found.
    	// Run checkNode in parallel with 16 worker goroutines; len(nodes) is the total number of pieces to process
    	fwk.Parallelizer().Until(ctx, len(nodes), checkNode)
    	// The number of nodes processed is the number that passed the filters plus the number that failed
    	processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)
    	// nextStartNodeIndex is the node index to start from in the next scheduling cycle;
    	// taking it modulo len(nodes) wraps it back to 0 once it reaches the end
    	g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)
    
    	feasibleNodes = feasibleNodes[:feasibleNodesLen]
    	if err := errCh.ReceiveError(); err != nil {
    		statusCode = framework.Error
    		return nil, err
    	}
    	return feasibleNodes, nil
    }
    
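
    The "16 goroutines" mentioned in the comments is the scheduler's default parallelism. fwk.Parallelizer().Until is, roughly, a wrapper around the generic workqueue helper, so the work-piece distribution can be illustrated with a small standalone sketch (it assumes k8s.io/client-go is available; the node names and start index are made up):

    package main

    import (
    	"context"
    	"fmt"
    	"sync/atomic"

    	"k8s.io/client-go/util/workqueue"
    )

    func main() {
    	nodes := make([]string, 160)
    	for i := range nodes {
    		nodes[i] = fmt.Sprintf("node-%d", i)
    	}

    	var checked int32
    	nextStartNodeIndex := 40 // pretend the previous cycle stopped here

    	// Each work-piece index i is mapped onto the node list starting from
    	// nextStartNodeIndex, the same trick checkNode uses so that every node
    	// gets an equal chance of being examined across scheduling cycles.
    	checkNode := func(i int) {
    		node := nodes[(nextStartNodeIndex+i)%len(nodes)]
    		_ = node // a real checkNode would run the Filter plugins here
    		atomic.AddInt32(&checked, 1)
    	}

    	// 16 workers pull pieces until all len(nodes) pieces are processed.
    	workqueue.ParallelizeUntil(context.Background(), 16, len(nodes), checkNode)
    	fmt.Println("checked", checked, "nodes") // checked 160 nodes
    }
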

    prioritizeNodes runs the Score plugins on the nodes to score them.

    func prioritizeNodes(
    	ctx context.Context,
    	extenders []framework.Extender,
    	fwk framework.Framework,
    	state *framework.CycleState,
    	pod *v1.Pod,
    	nodes []*v1.Node,
    ) (framework.NodeScoreList, error) {
    	// If no priority configs are provided, then all nodes will have a score of one.
    	// This is required to generate the priority list in the required format
    	// If no extenders and no Score plugins are configured, give every node a score of 1
    	if len(extenders) == 0 && !fwk.HasScorePlugins() {
    		result := make(framework.NodeScoreList, 0, len(nodes))
    		for i := range nodes {
    			result = append(result, framework.NodeScore{
    				Name:  nodes[i].Name,
    				Score: 1,
    			})
    		}
    		return result, nil
    	}
    
    	// run the PreScore plugins
    	preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
    	if !preScoreStatus.IsSuccess() {
    		return nil, preScoreStatus.AsError()
    	}
    
    	// run the Score plugins
    	scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
    	if !scoreStatus.IsSuccess() {
    		return nil, scoreStatus.AsError()
    	}
    
    	// sum up the total score for each node
    	result := make(framework.NodeScoreList, 0, len(nodes))
    
    	for i := range nodes {
    		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
    		for j := range scoresMap {
    			result[i].Score += scoresMap[j][i].Score
    		}
    	}
    
    	...
    
    	return result, nil
    }
    
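
    The aggregation loop above simply adds up the per-plugin scores for each node. A tiny illustration with made-up numbers (NodeScore mirrors framework.NodeScore for illustration only; the plugin names are real in-tree Score plugins, but the scores are invented):

    package main

    import "fmt"

    // NodeScore mirrors framework.NodeScore for illustration only.
    type NodeScore struct {
    	Name  string
    	Score int64
    }

    func main() {
    	nodes := []string{"node-a", "node-b"}
    	// One score list per Score plugin, indexed the same way as nodes,
    	// roughly what RunScorePlugins returns (already weighted and normalized).
    	scoresMap := map[string][]NodeScore{
    		"NodeResourcesBalancedAllocation": {{"node-a", 60}, {"node-b", 80}},
    		"ImageLocality":                   {{"node-a", 10}, {"node-b", 0}},
    	}

    	// Sum the per-plugin scores into one total per node, like prioritizeNodes does.
    	result := make([]NodeScore, 0, len(nodes))
    	for i, name := range nodes {
    		total := NodeScore{Name: name}
    		for _, perPlugin := range scoresMap {
    			total.Score += perPlugin[i].Score
    		}
    		result = append(result, total)
    	}
    	fmt.Println(result) // [{node-a 70} {node-b 80}]
    }
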

    selectHost picks the node with the highest score; if several nodes tie for the highest score, one of them is chosen at random with equal probability.

    func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
    	if len(nodeScoreList) == 0 {
    		return "", fmt.Errorf("empty priorityList")
    	}
    	maxScore := nodeScoreList[0].Score
    	selected := nodeScoreList[0].Name
    	cntOfMaxScore := 1
    	for _, ns := range nodeScoreList[1:] {
    		if ns.Score > maxScore {
    			maxScore = ns.Score
    			selected = ns.Name
    			cntOfMaxScore = 1
    		} else if ns.Score == maxScore {
    			cntOfMaxScore++
    			if rand.Intn(cntOfMaxScore) == 0 {
    				// Replace the candidate with probability of 1/cntOfMaxScore
    				selected = ns.Name
    			}
    		}
    	}
    	return selected, nil
    }
    
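
    The 1/cntOfMaxScore replacement is reservoir sampling: each of the tied nodes ends up selected with equal probability, no matter how many there are. A quick standalone simulation of just the tie-breaking loop (illustrative only):

    package main

    import (
    	"fmt"
    	"math/rand"
    )

    // pickAmongTies mimics the tie-breaking part of selectHost: each additional
    // candidate with the same max score replaces the current pick with
    // probability 1/cntOfMaxScore, which yields a uniform choice overall.
    func pickAmongTies(names []string) string {
    	selected := names[0]
    	cnt := 1
    	for _, n := range names[1:] {
    		cnt++
    		if rand.Intn(cnt) == 0 {
    			selected = n
    		}
    	}
    	return selected
    }

    func main() {
    	counts := map[string]int{}
    	for i := 0; i < 30000; i++ {
    		counts[pickAmongTies([]string{"node-a", "node-b", "node-c"})]++
    	}
    	fmt.Println(counts) // each node is picked roughly 10000 times
    }
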
  • Original post: https://blog.csdn.net/fengcai_ke/article/details/127560228