• 彻底搞懂kubernetes调度框架与插件


    调度框架 [1]

    本文基于 kubernetes 1.24 进行分析

    调度框架(Scheduling Framework)是Kubernetes 的调度器 kube-scheduler 设计的的可插拔架构,将插件(调度算法)嵌入到调度上下文的每个扩展点中,并编译为 kube-scheduler

    kube-scheduler 1.22 之后,在 pkg/scheduler/framework/interface.go 中定义了一个 Plugininterface,这个 interface 作为了所有插件的父级。而每个未调度的 Pod,Kubernetes 调度器会根据一组规则尝试在集群中寻找一个节点。

    type Plugin interface {
    	Name() string
    }
    
    • 1
    • 2
    • 3

    下面会对每个算法是如何实现的进行分析

    在初始化 scheduler 时,会创建一个 profile,profile是关于 scheduler 调度配置相关的定义

    func New(client clientset.Interface,
    ...
    	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
    		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
    		frameworkruntime.WithClientSet(client),
    		frameworkruntime.WithKubeConfig(options.kubeConfig),
    		frameworkruntime.WithInformerFactory(informerFactory),
    		frameworkruntime.WithSnapshotSharedLister(snapshot),
    		frameworkruntime.WithPodNominator(nominator),
    		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
    		frameworkruntime.WithClusterEventMap(clusterEventMap),
    		frameworkruntime.WithParallelism(int(options.parallelism)),
    		frameworkruntime.WithExtenders(extenders),
    	)
    	if err != nil {
    		return nil, fmt.Errorf("initializing profiles: %v", err)
    	}
    
    	if len(profiles) == 0 {
    		return nil, errors.New("at least one profile is required")
    	}
    ....
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    关于 profile 的实现,则为 KubeSchedulerProfile,也是作为 yaml生成时传入的配置

    // KubeSchedulerProfile 是一个 scheduling profile.
    type KubeSchedulerProfile struct {
    	// SchedulerName 是与此配置文件关联的调度程序的名称。
        // 如果 SchedulerName 与 pod “spec.schedulerName”匹配,则使用此配置文件调度 pod。
    	SchedulerName string
    
    	// Plugins指定应该启用或禁用的插件集。
        // 启用的插件是除了默认插件之外应该启用的插件。禁用插件应是禁用的任何默认插件。
        // 当没有为扩展点指定启用或禁用插件时,将使用该扩展点的默认插件(如果有)。
        // 如果指定了 QueueSort 插件,
        /// 则必须为所有配置文件指定相同的 QueueSort Plugin 和 PluginConfig。
        // 这个Plugins展现的形式则是调度上下文中的所有扩展点(这是抽象),实际中会表现为多个扩展点
    	Plugins *Plugins
    
    	// PluginConfig 是每个插件的一组可选的自定义插件参数。
        // 如果省略PluginConfig参数等同于使用该插件的默认配置。
    	PluginConfig []PluginConfig
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    对于 profile.NewMap 就是根据给定的配置来构建这个framework,因为配置可能是存在多个的。而 Registry 则是所有可用插件的集合,内部构造则是 PluginFactory ,通过函数来构建出对应的 plugin

    func NewMap(cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
    	stopCh <-chan struct{}, opts ...frameworkruntime.Option) (Map, error) {
    	m := make(Map)
    	v := cfgValidator{m: m}
    
    	for _, cfg := range cfgs {
    		p, err := newProfile(cfg, r, recorderFact, stopCh, opts...)
    		if err != nil {
    			return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
    		}
    		if err := v.validate(cfg, p); err != nil {
    			return nil, err
    		}
    		m[cfg.SchedulerName] = p
    	}
    	return m, nil
    }
    
    // newProfile 给的配置构建出一个profile
    func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
    	stopCh <-chan struct{}, opts ...frameworkruntime.Option) (framework.Framework, error) {
    	recorder := recorderFact(cfg.SchedulerName)
    	opts = append(opts, frameworkruntime.WithEventRecorder(recorder))
    	return frameworkruntime.NewFramework(r, &cfg, stopCh, opts...)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    可以看到最终返回的是一个 Framework 。那么来看下这个 Framework

    Framework 是一个抽象,管理着调度过程中所使用的所有插件,并在调度上下文中适当的位置去运行对应的插件

    type Framework interface {
    	Handle
    	// QueueSortFunc 返回对调度队列中的 Pod 进行排序的函数
        // 也就是less,在Sort打分阶段的打分函数
    	QueueSortFunc() LessFunc
        
        // RunPreFilterPlugins 运行配置的一组PreFilter插件。
        // 如果这组插件中,任何一个插件失败,则返回 *Status 并设置为non-success。
        // 如果返回状态为non-success,则调度周期中止。
        // 它还返回一个 PreFilterResult,它可能会影响到要评估下游的节点。
        
    	RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)
    
        // RunPostFilterPlugins 运行配置的一组PostFilter插件。 
        // PostFilter 插件是通知性插件,在这种情况下应配置为先执行并返回 Unschedulable 状态,
        // 或者尝试更改集群状态以使 pod 在未来的调度周期中可能会被调度。
    	RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
    
        // RunPreBindPlugins 运行配置的一组 PreBind 插件。
        // 如果任何一个插件返回错误,则返回 *Status 并且code设置为non-success。
        // 如果code为“Unschedulable”,则调度检查失败,
        // 则认为是内部错误。在任何一种情况下,Pod都不会被bound。
    	RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    
        // RunPostBindPlugins 运行配置的一组PostBind插件
    	RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
    
        // RunReservePluginsReserve运行配置的一组Reserve插件的Reserve方法。
        // 如果在这组调用中的任何一个插件返回错误,则不会继续运行剩余调用的插件并返回错误。
        // 在这种情况下,pod将不能被调度。
    	RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    
        // RunReservePluginsUnreserve运行配置的一组Reserve插件的Unreserve方法。
    	RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
    
        // RunPermitPlugins运行配置的一组Permit插件。
        // 如果这些插件中的任何一个返回“Success”或“Wait”之外的状态,则它不会继续运行其余插件并返回错误。
        // 否则,如果任何插件返回 “Wait”,则此函数将创建等待pod并将其添加到当前等待pod的map中,
        // 并使用“Wait” code返回状态。 Pod将在Permit插件返回的最短持续时间内保持等待pod。
    	RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    
        // 如果pod是waiting pod,WaitOnPermit 将阻塞,直到等待的pod被允许或拒绝。
    	WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
    
        // RunBindPlugins运行配置的一组bind插件。 Bind插件可以选择是否处理Pod。
        // 如果 Bind 插件选择跳过binding,它应该返回 code=5("skip")状态。
        // 否则,它应该返回“Error”或“Success”。
        // 如果没有插件处理绑定,则RunBindPlugins返回code=5("skip")的状态。
    	RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    
    	// 如果至少定义了一个filter插件,则HasFilterPlugins返回true
    	HasFilterPlugins() bool
    
        // 如果至少定义了一个PostFilter插件,则HasPostFilterPlugins返回 true。
    	HasPostFilterPlugins() bool
    
    	// 如果至少定义了一个Score插件,则HasScorePlugins返回 true。
    	HasScorePlugins() bool
    
        // ListPlugins将返回map。key为扩展点名称,value则是配置的插件列表。
    	ListPlugins() *config.Plugins
    
        // ProfileName则是与profile name关联的framework
    	ProfileName() string
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    而实现这个抽象的则是 frameworkImplframeworkImpl 是初始化与运行 scheduler plugins 的组件,并在调度上下文中会运行这些扩展点

    type frameworkImpl struct {
       registry             Registry
       snapshotSharedLister framework.SharedLister
       waitingPods          *waitingPodsMap
       scorePluginWeight    map[string]int
       queueSortPlugins     []framework.QueueSortPlugin
       preFilterPlugins     []framework.PreFilterPlugin
       filterPlugins        []framework.FilterPlugin
       postFilterPlugins    []framework.PostFilterPlugin
       preScorePlugins      []framework.PreScorePlugin
       scorePlugins         []framework.ScorePlugin
       reservePlugins       []framework.ReservePlugin
       preBindPlugins       []framework.PreBindPlugin
       bindPlugins          []framework.BindPlugin
       postBindPlugins      []framework.PostBindPlugin
       permitPlugins        []framework.PermitPlugin
    
       clientSet       clientset.Interface
       kubeConfig      *restclient.Config
       eventRecorder   events.EventRecorder
       informerFactory informers.SharedInformerFactory
    
       metricsRecorder *metricsRecorder
       profileName     string
    
       extenders []framework.Extender
       framework.PodNominator
    
       parallelizer parallelize.Parallelizer
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    那么来看下 Registry ,Registry 是作为一个可用插件的集合。framework 使用 registry 来启用和对插件配置的初始化。在初始化框架之前,所有插件都必须在注册表中。表现形式就是一个 map[]key 是插件的名称,value是 PluginFactory

    type Registry map[string]PluginFactory
    
    • 1

    而在 pkg\scheduler\framework\plugins\registry.go 中会将所有的 in-tree plugin 注册进来。通过 NewInTreeRegistry 。后续如果还有插件要注册,可以通过 WithFrameworkOutOfTreeRegistry 来注册其他的插件。

    func NewInTreeRegistry() runtime.Registry {
    	fts := plfeature.Features{
    		EnableReadWriteOncePod:                       feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
    		EnableVolumeCapacityPriority:                 feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
    		EnableMinDomainsInPodTopologySpread:          feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread),
    		EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),
    	}
    
    	return runtime.Registry{
    		selectorspread.Name:                  selectorspread.New,
    		imagelocality.Name:                   imagelocality.New,
    		tainttoleration.Name:                 tainttoleration.New,
    		nodename.Name:                        nodename.New,
    		nodeports.Name:                       nodeports.New,
    		nodeaffinity.Name:                    nodeaffinity.New,
    		podtopologyspread.Name:               runtime.FactoryAdapter(fts, podtopologyspread.New),
    		nodeunschedulable.Name:               nodeunschedulable.New,
    		noderesources.Name:                   runtime.FactoryAdapter(fts, noderesources.NewFit),
    		noderesources.BalancedAllocationName: runtime.FactoryAdapter(fts, noderesources.NewBalancedAllocation),
    		volumebinding.Name:                   runtime.FactoryAdapter(fts, volumebinding.New),
    		volumerestrictions.Name:              runtime.FactoryAdapter(fts, volumerestrictions.New),
    		volumezone.Name:                      volumezone.New,
    		nodevolumelimits.CSIName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI),
    		nodevolumelimits.EBSName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS),
    		nodevolumelimits.GCEPDName:           runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD),
    		nodevolumelimits.AzureDiskName:       runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk),
    		nodevolumelimits.CinderName:          runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder),
    		interpodaffinity.Name:                interpodaffinity.New,
    		queuesort.Name:                       queuesort.New,
    		defaultbinder.Name:                   defaultbinder.New,
    		defaultpreemption.Name:               runtime.FactoryAdapter(fts, defaultpreemption.New),
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    这里插入一个题外话,关于 in-tree plugin

    在这里没有找到关于,kube-scheduler ,只是找到有关的概念,大概可以解释为,in-tree表示为随kubernetes官方提供的二进制构建的 plugin 则为 in-tree,而独立于kubernetes代码库之外的为 out-of-tree [3] 。这种情况下,可以理解为,AA则是 out-of-treePod, DeplymentSet 等是 in-tree

    接下来回到初始化 scheduler ,在初始化一个 scheduler 时,会通过NewInTreeRegistry 来初始化

    func New(client clientset.Interface,
    	....
    	registry := frameworkplugins.NewInTreeRegistry()
    	if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
    		return nil, err
    	}
             
    	...
    
    	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
    		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
    		frameworkruntime.WithClientSet(client),
    		frameworkruntime.WithKubeConfig(options.kubeConfig),
    		frameworkruntime.WithInformerFactory(informerFactory),
    		frameworkruntime.WithSnapshotSharedLister(snapshot),
    		frameworkruntime.WithPodNominator(nominator),
    		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
    		frameworkruntime.WithClusterEventMap(clusterEventMap),
    		frameworkruntime.WithParallelism(int(options.parallelism)),
    		frameworkruntime.WithExtenders(extenders),
    	)
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    接下来在调度上下文 scheduleOneschedulePod 时,会通过 framework 调用对应的插件来处理这个扩展点工作。具体的体现在,pkg\scheduler\schedule_one.go 中的预选阶段

    func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    	defer trace.LogIfLong(100 * time.Millisecond)
    
    	if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
    		return result, err
    	}
    	trace.Step("Snapshotting scheduler cache and node infos done")
    
    	if sched.nodeInfoSnapshot.NumNodes() == 0 {
    		return result, ErrNoNodesAvailable
    	}
    
    	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
    	if err != nil {
    		return result, err
    	}
    	trace.Step("Computing predicates done")
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    与其他扩展点部分,在调度上下文 scheduleOne 中可以很好的看出,功能都是 framework 提供的。

    func (sched *Scheduler) scheduleOne(ctx context.Context) {
    
        ...
        
    	scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
    
        ...
        
    	// Run the Reserve method of reserve plugins.
    	if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
    	}
    
        ...
        
    	// Run "permit" plugins.
    	runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    	
    		// One of the plugins returned status different than success or wait.
    		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    
    ...
        
    	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    	go func() {
    		...
    		waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
    		if !waitOnPermitStatus.IsSuccess() {
    			...
    			// trigger un-reserve plugins to clean up state associated with the reserved Pod
    			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    		}
    
    		// Run "prebind" plugins.
    		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    		
            ...
            
    			// trigger un-reserve plugins to clean up state associated with the reserved Pod
    			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    	
            ...
    
    		...
    			// trigger un-reserve plugins to clean up state associated with the reserved Pod
    			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    			
            ...
    
    		// Run "postbind" plugins.
    		fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    插件 [4]

    插件(Plugins)(也可以算是调度策略)在 kube-scheduler 中的实现为 framework plugin,插件API的实现分为两个步骤:registerconfigured,然后都实现了其父方法 Plugin。然后可以通过配置(kube-scheduler --config 提供)启动或禁用插件;除了默认插件外,还可以实现自定义调度插件与默认插件进行绑定。

    type Plugin interface {
        Name() string
    }
    // sort扩展点
    type QueueSortPlugin interface {
        Plugin
        Less(*v1.pod, *v1.pod) bool
    }
    // PreFilter扩展点
    type PreFilterPlugin interface {
        Plugin
        PreFilter(context.Context, *framework.CycleState, *v1.pod) error
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    插件的载入过程

    scheduler 被启动时,会 scheduler.New(cc.Client.. 这个时候会传入 profiles,整个的流如下:

    NewScheduler

    我们了解如何 New 一个 scheduler 即为 Setup 中去配置这些参数,

    func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
    
        ...
        
    	// Create the scheduler.
    	sched, err := scheduler.New(cc.Client,
    		cc.InformerFactory,
    		cc.DynInformerFactory,
    		recorderFactory,
    		ctx.Done(),
    		scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
    		scheduler.WithKubeConfig(cc.KubeConfig),
    		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
    		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
    		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
    		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
    		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
    		scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
    		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
    		scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
    		scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
    			// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
    			completedProfiles = append(completedProfiles, profile)
    		}),
    	)
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    profile.NewMap

    scheduler.New 中,会根据配置生成profile,而 profile.NewMap 会完成这一步

    func New(client clientset.Interface,
    	...
             
    	clusterEventMap := make(map[framework.ClusterEvent]sets.String)
    
    	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
    		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
    		frameworkruntime.WithClientSet(client),
    		frameworkruntime.WithKubeConfig(options.kubeConfig),
    		frameworkruntime.WithInformerFactory(informerFactory),
    		frameworkruntime.WithSnapshotSharedLister(snapshot),
    		frameworkruntime.WithPodNominator(nominator),
    		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
    		frameworkruntime.WithClusterEventMap(clusterEventMap),
    		frameworkruntime.WithParallelism(int(options.parallelism)),
    		frameworkruntime.WithExtenders(extenders),
    	)
    
             ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    NewFramework

    newProfile 返回的则是一个创建好的 framework

    func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
    	stopCh <-chan struct{}, opts ...frameworkruntime.Option) (framework.Framework, error) {
    	recorder := recorderFact(cfg.SchedulerName)
    	opts = append(opts, frameworkruntime.WithEventRecorder(recorder))
    	return frameworkruntime.NewFramework(r, &cfg, stopCh, opts...)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    最终会走到 pluginsNeeded,这里会根据配置中开启的插件而返回一个插件集,这个就是最终在每个扩展点中要执行的插件。

    func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.String {
    	pgSet := sets.String{}
    
    	if plugins == nil {
    		return pgSet
    	}
    
    	find := func(pgs *config.PluginSet) {
    		for _, pg := range pgs.Enabled {
    			pgSet.Insert(pg.Name)
    		}
    	}
    	// 获取到所有的扩展点,找到为Enabled的插件加入到pgSet
    	for _, e := range f.getExtensionPoints(plugins) {
    		find(e.plugins)
    	}
    	// Parse MultiPoint separately since they are not returned by f.getExtensionPoints()
    	find(&plugins.MultiPoint)
    
    	return pgSet
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    插件的执行

    在对插件源码部分分析,会找几个典型的插件进行分析,而不会对全部的进行分析,因为总的来说是大同小异,分析的插件有 NodePortsNodeResourcesFitpodtopologyspread

    NodePorts

    这里以一个简单的插件来分析;NodePorts 插件用于检查Pod请求的端口,在节点上是否为空闲端口。

    NodePorts 实现了 FilterPluginPreFilterPlugin

    PreFilter 将会被 frameworkPreFilter 扩展点被调用。

    func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
    	s := getContainerPorts(pod) // 或得Pod得端口
        // 写入状态
    	cycleState.Write(preFilterStateKey, preFilterState(s))
    	return nil, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Filter 将会被 frameworkFilter 扩展点被调用。

    // Filter invoked at the filter extension point.
    func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
       wantPorts, err := getPreFilterState(cycleState)
       if err != nil {
          return framework.AsStatus(err)
       }
    
       fits := fitsPorts(wantPorts, nodeInfo)
       if !fits {
          return framework.NewStatus(framework.Unschedulable, ErrReason)
       }
    
       return nil
    }
    
    func fitsPorts(wantPorts []*v1.ContainerPort, nodeInfo *framework.NodeInfo) bool {
    	// 对比existingPorts 和 wantPorts是否冲突,冲突则调度失败
    	existingPorts := nodeInfo.UsedPorts
    	for _, cp := range wantPorts {
    		if existingPorts.CheckConflict(cp.HostIP, string(cp.Protocol), cp.HostPort) {
    			return false
    		}
    	}
    	return true
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    New ,初始化新插件,在 register 中注册得

    func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
    	return &NodePorts{}, nil
    }
    
    • 1
    • 2
    • 3

    在调用中,如果有任何一个插件返回错误,则跳过该扩展点注册得其他插件,返回失败。

    func (f *frameworkImpl) RunFilterPlugins(
    	ctx context.Context,
    	state *framework.CycleState,
    	pod *v1.Pod,
    	nodeInfo *framework.NodeInfo,
    ) framework.PluginToStatus {
    	statuses := make(framework.PluginToStatus)
    	for _, pl := range f.filterPlugins {
    		pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
    		if !pluginStatus.IsSuccess() {
    			if !pluginStatus.IsUnschedulable() 
    				errStatus := framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), pluginStatus.AsError())).WithFailedPlugin(pl.Name())
    				return map[string]*framework.Status{pl.Name(): errStatus}
    			}
    			pluginStatus.SetFailedPlugin(pl.Name())
    			statuses[pl.Name()] = pluginStatus
    		}
    	}
    
    	return statuses
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    返回得状态是一个 Status 结构体,该结构体表示了插件运行的结果。由 Codereasons、(可选)errfailedPlugin (失败的那个插件名)组成。当 code 不是 Success 时,应说明原因。而且,当 codeSuccess 时,其他所有字段都应为空。nil 状态也被视为成功。

    type Status struct {
    	code    Code
    	reasons []string
    	err     error
    	// failedPlugin is an optional field that records the plugin name a Pod failed by.
    	// It's set by the framework when code is Error, Unschedulable or UnschedulableAndUnresolvable.
    	failedPlugin string
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    NodeResourcesFit [5]

    NodeResourcesFit 扩展检查节点是否拥有 Pod 请求的所有资源。分数可以使用以下三种策略之一,扩展点为:preFilterfilterscore

    • LeastAllocated (默认)
    • MostAllocated
    • RequestedToCapacityRatio

    Fit

    NodeResourcesFit PreFilter 可以看到调用得 computePodResourceRequest

    // PreFilter invoked at the prefilter extension point.
    func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
       cycleState.Write(preFilterStateKey, computePodResourceRequest(pod))
       return nil, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    computePodResourceRequest 这里有一个注释,总体解释起来是这样得:computePodResourceRequest ,返回值( framework.Resource)覆盖了每一个维度中资源的最大宽度。因为将按照 init-containers , containers 得顺序运行,会通过迭代方式收集每个维度中的最大值。计算时会对常规容器的资源向量求和,因为containers 运行会同时运行多个容器。计算示例为:

    Pod:
      InitContainers
        IC1:
          CPU: 2
          Memory: 1G
        IC2:
          CPU: 2
          Memory: 3G
      Containers
        C1:
          CPU: 2
          Memory: 1G
        C2:
          CPU: 1
          Memory: 1G
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在维度1中(InitContainers)所需资源最大值时,CPU=2, Memory=3G;而维度2(Containers)所需资源最大值为:CPU=2, Memory=1G;那么最终结果为 CPU=3, Memory=3G,因为在维度1,最大资源时Memory=3G;而维度2最大资源是CPU=1+2, Memory=1+1,取每个维度中最大资源最大宽度即为 CPU=3, Memory=3G。

    下面则看下代码得实现

    func computePodResourceRequest(pod *v1.Pod) *preFilterState {
    	result := &preFilterState{}
    	for _, container := range pod.Spec.Containers {
    		result.Add(container.Resources.Requests)
    	}
    
    	// 取最大得资源
    	for _, container := range pod.Spec.InitContainers {
    		result.SetMaxResource(container.Resources.Requests)
    	}
    
    	// 如果Overhead正在使用,需要将其计算到总资源中
    	if pod.Spec.Overhead != nil {
    		result.Add(pod.Spec.Overhead)
    	}
    	return result
    }
    
    // SetMaxResource 是比较ResourceList并为每个资源取最大值。
    func (r *Resource) SetMaxResource(rl v1.ResourceList) {
    	if r == nil {
    		return
    	}
    
    	for rName, rQuantity := range rl {
    		switch rName {
    		case v1.ResourceMemory:
    			r.Memory = max(r.Memory, rQuantity.Value())
    		case v1.ResourceCPU:
    			r.MilliCPU = max(r.MilliCPU, rQuantity.MilliValue())
    		case v1.ResourceEphemeralStorage:
    			if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
    				r.EphemeralStorage = max(r.EphemeralStorage, rQuantity.Value())
    			}
    		default:
    			if schedutil.IsScalarResourceName(rName) {
    				r.SetScalar(rName, max(r.ScalarResources[rName], rQuantity.Value()))
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    leastAllocate

    LeastAllocated 是 NodeResourcesFit 的打分策略 ,LeastAllocated 打分的标准是更偏向于请求资源较少的Node。将会先计算出Node上调度的pod请求的内存、CPU与其他资源的百分比,然后并根据请求的比例与容量的平均值的最小值进行优先级排序。

    计算公式是这样的: c p u ( ( c a p a c i t y − r e q u e s t e d ) × M a x N o d e S c o r e × c p u W e i g h t ) c a p a c i t y + m e m o r y ( ( c a p a c i t y − r e q u e s t e d ) × M a x N o d e S c o r e × m e m o r y W e i g h t c a p a c i t y ) + . . . w e i g h t S u m \frac{\frac{cpu((capacity-requested) \times MaxNodeScore \times cpuWeight)}{capacity} + \frac{memory((capacity-requested) \times MaxNodeScore \times memoryWeight}{capacity}) + ...}{weightSum} weightSumcapacitycpu((capacityrequested)×MaxNodeScore×cpuWeight)+capacitymemory((capacityrequested)×MaxNodeScore×memoryWeight)+...

    下面来看下实现

    func leastResourceScorer(resToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap) int64 {
    	return func(requested, allocable resourceToValueMap) int64 {
    		var nodeScore, weightSum int64
    		for resource := range requested {
    			weight := resToWeightMap[resource]
                //  计算出的资源分数乘weight
    			resourceScore := leastRequestedScore(requested[resource], allocable[resource])
    			nodeScore += resourceScore * weight
    			weightSum += weight
    		}
    		if weightSum == 0 {
    			return 0
    		}
            // 最终除weightSum
    		return nodeScore / weightSum
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    leastRequestedScore 计算标准为未使用容量的计算范围为 0~MaxNodeScore,0 为最低优先级,MaxNodeScore 为最高优先级。未使用的资源越多,得分越高。

    func leastRequestedScore(requested, capacity int64) int64 {
    	if capacity == 0 {
    		return 0
    	}
    	if requested > capacity {
    		return 0
    	}
    	// 容量 - 请求的 x 预期值(100)/ 容量
    	return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    Topology [6]

    Concept

    在对 podtopologyspread 插件进行分析前,先需要掌握Pod拓扑的概念。

    Pod拓扑(Pod Topology)是Kubernetes Pod调度机制,可以将Pod分布在集群中不同 Zone ,以及用户自定义的各种拓扑域 (topology domains)。当有了拓扑域后,用户可以更高效的利用集群资源。

    如何来解释拓扑域,首先需要提及为什么需要拓扑域,在集群有3个节点,并且当Pod副本数为2时,又不希望两个Pod在同一个Node上运行。在随着扩大Pod的规模,副本数扩展到到15个时,这时候最理想的方式是每个Node运行5个Pod,在这种背景下,用户希望对集群中Zone的安排为相似的副本数量,并且在集群存在部分问题时可以更好的自愈(也是按照相似的副本数量均匀的分布在Node上)。在这种情况下Kubernetes 提供了Pod 拓扑约束来解决这个问题。

    定义一个Topology

    apiVersion: v1
    kind: Pod
    metadata:
      name: example-pod
    spec:
      # Configure a topology spread constraint
      topologySpreadConstraints:
        - maxSkew: > # 
          minDomains: > # optional; alpha since v1.24
          topologyKey: >
          whenUnsatisfiable: >
          labelSelector: >
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    参数的描述

    • maxSkew:Required,Pod分布不均的程度,并且数字必须大于零
      • whenUnsatisfiable: DoNotSchedule,则定义目标拓扑中匹配 pod 的数量与 全局最小值拓扑域中的标签选择器匹配的 pod 的最小数量maxSkew之间的最大允许差异。例如有 3 个 Zone,分别具有 2、4 和 5 个匹配的 pod,则全局最小值为 2
      • whenUnsatisfiable: ScheduleAnywayscheduler 会为减少倾斜的拓扑提供更高的优先级。
    • minDomains:optional,符合条件的域的最小数量。
      • 如果不指定该选项 minDomains,则约束的行为 minDomains: 1
      • minDomains必须大于 0。minDomainswhenUnsatisfiable 一起时为whenUnsatisfiable: DoNotSchedule
    • topologyKey:Node label的key,如果多个Node都使用了这个lable key那么 scheduler 将这些 Node 看作为相同的拓扑域。
    • whenUnsatisfiable:当 Pod 不满足分布的约束时,怎么去处理
      • DoNotSchedule(默认)不要调度。
      • ScheduleAnyway仍然调度它,同时优先考虑最小化倾斜节点
    • labelSelector:查找匹配的 Pod label选择器的node进行技术,以计算Pod如何分布在拓扑域中

    对于拓扑域的理解

    对于拓扑域,官方是这么说明的,假设有一个带有以下lable的 4 节点集群:

    NAME    STATUS   ROLES    AGE     VERSION   LABELS
    node1   Ready    <none>   4m26s   v1.16.0   node=node1,zone=zoneA
    node2   Ready    <none>   3m58s   v1.16.0   node=node2,zone=zoneA
    node3   Ready    <none>   3m17s   v1.16.0   node=node3,zone=zoneB
    node4   Ready    <none>   2m43s   v1.16.0   node=node4,zone=zoneB
    
    • 1
    • 2
    • 3
    • 4
    • 5

    那么集群拓扑如图:

    image

    图1:集群拓扑图
    Source:https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

    假设一个 4 节点集群,其中 3个label被标记为foo: bar的 Pod 分别位于Node1、Node2 和 Node3:

    image

    图2:集群拓扑图
    Source:https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

    这种情况下,新部署一个Pod,并希望新Pod与现有Pod跨 Zone均匀分布,资源清单文件如下:

    kind: Pod
    apiVersion: v1
    metadata:
      name: mypod
      labels:
        foo: bar
    spec:
      topologySpreadConstraints:
      - maxSkew: 1
        topologyKey: zone
        whenUnsatisfiable: DoNotSchedule
        labelSelector:
          matchLabels:
            foo: bar
      containers:
      - name: pause
        image: k8s.gcr.io/pause:3.1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这个清单对于拓扑域来说,topologyKey: zone 表示对Pod均匀分布仅应用于已标记的节点(如 foo: bar),将会跳过没有标签的节点(如zone: )。如果 scheduler 找不到满足约束的方法,whenUnsatisfiable: DoNotSchedule 设置的策略则是 scheduler 对新部署的Pod保持 Pendding

    如果此时 scheduler 将新Pod 调度至 Z o n e A Zone_A ZoneA,此时Pod分布在拓扑域间为 [ 3 , 1 ] [3,1] [3,1] ,而 maxSkew 配置的值是1。此时倾斜值为 Z o n e A − Z o n e B = 3 − 1 = 2 Zone_A - Zone_B = 3-1=2 ZoneAZoneB=31=2,不满足 maxSkew=1,故这个Pod只能被调度到 Z o n e B Zone_B ZoneB

    此时Pod调度拓扑图为图3或图4

    image

    图3:集群拓扑图
    Source:https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

    image

    图4:集群拓扑图
    Source:https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

    如果需要将Pod调度到 Z o n e A Zone_A ZoneA ,可以按照如下方式进行:

    • 修改 maxSkew=2
    • 修改 topologyKey: node 而不是 Zone ,这种模式下可以将 Pod 均匀分布在Node而不是Zone之间。
    • 修改 whenUnsatisfiable: DoNotSchedulewhenUnsatisfiable: ScheduleAnyway 确保新的Pod始终可被调度

    下面再通过一个例子增强对拓扑域了解

    多拓扑约束

    设拥有一个 4 节点集群,其中 3 个现有 Pod 标记 foo: bar 分别位于 node1node2node3

    image

    图5:集群拓扑图
    Source:https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

    部署的资源清单如下:可以看出拓扑分布约束配置了多个

    kind: Pod
    apiVersion: v1
    metadata:
      name: mypod
      labels:
        foo: bar
    spec:
      topologySpreadConstraints:
      - maxSkew: 1
        topologyKey: zone
        whenUnsatisfiable: DoNotSchedule
        labelSelector:
          matchLabels:
            foo: bar
      - maxSkew: 1
        topologyKey: node
        whenUnsatisfiable: DoNotSchedule
        labelSelector:
          matchLabels:
            foo: bar
      containers:
      - name: pause
        image: k8s.gcr.io/pause:3.1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    在这种情况下,为了匹配第一个约束条件,新Pod 只能放置在 Z o n e B Zone_B ZoneB ;而就第二个约束条件,新Pod只能调度到 node4。在这种配置多约束条件下, scheduler 只考虑满足所有约束的值,因此唯一有效的是 node4

    如何为集群设置一个默认拓扑域约束

    默认情况下,拓扑域约束也作 scheduler 的为 scheduler configurtion 中的一部分参数,这也意味着,可以通过profile为整个集群级别指定一个默认的拓扑域调度约束,

    apiVersion: kubescheduler.config.k8s.io/v1beta3
    kind: KubeSchedulerConfiguration
    
    profiles:
      - schedulerName: default-scheduler
        pluginConfig:
          - name: PodTopologySpread
            args:
              defaultConstraints:
                - maxSkew: 1
                  topologyKey: topology.kubernetes.io/zone
                  whenUnsatisfiable: ScheduleAnyway
              defaultingType: List
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    默认约束策略

    如果在没有配置集群级别的约束策略时,kube-scheduler 内部 topologyspread 插件提供了一个默认的拓扑约束策略,大致上如下列清单所示

    defaultConstraints:
      - maxSkew: 3
        topologyKey: "kubernetes.io/hostname"
        whenUnsatisfiable: ScheduleAnyway
      - maxSkew: 5
        topologyKey: "topology.kubernetes.io/zone"
        whenUnsatisfiable: ScheduleAnyway
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    上述清单中内容可以在 pkg\scheduler\framework\plugins\podtopologyspread\plugin.go

    var systemDefaultConstraints = []v1.TopologySpreadConstraint{
    	{
    		TopologyKey:       v1.LabelHostname,
    		WhenUnsatisfiable: v1.ScheduleAnyway,
    		MaxSkew:           3,
    	},
    	{
    		TopologyKey:       v1.LabelTopologyZone,
    		WhenUnsatisfiable: v1.ScheduleAnyway,
    		MaxSkew:           5,
    	},
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    可以通过在配置文件中留空,来禁用默认配置

    • defaultConstraints: []
    • defaultingType: List
    apiVersion: kubescheduler.config.k8s.io/v1beta3
    kind: KubeSchedulerConfiguration
    
    profiles:
      - schedulerName: default-scheduler
        pluginConfig:
          - name: PodTopologySpread
            args:
              defaultConstraints: []
              defaultingType: List
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    通过源码学习Topology

    podtopologyspread 实现了4种扩展点方法,包含 filterscore

    PreFilter

    可以看到 PreFilter 的核心为 calPreFilterState

    func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
    	s, err := pl.calPreFilterState(ctx, pod)
    	if err != nil {
    		return nil, framework.AsStatus(err)
    	}
    	cycleState.Write(preFilterStateKey, s)
    	return nil, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    calPreFilterState 主要功能是用在计算如何在拓扑域中分布Pod,首先看段代码时,需要掌握下属几个概念

    func (pl *PodTopologySpread) calPreFilterState(ctx context.Context, pod *v1.Pod) (*preFilterState, error) {
        // 获取Node
    	allNodes, err := pl.sharedLister.NodeInfos().List()
    	if err != nil {
    		return nil, fmt.Errorf("listing NodeInfos: %w", err)
    	}
    	var constraints []topologySpreadConstraint
    	if len(pod.Spec.TopologySpreadConstraints) > 0 {
    		// 这里会构建出TopologySpreadConstraints,因为约束是不确定的
    		constraints, err = filterTopologySpreadConstraints(
    			pod.Spec.TopologySpreadConstraints,
    			v1.DoNotSchedule,
    			pl.enableMinDomainsInPodTopologySpread,
    			pl.enableNodeInclusionPolicyInPodTopologySpread,
    		)
    		if err != nil {
    			return nil, fmt.Errorf("obtaining pod's hard topology spread constraints: %w", err)
    		}
    	} else {
            // buildDefaultConstraints使用".DefaultConstraints"与pod匹配的
            // service、replication controllers、replica sets 
            // 和stateful sets的选择器为pod构建一个约束。
    		constraints, err = pl.buildDefaultConstraints(pod, v1.DoNotSchedule)
    		if err != nil {
    			return nil, fmt.Errorf("setting default hard topology spread constraints: %w", err)
    		}
    	}
    	if len(constraints) == 0 { // 如果是空的,则返回空preFilterState
    		return &preFilterState{}, nil
    	}
        // 初始化一个 preFilterState 状态
    	s := preFilterState{
    		Constraints:          constraints,
    		TpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
    		TpPairToMatchNum:     make(map[topologyPair]int, sizeHeuristic(len(allNodes), constraints)),
    	}
    	// 根据node统计拓扑域数量
    	tpCountsByNode := make([]map[topologyPair]int, len(allNodes))
    	// 获取pod亲和度配置
    	requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
    	processNode := func(i int) {
    		nodeInfo := allNodes[i]
    		node := nodeInfo.Node()
    		if node == nil {
    			klog.ErrorS(nil, "Node not found")
    			return
    		}
    		// 通过spreading去过滤node以用作filters,错误解析以向后兼容
    		if !pl.enableNodeInclusionPolicyInPodTopologySpread {
    			if match, _ := requiredNodeAffinity.Match(node); !match {
    				return
    			}
    		}
    
    		// 确保node的lable 包含topologyKeys定义的值
    		if !nodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
    			return
    		}
    
    		tpCounts := make(map[topologyPair]int, len(constraints))
    		for _, c := range constraints { // 对应的约束列表
    			if pl.enableNodeInclusionPolicyInPodTopologySpread &&
    				!c.matchNodeInclusionPolicies(pod, node, requiredNodeAffinity) {
    				continue
    			}
    			// 构建出 topologyPair 以key value形式,
    			// 通常情况下TopologyKey属于什么类型的拓扑
    			//  node.Labels[c.TopologyKey] 则是属于这个拓扑中那个子域
    			pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
    			// 计算与标签选择器相匹配的pod有多少个
    			count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
    			tpCounts[pair] = count
    		}
    		tpCountsByNode[i] = tpCounts // 最终形成的拓扑结构
    	}
    	// 执行上面的定义的processNode,执行的数量就是node的数量
    	pl.parallelizer.Until(ctx, len(allNodes), processNode)
    	// 最后构建出 TpPairToMatchNum
    	// 表示每个拓扑域中的每个子域各分布多少Pod,如图6所示
    	for _, tpCounts := range tpCountsByNode {
    		for tp, count := range tpCounts {
    			s.TpPairToMatchNum[tp] += count
    		}
    	}
    	if pl.enableMinDomainsInPodTopologySpread {
    		// 根据状态进行构建 preFilterState
    		s.TpKeyToDomainsNum = make(map[string]int, len(constraints))
    		for tp := range s.TpPairToMatchNum {
    			s.TpKeyToDomainsNum[tp.key]++
    		}
    	}
    
    	// 计算最小匹配出的拓扑对
    	for i := 0; i < len(constraints); i++ {
    		key := constraints[i].TopologyKey
    		s.TpKeyToCriticalPaths[key] = newCriticalPaths()
    	}
    	for pair, num := range s.TpPairToMatchNum {
    		s.TpKeyToCriticalPaths[pair.key].update(pair.value, num)
    	}
    
    	return &s, nil // 返回的值则包含最小的分布
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103

    preFilterState

    // preFilterState 是在PreFilter处计算并在Filter处使用。
    // 它结合了 “TpKeyToCriticalPaths” 和 “TpPairToMatchNum” 来表示:
    //(1)在每个分布约束上匹配最少pod的criticalPaths。 
    // (2) 在每个分布约束上匹配的pod的数量。
    // “nil preFilterState” 则表示没有设置(在PreFilter阶段);
    // empty “preFilterState”对象则表示它是一个合法的状态,并在PreFilter阶段设置。
    
    type preFilterState struct {
    	Constraints []topologySpreadConstraint
    
        // 这里记录2条关键路径而不是所有关键路径。 
        // criticalPaths[0].MatchNum 始终保存最小匹配数。 
        // criticalPaths[1].MatchNum 总是大于或等于criticalPaths[0].MatchNum,但不能保证是第二个最小匹配数。
    	TpKeyToCriticalPaths map[string]*criticalPaths
    	
        // TpKeyToDomainsNum 以 “topologyKey” 作为key ,并以zone的数量作为值。
    	TpKeyToDomainsNum map[string]int
    	
        // TpPairToMatchNum 以 “topologyPair作为key” ,并以匹配到pod的数量作为value。
    	TpPairToMatchNum map[topologyPair]int
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    criticalPaths

    // [2]criticalPath能够工作的原因是基于当前抢占算法的实现,特别是以下两个事实
    // 事实 1:只抢占同一节点上的Pod,而不是多个节点上的 Pod。
    // 事实 2:每个节点在其抢占周期期间在“preFilterState”的单独副本上进行评估。如果我们计划转向更复杂的算法,例如“多个节点上的任意pod”时则需要重新考虑这种结构。
    type criticalPaths [2]struct {
    	// TopologyValue代表映射到拓扑键的拓扑值。
    	TopologyValue string
    	// MatchNum代表匹配到的pod数量
    	MatchNum int
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    单元测试中的测试案例,具有两个约束条件的场景,通过表格来解析如下:

    Node列表与标签如下表:

    Node Name🏷Lable-zone🏷Lable-node
    node-azone1node-a
    node-bzone1node-b
    node-xzone2node-x
    node-yzone2node-y

    Pod列表与标签如下表:

    Pod NameNode🏷Label
    p-a1node-afoo:
    p-a2node-afoo:
    p-b1node-bfoo:
    p-y1node-yfoo:
    p-y2node-yfoo:
    p-y3node-yfoo:
    p-y4node-yfoo:

    对应的拓扑约束

    spec:
      topologySpreadConstraints:
      - MaxSkew: 1
    	TopologyKey: zone
    	labelSelector:
    	  matchLabels:
    	    foo: bar
    	MinDomains: 1
    	NodeAffinityPolicy: Honor
    	NodeTaintsPolicy: Ignore
      - MaxSkew: 1
    	TopologyKey: node
    	labelSelector:
    	  matchLabels:
    	    foo: bar
    	MinDomains: 1
    	NodeAffinityPolicy: Honor
    	NodeTaintsPolicy: Ignore
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    那么整个分布如下:

    image

    图6:具有两个场景的分布图

    实现的测试代码如下

    {
        name: "normal case with two spreadConstraints",
        pod: st.MakePod().Name("p").Label("foo", "").
        SpreadConstraint(1, "zone", v1.DoNotSchedule, fooSelector, nil, nil, nil).
        SpreadConstraint(1, "node", v1.DoNotSchedule, fooSelector, nil, nil, nil).
        Obj(),
        nodes: []*v1.Node{
            st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
            st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
            st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
            st.MakeNode().Name("node-y").Label("zone", "zone2").Label("node", "node-y").Obj(),
        },
        existingPods: []*v1.Pod{
            st.MakePod().Name("p-a1").Node("node-a").Label("foo", "").Obj(),
            st.MakePod().Name("p-a2").Node("node-a").Label("foo", "").Obj(),
            st.MakePod().Name("p-b1").Node("node-b").Label("foo", "").Obj(),
            st.MakePod().Name("p-y1").Node("node-y").Label("foo", "").Obj(),
            st.MakePod().Name("p-y2").Node("node-y").Label("foo", "").Obj(),
            st.MakePod().Name("p-y3").Node("node-y").Label("foo", "").Obj(),
            st.MakePod().Name("p-y4").Node("node-y").Label("foo", "").Obj(),
        },
        want: &preFilterState{
            Constraints: []topologySpreadConstraint{
                {
                    MaxSkew:            1,
                    TopologyKey:        "zone",
                    Selector:           mustConvertLabelSelectorAsSelector(t, fooSelector),
                    MinDomains:         1,
                    NodeAffinityPolicy: v1.NodeInclusionPolicyHonor,
                    NodeTaintsPolicy:   v1.NodeInclusionPolicyIgnore,
                },
                {
                    MaxSkew:            1,
                    TopologyKey:        "node",
                    Selector:           mustConvertLabelSelectorAsSelector(t, fooSelector),
                    MinDomains:         1,
                    NodeAffinityPolicy: v1.NodeInclusionPolicyHonor,
                    NodeTaintsPolicy:   v1.NodeInclusionPolicyIgnore,
                },
            },
            TpKeyToCriticalPaths: map[string]*criticalPaths{
                "zone": {{"zone1", 3}, {"zone2", 4}},
                "node": {{"node-x", 0}, {"node-b", 1}},
            },
            for pair, num := range s.TpPairToMatchNum {
    		s.TpKeyToCriticalPaths[pair.key].update(pair.value, num)
    	}
            TpPairToMatchNum: map[topologyPair]int{
                {key: "zone", value: "zone1"}:  3,
                {key: "zone", value: "zone2"}:  4,
                {key: "node", value: "node-a"}: 2,
                {key: "node", value: "node-b"}: 1,
                {key: "node", value: "node-x"}: 0,
                {key: "node", value: "node-y"}: 4,
            },
        },
    },
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    update

    update 函数实际上时用于计算 criticalPaths 中的第一位始终保持为是一个最小Pod匹配值

    func (p *criticalPaths) update(tpVal string, num int) {
    	// first verify if `tpVal` exists or not
    	i := -1
    	if tpVal == p[0].TopologyValue {
    		i = 0
    	} else if tpVal == p[1].TopologyValue {
    		i = 1
    	}
    
    	if i >= 0 {
    		// `tpVal` 表示已经存在
    		p[i].MatchNum = num
    		if p[0].MatchNum > p[1].MatchNum {
    			// swap paths[0] and paths[1]
    			p[0], p[1] = p[1], p[0]
    		}
    	} else {
    		// `tpVal` 表示不存在,如一个新初始化的值
            // num对应子域分布的pod
            // 说明第一个元素不是最小的,则作为交换
    		if num < p[0].MatchNum {
    			// update paths[1] with paths[0]
    			p[1] = p[0]
    			// update paths[0]
    			p[0].TopologyValue, p[0].MatchNum = tpVal, num
    		} else if num < p[1].MatchNum {
    			// 如果小于 paths[1],则更新它,永远保证元素0是最小,1是次小的
    			p[1].TopologyValue, p[1].MatchNum = tpVal, num
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    综合来讲 Prefilter 主要做的工作是。循环所有的节点,先根据 NodeAffinity 或者 NodeSelector 进行过滤,然后根据约束中定义的 topologyKeys (拓扑划分的依据) 来选择节点。

    接下来会计算出每个拓扑域下的拓扑对(可以理解为子域)匹配的 Pod 数量,存入 TpPairToMatchNum 中,最后就是要把所有约束中匹配的 Pod 数量最小(第二小)匹配出来的路径(代码是这么定义的,理解上可以看作是分布图)放入 TpKeyToCriticalPaths 中保存起来。整个 preFilterState 保存下来传递到后续的 filter 插件中使用。

    Filter

    preFilter 中 最后的计算结果会保存在 CycleState

    cycleState.Write(preFilterStateKey, s)
    
    • 1

    Filter 主要是从 PreFilter 处理的过程中拿到状态 preFilterState,然后看下每个拓扑约束中的 MaxSkew 是否合法,具体的计算公式为: m a t c h N u m + s e l f M a t c h N u m − m i n M a t c h N u m matchNum + selfMatchNum - minMatchNum matchNum+selfMatchNumminMatchNum

    • matchNum:Prefilter 中计算出的对应的拓扑分布数量,可以在Prefilter中参考对应的内容
      • if tpCount, ok := s.TpPairToMatchNum[pair]; ok {
    • selfMatchNum:匹配到label的数量,匹配到则是1,否则为0
    • minMatchNum:获的 Prefilter 中计算出来的最小匹配的值
    func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    	node := nodeInfo.Node()
    	if node == nil {
    		return framework.AsStatus(fmt.Errorf("node not found"))
    	}
    	// 拿到 prefilter处理的s,即preFilterState
    	s, err := getPreFilterState(cycleState)
    	if err != nil {
    		return framework.AsStatus(err)
    	}
    
    	// 一个 空类型的 preFilterState是合法的,这种情况下将容忍每一个被调度的 Pod
    	if len(s.Constraints) == 0 {
    		return nil
    	}
    
    	podLabelSet := labels.Set(pod.Labels) // 设置标签
    	for _, c := range s.Constraints { // 因为拓扑约束允许多个所以
    		tpKey := c.TopologyKey
    		tpVal, ok := node.Labels[c.TopologyKey]
    		if !ok {
    			klog.V(5).InfoS("Node doesn't have required label", "node", klog.KObj(node), "label", tpKey)
    			return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNodeLabelNotMatch)
    		}
    
    		// 判断标准
    		// 现有的匹配数量 + 子匹配(1|0) - 全局minimum <= maxSkew
    		minMatchNum, err := s.minMatchNum(tpKey, c.MinDomains, pl.enableMinDomainsInPodTopologySpread)
    		if err != nil {
    			klog.ErrorS(err, "Internal error occurred while retrieving value precalculated in PreFilter", "topologyKey", tpKey, "paths", s.TpKeyToCriticalPaths)
    			continue
    		}
    
    		selfMatchNum := 0
    		if c.Selector.Matches(podLabelSet) {
    			selfMatchNum = 1
    		}
    
    		pair := topologyPair{key: tpKey, value: tpVal}
    		matchNum := 0
    		if tpCount, ok := s.TpPairToMatchNum[pair]; ok {
    			matchNum = tpCount
    		}
    		skew := matchNum + selfMatchNum - minMatchNum
    		if skew > int(c.MaxSkew) {
    			klog.V(5).InfoS("Node failed spreadConstraint: matchNum + selfMatchNum - minMatchNum > maxSkew", "node", klog.KObj(node), "topologyKey", tpKey, "matchNum", matchNum, "selfMatchNum", selfMatchNum, "minMatchNum", minMatchNum, "maxSkew", c.MaxSkew)
    			return framework.NewStatus(framework.Unschedulable, ErrReasonConstraintsNotMatch)
    		}
    	}
    
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    minMatchNum

    // minMatchNum用于计算 倾斜的全局最小值,同时考虑 MinDomains。
    func (s *preFilterState) minMatchNum(tpKey string, minDomains int32, enableMinDomainsInPodTopologySpread bool) (int, error) {
    	paths, ok := s.TpKeyToCriticalPaths[tpKey]
    	if !ok {
    		return 0, fmt.Errorf("failed to retrieve path by topology key")
    	}
    	// 通常来说最小值是第一个
    	minMatchNum := paths[0].MatchNum
    	if !enableMinDomainsInPodTopologySpread { // 就是plugin的配置的 enableMinDomainsInPodTopologySpread
    		return minMatchNum, nil
    	}
    
    	domainsNum, ok := s.TpKeyToDomainsNum[tpKey]
    	if !ok {
    		return 0, fmt.Errorf("failed to retrieve the number of domains by topology key")
    	}
    
    	if domainsNum < int(minDomains) {
    		// 当有匹配拓扑键的符合条件的域的数量小于 配置的"minDomains"(每个约束条件的这个配置) 时,
    		//它将全局“minimum” 设置为0。
    		// 因为minimum默认就为1,如果他小于1,就让他为0
    		minMatchNum = 0
    	}
    
    	return minMatchNum, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    PreScore

    与 Filter 类似, PreScore 也是类似 PreFilter 的构成。 initPreScoreState 来完成过滤。

    有了 PreFilter 基础后,对于 Score 来说大同小异

    func (pl *PodTopologySpread) PreScore(
    	ctx context.Context,
    	cycleState *framework.CycleState,
    	pod *v1.Pod,
    	filteredNodes []*v1.Node,
    ) *framework.Status {
    	allNodes, err := pl.sharedLister.NodeInfos().List()
    	if err != nil {
    		return framework.AsStatus(fmt.Errorf("getting all nodes: %w", err))
    	}
    
    	if len(filteredNodes) == 0 || len(allNodes) == 0 {
    		// No nodes to score.
    		return nil
    	}
    
    	state := &preScoreState{
    		IgnoredNodes:            sets.NewString(),
    		TopologyPairToPodCounts: make(map[topologyPair]*int64),
    	}
    	// Only require that nodes have all the topology labels if using
    	// non-system-default spreading rules. This allows nodes that don't have a
    	// zone label to still have hostname spreading.
    	// 如果使用非系统默认分布规则,则仅要求节点具有所有拓扑标签。
    	// 这将允许没有zone标签的节点仍然具有hostname分布。
    	requireAllTopologies := len(pod.Spec.TopologySpreadConstraints) > 0 || !pl.systemDefaulted
    	err = pl.initPreScoreState(state, pod, filteredNodes, requireAllTopologies)
    	if err != nil {
    		return framework.AsStatus(fmt.Errorf("calculating preScoreState: %w", err))
    	}
    
    	// return if incoming pod doesn't have soft topology spread Constraints.
    	if len(state.Constraints) == 0 {
    		cycleState.Write(preScoreStateKey, state)
    		return nil
    	}
    
    	// Ignore parsing errors for backwards compatibility.
    	requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
    	processAllNode := func(i int) {
    		nodeInfo := allNodes[i]
    		node := nodeInfo.Node()
    		if node == nil {
    			return
    		}
    
    		if !pl.enableNodeInclusionPolicyInPodTopologySpread {
    			// `node` should satisfy incoming pod's NodeSelector/NodeAffinity
    			if match, _ := requiredNodeAffinity.Match(node); !match {
    				return
    			}
    		}
    
    		// All topologyKeys need to be present in `node`
    		if requireAllTopologies && !nodeLabelsMatchSpreadConstraints(node.Labels, state.Constraints) {
    			return
    		}
    
    		for _, c := range state.Constraints {
    			if pl.enableNodeInclusionPolicyInPodTopologySpread &&
    				!c.matchNodeInclusionPolicies(pod, node, requiredNodeAffinity) {
    				continue
    			}
    
    			pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
    			// If current topology pair is not associated with any candidate node,
    			// continue to avoid unnecessary calculation.
    			// Per-node counts are also skipped, as they are done during Score.
    			tpCount := state.TopologyPairToPodCounts[pair]
    			if tpCount == nil {
    				continue
    			}
    			count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
    			atomic.AddInt64(tpCount, int64(count))
    		}
    	}
    	pl.parallelizer.Until(ctx, len(allNodes), processAllNode)
    	// 保存状态给后面sorce调用
    	cycleState.Write(preScoreStateKey, state)
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    与Filter中Update使用的函数一样,这里也会到这一步,这里会构建出TopologySpreadConstraints,因为约束是不确定的

    func filterTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint, action v1.UnsatisfiableConstraintAction, enableMinDomainsInPodTopologySpread, enableNodeInclusionPolicyInPodTopologySpread bool) ([]topologySpreadConstraint, error) {
    	var result []topologySpreadConstraint
    	for _, c := range constraints {
    		if c.WhenUnsatisfiable == action { // 始终调度时
    			selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
    			if err != nil {
    				return nil, err
    			}
    			tsc := topologySpreadConstraint{
    				MaxSkew:            c.MaxSkew,
    				TopologyKey:        c.TopologyKey,
    				Selector:           selector,
    				MinDomains:         1,                            // If MinDomains is nil, we treat MinDomains as 1.
    				NodeAffinityPolicy: v1.NodeInclusionPolicyHonor,  // If NodeAffinityPolicy is nil, we treat NodeAffinityPolicy as "Honor".
    				NodeTaintsPolicy:   v1.NodeInclusionPolicyIgnore, // If NodeTaintsPolicy is nil, we treat NodeTaintsPolicy as "Ignore".
    			}
    			if enableMinDomainsInPodTopologySpread && c.MinDomains != nil {
    				tsc.MinDomains = *c.MinDomains
    			}
    			if enableNodeInclusionPolicyInPodTopologySpread {
    				if c.NodeAffinityPolicy != nil {
    					tsc.NodeAffinityPolicy = *c.NodeAffinityPolicy
    				}
    				if c.NodeTaintsPolicy != nil {
    					tsc.NodeTaintsPolicy = *c.NodeTaintsPolicy
    				}
    			}
    			result = append(result, tsc)
    		}
    	}
    	return result, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    Score

    // 在分数扩展点调用分数。该函数返回的“score”是 `nodeName` 上匹配的 pod 数量,稍后会进行归一化。
    func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    	nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
    	if err != nil {
    		return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
    	}
    
    	node := nodeInfo.Node()
    	s, err := getPreScoreState(cycleState)
    	if err != nil {
    		return 0, framework.AsStatus(err)
    	}
    
    	// Return if the node is not qualified.
    	if s.IgnoredNodes.Has(node.Name) {
    		return 0, nil
    	}
    
    	// 对于每个当前的 ,当前节点获得  的信用分。
    	// 计算 总和 并将其作为该节点的分数返回。
    	var score float64
    	for i, c := range s.Constraints {
    		if tpVal, ok := node.Labels[c.TopologyKey]; ok {
    			var cnt int64
    			if c.TopologyKey == v1.LabelHostname {
    				cnt = int64(countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace))
    			} else {
    				pair := topologyPair{key: c.TopologyKey, value: tpVal}
    				cnt = *s.TopologyPairToPodCounts[pair]
    			}
    			score += scoreForCount(cnt, c.MaxSkew, s.TopologyNormalizingWeight[i])
    		}
    	}
    	return int64(math.Round(score)), nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    Framework 中会运行 ScoreExtension ,即 NormalizeScore

    // Run NormalizeScore method for each ScorePlugin in parallel.
    f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
        pl := f.scorePlugins[index]
        nodeScoreList := pluginToNodeScores[pl.Name()]
        if pl.ScoreExtensions() == nil {
            return
        }
        status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
        if !status.IsSuccess() {
            err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
            errCh.SendErrorWithCancel(err, cancel)
            return
        }
    })
    if err := errCh.ReceiveError(); err != nil {
        return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    NormalizeScore 会为所有的node根据之前计算出的权重进行打分

    func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
    	s, err := getPreScoreState(cycleState)
    	if err != nil {
    		return framework.AsStatus(err)
    	}
    	if s == nil {
    		return nil
    	}
    
    	// 计算 
    	var minScore int64 = math.MaxInt64
    	var maxScore int64
    	for i, score := range scores {
    		// it's mandatory to check if  is present in m.IgnoredNodes
    		if s.IgnoredNodes.Has(score.Name) {
    			scores[i].Score = invalidScore
    			continue
    		}
    		if score.Score < minScore {
    			minScore = score.Score
    		}
    		if score.Score > maxScore {
    			maxScore = score.Score
    		}
    	}
    
    	for i := range scores {
    		if scores[i].Score == invalidScore {
    			scores[i].Score = 0
    			continue
    		}
    		if maxScore == 0 {
    			scores[i].Score = framework.MaxNodeScore
    			continue
    		}
    		s := scores[i].Score
    		scores[i].Score = framework.MaxNodeScore * (maxScore + minScore - s) / maxScore
    	}
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    到此,对于pod拓扑插件功能大概可以明了了,

    • Filter 部分(PreFilterFilter)完成拓扑对(Topology Pair)划分
    • Score部分(PreScore, Score , NormalizeScore )主要是对拓扑对(可以理解为拓扑结构划分)来选择一个最适合的pod的节点(即分数最优的节点)

    而在 scoring_test.go 给了很多用例,可以更深入的了解这部分算法

    Reference

    [1] scheduling code hierarchy

    [2] scheduler algorithm

    [3] in tree VS out of tree volume plugins

    [4] scheduler_framework_plugins

    [5] scheduling config

    [6] topology spread constraints

  • 相关阅读:
    Shiro学习笔记(四):Shiro中的授权
    紫光同创FPGA编写的8画面分割器演示
    前端Vue篇之Vuex中action和mutation的区别、Vuex 和 localStorage 的区别
    springcloudalibaba架构(29): Seata分布式事务(AT模式)
    《游戏编程模式》学习笔记(十二)子类沙箱 Subclass Sandbox
    简陋的nuxt3学习笔记
    【网络层】RIP协议详解(应用层)、慢收敛、OSPF协议(适合大网络)
    都闪开,这才是最牛x技术搜索引擎【云原生】
    链表合并 分数 25
    大数据分析与开发就业保障班
  • 原文地址:https://blog.csdn.net/sinat_24092079/article/details/126025051