kubelet source code analysis: adding a pod


    Adding a pod
    1. HandlePodAdditions is triggered to handle the creation

    • Sort the pods by creation timestamp in ascending order (a sketch of the comparator follows the code)
    • If the pod is a mirror pod (the API object of a static pod), take the mirror-pod path via handleMirrorPod and continue
    • If the pod is not currently being terminated, it goes through the admission check that decides whether it can run on this node
    • filterOutInactivePods keeps only the pods that are still active: flow 2
    • canAdmitPod runs the admission checks: flow 3
    • rejectPod records the reason when the pod cannot be admitted
    • Once admission passes, dispatchWork is triggered and the pod is created through the same flow described in "kubelet source code analysis: deleting a pod (part 1)"
    case kubetypes.ADD:
    		klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
    		handler.HandlePodAdditions(u.Pods)
    
    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    	start := kl.clock.Now()
    	sort.Sort(sliceutils.PodsByCreationTime(pods))
    	for _, pod := range pods {
    		existingPods := kl.podManager.GetPods() 
    		kl.podManager.AddPod(pod)
    
    		if kubetypes.IsMirrorPod(pod) {
    			kl.handleMirrorPod(pod, start)
    			continue
    		}
    
    		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
    			activePods := kl.filterOutInactivePods(existingPods)
    			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
    				kl.rejectPod(pod, reason, message)
    				continue
    			}
    		}
    		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
    		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
    		
    	}
    }
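
    The ascending sort in the first bullet relies on sliceutils.PodsByCreationTime. Roughly, it is just a sort.Interface over the pods' CreationTimestamp, along these lines (a sketch, not quoted from the exact tree this article uses):

    // PodsByCreationTime sorts pods by CreationTimestamp, oldest first.
    type PodsByCreationTime []*v1.Pod

    func (s PodsByCreationTime) Len() int      { return len(s) }
    func (s PodsByCreationTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
    func (s PodsByCreationTime) Less(i, j int) bool {
    	return s[i].CreationTimestamp.Before(&s[j].CreationTimestamp)
    }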
    
    

    2. Filter all the pods, keeping only the ones that are still active (admitted and not finished).

    • Skip a pod if the pod workers already know its termination has completed, or if it is an admitted pod in a terminal phase (Succeeded/Failed) that is not currently being terminated
    • Otherwise append it to the result slice, which is returned at the end.
    func (kl *Kubelet) filterOutInactivePods(pods []*v1.Pod) []*v1.Pod {
    	filteredPods := make([]*v1.Pod, 0, len(pods))
    	for _, p := range pods {
    		if kl.podWorkers.IsPodKnownTerminated(p.UID) {
    			continue
    		}
    		if kl.isAdmittedPodTerminal(p) && !kl.podWorkers.IsPodTerminationRequested(p.UID) {
    			continue
    		}
    
    		filteredPods = append(filteredPods, p)
    	}
    	return filteredPods
    }
    

    3. Checking whether the pod can be admitted onto the node
    canAdmitPod calls each registered admit handler's Admit in turn. There are quite a few handlers and not all of their code is shown here; roughly, they verify that the pod can be created, covering the following checks (a hypothetical handler sketch follows the code):

    • Whether the node is Ready, whether it is under memory/disk/PID pressure, and whether the network is configured correctly
    • Whether the sysctls (under the pod's securityContext) are valid; if the pod does not touch kernel parameters, this returns true immediately
    • Whether the node has enough resources, whether affinity matches, and so on (part of the scheduler's work is redone here to be safe)
    • Topology management
    func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
    	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
    	for _, podAdmitHandler := range kl.admitHandlers {
    		if result := podAdmitHandler.Admit(attrs); !result.Admit {
    			return false, result.Reason, result.Message
    		}
    	}
    
    	return true, "", ""
    }
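
    Every entry in kl.admitHandlers implements the same small interface seen throughout this article: Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult. As a purely hypothetical illustration of one link in that chain (not a real kubelet handler), a handler that vetoes pods with overly long names could look like this:

    // maxNameLengthAdmitHandler is a made-up example handler; real handlers are
    // registered on the kubelet's admitHandlers chain when the kubelet is built.
    type maxNameLengthAdmitHandler struct {
    	maxLen int
    }

    func (h *maxNameLengthAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
    	if len(attrs.Pod.Name) > h.maxLen {
    		return lifecycle.PodAdmitResult{
    			Admit:   false,
    			Reason:  "PodNameTooLong", // hypothetical reason string
    			Message: fmt.Sprintf("pod name %q is longer than %d characters", attrs.Pod.Name, h.maxLen),
    		}
    	}
    	return lifecycle.PodAdmitResult{Admit: true}
    }

    canAdmitPod simply walks this chain and stops at the first handler that returns Admit: false.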
    

    4. Important pod-admission functions. This one, the eviction manager's Admit, checks whether the node's current pressure conditions allow the pod in

    • If the node has no pressure conditions at all, return true
    • Critical pods are admitted regardless of pressure: a pod whose priority reaches 2000000000, or a static pod, goes straight through (a sketch of the critical-pod check follows the code)
    • If the only condition is memory pressure, look at the pod's QoS class (GetPodQOS, flow 4.1); anything that is not BestEffort may be created. A quick recap of the QoS classes, which later functions rely on as well: if limits and requests are both set and equal, the pod is Guaranteed and is the last to be evicted when resources run short; if limits are greater than requests, the pod is Burstable (it can burst) and is evicted second; if neither is set, meaning nothing is constrained, the pod is BestEffort and is evicted first
    • Also check the pod's tolerations: if it tolerates the memory-pressure taint, return true
    • Any other condition (disk pressure, PID pressure, network configuration problems, and so on) means the pod cannot be admitted
    pkg/kubelet/eviction/eviction_manager.go
    func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
    	m.RLock()
    	defer m.RUnlock()
    	if len(m.nodeConditions) == 0 {
    		return lifecycle.PodAdmitResult{Admit: true}
    	}
    
    	if kubelettypes.IsCriticalPod(attrs.Pod) {
    		return lifecycle.PodAdmitResult{Admit: true}
    	}
    
    	nodeOnlyHasMemoryPressureCondition := hasNodeCondition(m.nodeConditions, v1.NodeMemoryPressure) && len(m.nodeConditions) == 1
    	if nodeOnlyHasMemoryPressureCondition {
    		notBestEffort := v1.PodQOSBestEffort != v1qos.GetPodQOS(attrs.Pod)
    		if notBestEffort {
    			return lifecycle.PodAdmitResult{Admit: true}
    		}
    
    		if v1helper.TolerationsTolerateTaint(attrs.Pod.Spec.Tolerations, &v1.Taint{
    			Key:    v1.TaintNodeMemoryPressure,
    			Effect: v1.TaintEffectNoSchedule,
    		}) {
    			return lifecycle.PodAdmitResult{Admit: true}
    		}
    	}
    
    	klog.InfoS("Failed to admit pod to node", "pod", klog.KObj(attrs.Pod), "nodeCondition", m.nodeConditions)
    	return lifecycle.PodAdmitResult{
    		Admit:   false,
    		Reason:  Reason,
    		Message: fmt.Sprintf(nodeConditionMessageFmt, m.nodeConditions),
    	}
    }
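
    The "critical pod" shortcut above is kubelettypes.IsCriticalPod. A sketch of what it checks (reconstructed from memory, so treat the details as an approximation): static pods, mirror pods, and pods whose priority reaches the system-critical threshold of 2000000000 all count as critical.

    // Sketch of the critical-pod check used by the eviction manager's Admit.
    const systemCriticalPriority = 2 * 1000000000 // scheduling.SystemCriticalPriority

    func isCriticalPodSketch(pod *v1.Pod) bool {
    	if kubelettypes.IsStaticPod(pod) || kubelettypes.IsMirrorPod(pod) {
    		return true // static pods and their mirror pods are always critical
    	}
    	// otherwise the pod must carry a priority at or above the system-critical threshold
    	return pod.Spec.Priority != nil && *pod.Spec.Priority >= systemCriticalPriority
    }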
    

    4.1. Getting the pod's QoS class. The official docs say every container's limits and requests must match for a pod to be Guaranteed; the code actually compares the totals across all containers (an example follows the code)

    • The two for loops walk every container and accumulate the totals of requests and limits
    • If any container is missing a cpu or memory limit, the pod cannot be Guaranteed
    • If no requests and no limits are set anywhere, the pod is the lowest class, BestEffort
    • If both are set and the totals are equal, the pod is the highest class, Guaranteed
    • If both are set but differ, the pod is the middle class, Burstable
    func GetPodQOS(pod *v1.Pod) v1.PodQOSClass {
    	requests := v1.ResourceList{}
    	limits := v1.ResourceList{}
    	zeroQuantity := resource.MustParse("0")
    	isGuaranteed := true
    	allContainers := []v1.Container{}
    	// aggregate init containers and regular containers together
    	allContainers = append(allContainers, pod.Spec.Containers...)
    	allContainers = append(allContainers, pod.Spec.InitContainers...)
    	for _, container := range allContainers {		
    		for name, quantity := range container.Resources.Requests {
    		    // only cpu/memory count towards QoS
    			if !isSupportedQoSComputeResource(name) {
    				continue
    			}
    			// Cmp returns 0 if equal and 1 if greater; this checks quantity > 0
    			if quantity.Cmp(zeroQuantity) == 1 {
    			    // copy the value out
    				delta := quantity.DeepCopy()
    				// if this name (cpu/memory) has no entry yet, create one
    				if _, exists := requests[name]; !exists {
    					requests[name] = delta
    				} else {
    				// otherwise add the existing value into delta and store it back
    					delta.Add(requests[name])
    					requests[name] = delta
    				}
    			}
    		}
    
    		qosLimitsFound := sets.NewString()
    		for name, quantity := range container.Resources.Limits {
    			if !isSupportedQoSComputeResource(name) {
    				continue
    			}
    			if quantity.Cmp(zeroQuantity) == 1 {
    				qosLimitsFound.Insert(string(name))
    				delta := quantity.DeepCopy()
    				if _, exists := limits[name]; !exists {
    					limits[name] = delta
    				} else {
    					delta.Add(limits[name])
    					limits[name] = delta
    				}
    			}
    		}
    		// check that the limits cover both cpu and memory; if either is missing, the pod is not Guaranteed
    		if !qosLimitsFound.HasAll(string(v1.ResourceMemory), string(v1.ResourceCPU)) {
    			isGuaranteed = false
    		}
    	}
    	// if neither requests nor limits are set anywhere, this is the lowest class
    	if len(requests) == 0 && len(limits) == 0 {
    		return v1.PodQOSBestEffort
    	}
    
    	if isGuaranteed {
    		for name, req := range requests {
    	        // if the cpu/memory totals in requests differ from those in limits, the pod is Burstable
    			if lim, exists := limits[name]; !exists || lim.Cmp(req) != 0 {
    				isGuaranteed = false
    				break
    			}
    		}
    	}
    	if isGuaranteed &&
    		len(requests) == len(limits) {
    		return v1.PodQOSGuaranteed
    	}
    	return v1.PodQOSBurstable
    }
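
    A quick example of the "totals, not per container" point above, assuming GetPodQOS from the listing is in scope: two containers whose individual requests and limits differ, but whose sums match (cpu 300m vs 300m, memory 400Mi vs 400Mi) and which each set both cpu and memory limits, still yield Guaranteed.

    func rl(cpu, mem string) v1.ResourceList {
    	return v1.ResourceList{
    		v1.ResourceCPU:    resource.MustParse(cpu),
    		v1.ResourceMemory: resource.MustParse(mem),
    	}
    }

    func main() {
    	pod := &v1.Pod{Spec: v1.PodSpec{Containers: []v1.Container{
    		{Name: "a", Resources: v1.ResourceRequirements{Requests: rl("100m", "200Mi"), Limits: rl("200m", "200Mi")}},
    		{Name: "b", Resources: v1.ResourceRequirements{Requests: rl("200m", "200Mi"), Limits: rl("100m", "200Mi")}},
    	}}}
    	// No single container has requests == limits, but the totals are equal and
    	// every container sets cpu and memory limits, so the pod is Guaranteed.
    	fmt.Println(GetPodQOS(pod)) // Guaranteed
    }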
    

    5. Validating sysctls. Some containers need to tweak kernel parameters, so the sysctls that are allowed to be modified are validated here

    • If the pod does not set any sysctls, admit it immediately
    • Otherwise validate the namespaces: many sysctl parameters are namespaced, and if a sysctl belongs to the IPC or network namespace while the pod also uses the host's hostIPC or hostNetwork, an error is returned (an illustrative pod follows the code)
    func (w *patternAllowlist) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
    	pod := attrs.Pod
    
    	if pod.Spec.SecurityContext == nil || len(pod.Spec.SecurityContext.Sysctls) == 0 {
    		return lifecycle.PodAdmitResult{
    			Admit: true,
    		}
    	}
    
    	var hostNet, hostIPC bool
    	if pod.Spec.SecurityContext != nil {
    		hostNet = pod.Spec.HostNetwork
    		hostIPC = pod.Spec.HostIPC
    	}
    	for _, s := range pod.Spec.SecurityContext.Sysctls {
    		if err := w.validateSysctl(s.Name, hostNet, hostIPC); err != nil {
    			return lifecycle.PodAdmitResult{
    				Admit:   false,
    				Reason:  ForbiddenReason,
    				Message: fmt.Sprintf("forbidden sysctl: %v", err),
    			}
    		}
    	}
    
    	return lifecycle.PodAdmitResult{
    		Admit: true,
    	}
    }
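
    As an illustration of the second bullet (an assumed example, not taken from the source), a pod like the following would be rejected by the Admit above: it asks for a net-namespaced sysctl while also sharing the host's network namespace, so validateSysctl returns an error.

    pod := &v1.Pod{
    	Spec: v1.PodSpec{
    		HostNetwork: true, // the pod shares the host's network namespace
    		SecurityContext: &v1.PodSecurityContext{
    			Sysctls: []v1.Sysctl{
    				// net.* sysctls belong to the pod's own network namespace; with
    				// HostNetwork set they would touch the host, so the pod is rejected.
    				{Name: "net.ipv4.ip_local_port_range", Value: "1024 65535"},
    			},
    		},
    	},
    }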
    

    6. Verifying that the pod fits the node's resources

    • Get the node info first (falling back to the locally initialized node); if that also fails, return an InvalidNodeInfo error
    • Build a fresh NodeInfo (so the data is as current and reliable as possible) and load it with all the admitted pods, including their affinity and resource usage, which the new pod's requests are checked against later (flow 6.1)
    • Update the device-plugin resources (flow 6.2)
    • Drop any extended resources requested by the pod (under requests) that the node does not expose (flow 6.3)
    • Verify the pod's resources, plus taints and tolerations (flow 6.4)
    • If the pod is a static/critical pod and the only failures are insufficient resources, it may still be admitted (flow 6.9)
    • If everything fits, verify the pod's kubernetes.io/os label (if present it must match the node) and pod.spec.OS (if set it must equal GOOS); sketches of these checks follow the code
    • If nothing is wrong, return Admit: true
    func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult {
    	node, err := w.getNodeAnyWayFunc()
    	if err != nil {
    		klog.ErrorS(err, "Cannot get Node info")
    		return PodAdmitResult{
    			Admit:   false,
    			Reason:  "InvalidNodeInfo",
    			Message: "Kubelet cannot get node info.",
    		}
    	}
    	admitPod := attrs.Pod   // the pod being added
    	pods := attrs.OtherPods // all pods currently holding resources
    	nodeInfo := schedulerframework.NewNodeInfo(pods...) // load the existing pods into a NodeInfo (flow 6.1)
    	nodeInfo.SetNode(node) // copy the running node and its allocatable resources into the NodeInfo
    	if err = w.pluginResourceUpdateFunc(nodeInfo, attrs); err != nil { // update device-plugin resources; currently this always returns nil (flow 6.2)
    		message := fmt.Sprintf("Update plugin resources failed due to %v, which is unexpected.", err)
    		klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "message", message)
    		return PodAdmitResult{
    			Admit:   false,
    			Reason:  "UnexpectedAdmissionError",
    			Message: message,
    		}
    	}
    	podWithoutMissingExtendedResources := removeMissingExtendedResources(admitPod, nodeInfo) // drop extended requests the node does not expose (flow 6.3)
    
    	reasons := generalFilter(podWithoutMissingExtendedResources, nodeInfo) // check resources, affinity, ports and taints via scheduler helpers (flow 6.4)
    	fit := len(reasons) == 0
    	if !fit { // something failed, e.g. not enough cpu
    		reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons) // second chance: non-critical pods keep the failures, critical/static pods may evict others and pass (flow 6.9)
    		fit = len(reasons) == 0 && err == nil
    		if err != nil {
    			message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err)
    			klog.InfoS("Failed to admit pod, unexpected error while attempting to recover from admission failure", "pod", klog.KObj(admitPod), "err", err)
    			return PodAdmitResult{
    				Admit:   fit,
    				Reason:  "UnexpectedAdmissionError",
    				Message: message,
    			}
    		}
    	}
    	if !fit {
    		var reason string
    		var message string
    		if len(reasons) == 0 {
    			message = fmt.Sprint("GeneralPredicates failed due to unknown reason, which is unexpected.")
    			klog.InfoS("Failed to admit pod: GeneralPredicates failed due to unknown reason, which is unexpected", "pod", klog.KObj(admitPod))
    			return PodAdmitResult{
    				Admit:   fit,
    				Reason:  "UnknownReason",
    				Message: message,
    			}
    		}
    		// If there are failed predicates, we only return the first one as a reason.
    		r := reasons[0]
    		switch re := r.(type) { // type of the failure
    		case *PredicateFailureError: // non-resource failures (affinity, node name, ports, taints, ...)
    			reason = re.PredicateName
    			message = re.Error()
    			klog.V(2).InfoS("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
    		case *InsufficientResourceError: // resource failures, e.g. not enough cpu
    			reason = fmt.Sprintf("OutOf%s", re.ResourceName)
    			message = re.Error()
    			klog.V(2).InfoS("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
    		default:
    			reason = "UnexpectedPredicateFailureType"
    			message = fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", r)
    			klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "err", message)
    		}
    		return PodAdmitResult{ // return the failure reason and message
    			Admit:   fit,
    			Reason:  reason,
    			Message: message,
    		}
    	}
    	if rejectPodAdmissionBasedOnOSSelector(admitPod, node) { // if the pod carries a kubernetes.io/os label, it must match the node's
    		return PodAdmitResult{
    			Admit:   false,
    			Reason:  "PodOSSelectorNodeLabelDoesNotMatch",
    			Message: "Failed to admit pod as the `kubernetes.io/os` label doesn't match node label",
    		}
    	}
    	if rejectPodAdmissionBasedOnOSField(admitPod) { // check pod.spec.OS against the node's OS
    		return PodAdmitResult{
    			Admit:   false,
    			Reason:  "PodOSNotSupported",
    			Message: "Failed to admit pod as the OS field doesn't match node OS",
    		}
    	}
    	return PodAdmitResult{
    		Admit: true,
    	}
    }
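
    The two OS checks at the end of Admit are small. A simplified sketch of what they do (not the verbatim upstream code): the pod's kubernetes.io/os label, if present, must match the node's label (falling back to the OS the kubelet was built for), and pod.Spec.OS.Name, if set, must equal runtime.GOOS.

    func rejectPodAdmissionBasedOnOSSelectorSketch(pod *v1.Pod, node *v1.Node) bool {
    	podOS, ok := pod.Labels[v1.LabelOSStable] // "kubernetes.io/os"
    	if !ok {
    		return false // no label on the pod: nothing to enforce
    	}
    	nodeOS, ok := node.Labels[v1.LabelOSStable]
    	if !ok {
    		nodeOS = runtime.GOOS // fall back to the kubelet's own OS
    	}
    	return podOS != nodeOS
    }

    func rejectPodAdmissionBasedOnOSFieldSketch(pod *v1.Pod) bool {
    	if pod.Spec.OS == nil {
    		return false // field not set: nothing to enforce
    	}
    	return string(pod.Spec.OS.Name) != runtime.GOOS
    }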
    
    

    6.1. Adding the pod's resources and affinity to the NodeInfo

    • The affinity part is skipped here; it is simple but messy, essentially just packaging the pod's affinity onto the NodeInfo, so there is little to discuss
    • Going straight to resources: the pod's usage is computed by summing all regular containers, then taking the max against what the init containers need; whichever is larger is the final figure. For the "non-zero" accounting, a container that requests nothing is counted as 0.1 CPU core and 200M of memory. The pod's Overhead is also added, so the result is the total container consumption plus overhead (a simplified sketch follows the code)
    • Store the pod's info into the NodeInfo
    • If the pod has (anti-)affinity, record it on the NodeInfo
    • If the pod uses host ports, record those ports on the NodeInfo
    • If the pod uses a PVC, increment that PVC's reference count
    func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) {
    	res, non0CPU, non0Mem := calculateResource(podInfo.Pod) // res is the total; non0CPU/non0Mem apply non-zero defaults
    	n.Requested.MilliCPU += res.MilliCPU
    	n.Requested.Memory += res.Memory
    	n.Requested.EphemeralStorage += res.EphemeralStorage
    	if n.Requested.ScalarResources == nil && len(res.ScalarResources) > 0 {
    		n.Requested.ScalarResources = map[v1.ResourceName]int64{}
    	}
    	for rName, rQuant := range res.ScalarResources {
    		n.Requested.ScalarResources[rName] += rQuant
    	}
    	n.NonZeroRequested.MilliCPU += non0CPU
    	n.NonZeroRequested.Memory += non0Mem
    	n.Pods = append(n.Pods, podInfo)
    	if podWithAffinity(podInfo.Pod) {
    		n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo)
    	}
    	if podWithRequiredAntiAffinity(podInfo.Pod) {
    		n.PodsWithRequiredAntiAffinity = append(n.PodsWithRequiredAntiAffinity, podInfo)
    	}
    
    	n.updateUsedPorts(podInfo.Pod, true)
    	n.updatePVCRefCounts(podInfo.Pod, true)
    
    	n.Generation = nextGeneration()
    }
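
    calculateResource itself is not shown above. A simplified sketch of the aggregation the second bullet describes, limited to cpu and memory and using the assumed non-zero defaults of 100m cpu / 200MB memory (the real helper also handles ephemeral storage and scalar resources):

    const (
    	defaultNonZeroMilliCPU int64 = 100               // assumed default for containers that request no cpu
    	defaultNonZeroMemory   int64 = 200 * 1024 * 1024 // assumed default for containers that request no memory
    )

    func calculateResourceSketch(pod *v1.Pod) (milliCPU, memory, non0CPU, non0Mem int64) {
    	// regular containers run concurrently, so their requests are summed
    	for _, c := range pod.Spec.Containers {
    		cpu := c.Resources.Requests.Cpu().MilliValue()
    		mem := c.Resources.Requests.Memory().Value()
    		milliCPU += cpu
    		memory += mem
    		if cpu == 0 {
    			cpu = defaultNonZeroMilliCPU
    		}
    		if mem == 0 {
    			mem = defaultNonZeroMemory
    		}
    		non0CPU += cpu
    		non0Mem += mem
    	}
    	// init containers run one at a time, so only the largest one matters
    	for _, c := range pod.Spec.InitContainers {
    		if v := c.Resources.Requests.Cpu().MilliValue(); v > milliCPU {
    			milliCPU = v
    		}
    		if v := c.Resources.Requests.Memory().Value(); v > memory {
    			memory = v
    		}
    	}
    	// pod overhead, if declared, is charged on top of the container totals
    	if pod.Spec.Overhead != nil {
    		milliCPU += pod.Spec.Overhead.Cpu().MilliValue()
    		memory += pod.Spec.Overhead.Memory().Value()
    	}
    	return milliCPU, memory, non0CPU, non0Mem
    }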
    

    6.2 Updating the node's device-plugin resources

    • Check whether this pod uses any device-plugin resources; if not, return right away since nothing needs updating
    • Get the node's total allocatable device-plugin resources, then walk the allocated devices to see, for each resource, how many exist and how many are in use
    • If more are in use than the node says are allocatable, the node's information is stale
    • In that case, raise the node's allocatable count to the number already allocated so the two stay consistent
    func (m *ManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
    	pod := attrs.Pod
    	if !m.podDevices.hasPod(string(pod.UID)) {
    		return nil
    	}
    
    	m.sanitizeNodeAllocatable(node)
    	return nil
    }
    
    
    func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulerframework.NodeInfo) {
    	var newAllocatableResource *schedulerframework.Resource
    	allocatableResource := node.Allocatable // the node's total allocatable resources
    	if allocatableResource.ScalarResources == nil {
    		allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
    	}
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    	for resource, devices := range m.allocatedDevices { // iterate over the device resources already allocated
    		needed := devices.Len() // number of devices in use
    		quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)] // how many of this resource the node reports
    		if ok && int(quant) >= needed { // allocatable >= in use means nothing to do; otherwise the node's count must be fixed up
    			continue
    		}
    		if newAllocatableResource == nil {
    			newAllocatableResource = allocatableResource.Clone() // copy before mutating
    		}
    		newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed) // set the copied allocatable count to what is already in use
    	}
    	if newAllocatableResource != nil {
    		node.Allocatable = newAllocatableResource // install the fixed-up copy
    	}
    }
    

    6.3 Filtering out resource requests the node cannot satisfy

    • Walk the pod's requests and validate each resource name (limits are not checked; admission is driven by requests)
    • A name in the kubernetes.io/ namespace, or one already prefixed with requests., is not treated as an extended resource (the quota name is built by prepending the requests. prefix)
    • An extended resource requested by the pod but absent from the node is dropped from the copy of the pod
    • Finally the prefixed name is validated as a qualified name (a few illustrative inputs follow the code)
    func removeMissingExtendedResources(pod *v1.Pod, nodeInfo *schedulerframework.NodeInfo) *v1.Pod {
    	podCopy := pod.DeepCopy()
    	for i, c := range pod.Spec.Containers {
    		podCopy.Spec.Containers[i].Resources.Requests = make(v1.ResourceList)
    		for rName, rQuant := range c.Resources.Requests {
    			if v1helper.IsExtendedResourceName(rName) {
    				if _, found := nodeInfo.Allocatable.ScalarResources[rName]; !found {
    					continue
    				}
    			}
    			podCopy.Spec.Containers[i].Resources.Requests[rName] = rQuant
    		}
    	}
    	return podCopy
    }
    
    func IsExtendedResourceName(name v1.ResourceName) bool {
    	if IsNativeResource(name) || strings.HasPrefix(string(name), v1.DefaultResourceRequestsPrefix) {
    		return false
    	}
    	nameForQuota := fmt.Sprintf("%s%s", v1.DefaultResourceRequestsPrefix, string(name))
    	if errs := validation.IsQualifiedName(string(nameForQuota)); len(errs) != 0 {
    		return false
    	}
    	return true
    }
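
    A few illustrative inputs for IsExtendedResourceName, assuming it is in scope as above:

    for _, name := range []v1.ResourceName{
    	"cpu",                     // native resource            -> false
    	"kubernetes.io/foo",       // kubernetes.io/ namespace   -> false
    	"requests.nvidia.com/gpu", // already carries the prefix -> false
    	"example.com/gpu",         // well-formed extended name  -> true
    } {
    	fmt.Println(name, IsExtendedResourceName(name))
    }

    So in removeMissingExtendedResources only names like example.com/gpu can be dropped (when the node's Allocatable does not list them); native resources such as cpu and memory always remain and are checked later by fitsRequest (flow 6.6).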
    

    6.4 Checking that the pod fits

    • Use the scheduler's helpers to verify that the node can run this pod (flow 6.5)
    • If it is not a static pod, also check the node's NoExecute taints against the pod's tolerations (flow 6.8)
    • If everything passes, the pod can be created on this node
    func generalFilter(pod *v1.Pod, nodeInfo *schedulerframework.NodeInfo) []PredicateFailureReason {
    	admissionResults := scheduler.AdmissionCheck(pod, nodeInfo, true) // check resources, affinity, node name and ports (flow 6.5)
    	var reasons []PredicateFailureReason
    	for _, r := range admissionResults { // any entry means a failure
    		if r.InsufficientResource != nil { // insufficient resources (the static-pod path later keys off this)
    			reasons = append(reasons, &InsufficientResourceError{
    				ResourceName: r.InsufficientResource.ResourceName,
    				Requested:    r.InsufficientResource.Requested,
    				Used:         r.InsufficientResource.Used,
    				Capacity:     r.InsufficientResource.Capacity,
    			})
    		} else { // other failures (ports, node name, affinity)
    			reasons = append(reasons, &PredicateFailureError{r.Name, r.Reason})
    		}
    	}
    
    	if !types.IsStaticPod(pod) { // non-static pods must also tolerate the node's NoExecute taints
    		_, isUntolerated := corev1.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
    			return t.Effect == v1.TaintEffectNoExecute
    		})
    		if isUntolerated {
    			reasons = append(reasons, &PredicateFailureError{tainttoleration.Name, tainttoleration.ErrReasonNotMatch})
    		}
    	}
    
    	return reasons
    }
    
    

    6.5 Checking whether the node can run this pod

    • Check that the node's resources are sufficient (flow 6.6)
    • Check node affinity (flow 6.7)
    • Check the node name: pod.spec.nodeName must be empty or equal to this node's name (a sketch follows the code)
    • Check host ports: none of the pod's host ports may already be in use on the node
    func AdmissionCheck(pod *v1.Pod, nodeInfo *framework.NodeInfo, includeAllFailures bool) []AdmissionResult {
    	var admissionResults []AdmissionResult
    	insufficientResources := noderesources.Fits(pod, nodeInfo)
    	if len(insufficientResources) != 0 {
    		for i := range insufficientResources {
    			admissionResults = append(admissionResults, AdmissionResult{InsufficientResource: &insufficientResources[i]})
    		}
    		if !includeAllFailures {
    			return admissionResults
    		}
    	}
    
    	if matches, _ := corev1nodeaffinity.GetRequiredNodeAffinity(pod).Match(nodeInfo.Node()); !matches {
    		admissionResults = append(admissionResults, AdmissionResult{Name: nodeaffinity.Name, Reason: nodeaffinity.ErrReasonPod})
    		if !includeAllFailures {
    			return admissionResults
    		}
    	}
    	if !nodename.Fits(pod, nodeInfo) {
    		admissionResults = append(admissionResults, AdmissionResult{Name: nodename.Name, Reason: nodename.ErrReason})
    		if !includeAllFailures {
    			return admissionResults
    		}
    	}
    	if !nodeports.Fits(pod, nodeInfo) {
    		admissionResults = append(admissionResults, AdmissionResult{Name: nodeports.Name, Reason: nodeports.ErrReason})
    		if !includeAllFailures {
    			return admissionResults
    		}
    	}
    	return admissionResults
    }
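
    The node-name check referenced above is tiny; roughly (a sketch of the scheduler's nodename plugin, not quoted verbatim), a pod fits unless spec.nodeName is set and names a different node. The nodeports check similarly walks the pod's HostPort declarations and fails if any protocol/hostIP/port combination is already recorded as used on the node.

    func nodeNameFitsSketch(pod *v1.Pod, node *v1.Node) bool {
    	return len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == node.Name
    }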
    

    6.6 Checking that resources are sufficient

    • Check whether the node's maximum pod count would be exceeded
    • If the pod requests nothing at all (everything is 0), return right away
    • Check that memory, cpu and ephemeral storage are sufficient
    • Check extended resources, skipping any that are configured to be ignored (by prefix or by full name)
    func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo, ignoredExtendedResources, ignoredResourceGroups sets.String) []InsufficientResource {
    	insufficientResources := make([]InsufficientResource, 0, 4)
    
    	allowedPodNumber := nodeInfo.Allocatable.AllowedPodNumber
    	if len(nodeInfo.Pods)+1 > allowedPodNumber { // would exceed the node's maximum pod count
    		insufficientResources = append(insufficientResources, InsufficientResource{
    			ResourceName: v1.ResourcePods,
    			Reason:       "Too many pods",
    			Requested:    1,
    			Used:         int64(len(nodeInfo.Pods)),
    			Capacity:     int64(allowedPodNumber),
    		})
    	}
    	// if the pod requests nothing at all, return right away
    	if podRequest.MilliCPU == 0 &&
    		podRequest.Memory == 0 &&
    		podRequest.EphemeralStorage == 0 &&
    		len(podRequest.ScalarResources) == 0 {
    		return insufficientResources
    	}
    	// is there enough cpu?
    	if podRequest.MilliCPU > (nodeInfo.Allocatable.MilliCPU - nodeInfo.Requested.MilliCPU) {
    		insufficientResources = append(insufficientResources, InsufficientResource{
    			ResourceName: v1.ResourceCPU,
    			Reason:       "Insufficient cpu",
    			Requested:    podRequest.MilliCPU,
    			Used:         nodeInfo.Requested.MilliCPU,
    			Capacity:     nodeInfo.Allocatable.MilliCPU,
    		})
    	}
    	// is there enough memory?
    	if podRequest.Memory > (nodeInfo.Allocatable.Memory - nodeInfo.Requested.Memory) {
    		insufficientResources = append(insufficientResources, InsufficientResource{
    			ResourceName: v1.ResourceMemory,
    			Reason:       "Insufficient memory",
    			Requested:    podRequest.Memory,
    			Used:         nodeInfo.Requested.Memory,
    			Capacity:     nodeInfo.Allocatable.Memory,
    		})
    	}
    	// is there enough ephemeral storage?
    	if podRequest.EphemeralStorage > (nodeInfo.Allocatable.EphemeralStorage - nodeInfo.Requested.EphemeralStorage) {
    		insufficientResources = append(insufficientResources, InsufficientResource{
    			ResourceName: v1.ResourceEphemeralStorage,
    			Reason:       "Insufficient ephemeral-storage",
    			Requested:    podRequest.EphemeralStorage,
    			Used:         nodeInfo.Requested.EphemeralStorage,
    			Capacity:     nodeInfo.Allocatable.EphemeralStorage,
    		})
    	}
    
    	for rName, rQuant := range podRequest.ScalarResources {
    		// skip extended resources configured to be ignored
    		if v1helper.IsExtendedResourceName(rName) {
    			var rNamePrefix string
    			if ignoredResourceGroups.Len() > 0 {
    				rNamePrefix = strings.Split(string(rName), "/")[0]
    			}
    			if ignoredExtendedResources.Has(string(rName)) || ignoredResourceGroups.Has(rNamePrefix) {
    				continue
    			}
    		}
    		// is there enough of this extended resource?
    		if rQuant > (nodeInfo.Allocatable.ScalarResources[rName] - nodeInfo.Requested.ScalarResources[rName]) {
    			insufficientResources = append(insufficientResources, InsufficientResource{
    				ResourceName: rName,
    				Reason:       fmt.Sprintf("Insufficient %v", rName),
    				Requested:    podRequest.ScalarResources[rName],
    				Used:         nodeInfo.Requested.ScalarResources[rName],
    				Capacity:     nodeInfo.Allocatable.ScalarResources[rName],
    			})
    		}
    	}
    
    	return insufficientResources
    }
    

    6.7 Checking node affinity (the label matching itself is simple but messy, just key/value comparisons, so it is not covered in detail)

    • Record the nodeSelector and the Required part of node affinity (preferred rules are ignored: the kubelet does not know about other nodes, the scheduler has already placed the pod here, so only the hard requirements need checking)
    • Match first compares the nodeSelector labels against the node; if they do not match, the pod fails
    • If Required node affinity is present, it is then evaluated against the node as well (a short usage example follows the code)
    func GetRequiredNodeAffinity(pod *v1.Pod) RequiredNodeAffinity {
    	var selector labels.Selector
    	if len(pod.Spec.NodeSelector) > 0 { // the plain label selector (spec.nodeSelector)
    		selector = labels.SelectorFromSet(pod.Spec.NodeSelector)
    	}
    	// the Required node-affinity terms
    	var affinity *LazyErrorNodeSelector
    	if pod.Spec.Affinity != nil &&
    		pod.Spec.Affinity.NodeAffinity != nil &&
    		pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
    		affinity = NewLazyErrorNodeSelector(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
    	}
    	return RequiredNodeAffinity{labelSelector: selector, nodeSelector: affinity}
    }
    
    
    func (s RequiredNodeAffinity) Match(node *v1.Node) (bool, error) {
    	if s.labelSelector != nil { // check the nodeSelector labels
    		if !s.labelSelector.Matches(labels.Set(node.Labels)) {
    			return false, nil
    		}
    	}
    	if s.nodeSelector != nil { // check the required node affinity
    		return s.nodeSelector.Match(node)
    	}
    	return true, nil
    }
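
    A short usage example of the helpers above (illustrative values; it assumes GetRequiredNodeAffinity and Match are in scope as shown, with metav1 being the usual apimachinery meta/v1 package):

    pod := &v1.Pod{Spec: v1.PodSpec{
    	NodeSelector: map[string]string{"disktype": "ssd"}, // hard requirement
    }}
    node := &v1.Node{ObjectMeta: metav1.ObjectMeta{
    	Labels: map[string]string{"disktype": "ssd", "zone": "a"},
    }}

    match, _ := GetRequiredNodeAffinity(pod).Match(node)
    fmt.Println(match) // true: the node's labels satisfy the pod's nodeSelector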
    

    6.8 For non-static pods, checking tolerations

    • Only taints with the NoExecute effect need to be considered
    • Iterate over the NoExecute taints
    • If the pod's tolerations do not cover one of the node's NoExecute taints, the taint check fails
    func FindMatchingUntoleratedTaint(taints []v1.Taint, tolerations []v1.Toleration, inclusionFilter taintsFilterFunc) (v1.Taint, bool) {
    	filteredTaints := getFilteredTaints(taints, inclusionFilter) // keep only the NoExecute taints
    	for _, taint := range filteredTaints {
    		if !TolerationsTolerateTaint(tolerations, &taint) { // no toleration matches this taint's key/value/effect
    			return taint, true
    		}
    	}
    	return v1.Taint{}, false
    }
    
    func getFilteredTaints(taints []v1.Taint, inclusionFilter taintsFilterFunc) []v1.Taint {
    	if inclusionFilter == nil {
    		return taints
    	}
    	filteredTaints := []v1.Taint{}
    	for _, taint := range taints {
    		if !inclusionFilter(&taint) {
    			continue
    		}
    		filteredTaints = append(filteredTaints, taint) // only taints accepted by the filter (NoExecute here) are returned
    	}
    	return filteredTaints
    }
    

    6.9 Special handling for static (critical) pods

    • If the pod is not critical (not a static/mirror pod and its priority is below 2000000000), return the failure reasons as they are
    • Walk the failure reasons and collect the ones caused by insufficient resources
    • Any reason that is not about resources goes into a separate list that is returned immediately
    • If the only problem is insufficient resources and the pod is critical, other pods are evicted to make room for it (flow 6.10)
    func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []lifecycle.PredicateFailureReason) ([]lifecycle.PredicateFailureReason, error) {
    	if !kubetypes.IsCriticalPod(admitPod) {
    		return failureReasons, nil
    	}
    	nonResourceReasons := []lifecycle.PredicateFailureReason{}
    	resourceReasons := []*admissionRequirement{}
    	for _, reason := range failureReasons {
    		if r, ok := reason.(*lifecycle.InsufficientResourceError); ok {
    			resourceReasons = append(resourceReasons, &admissionRequirement{ // this failure is caused by insufficient resources
    				resourceName: r.ResourceName,
    				quantity:     r.GetInsufficientAmount(),
    			})
    		} else {
    			nonResourceReasons = append(nonResourceReasons, reason)
    		}
    	}
    	if len(nonResourceReasons) > 0 {
    		return nonResourceReasons, nil
    	}
    	err := c.evictPodsToFreeRequests(admitPod, admissionRequirementList(resourceReasons)) // evict other pods to free the missing resources (flow 6.10)
    	return nil, err
    }
    

    6.10 Evicting pods

    • getPodsToPreempt (shown after evictPodsToFreeRequests) splits the running pods into three groups by QoS class. Given how much the critical pod is short by, it first checks whether evicting pods can free enough at all: if even evicting everything would not be enough, it returns an error and nothing is evicted. Otherwise it picks victims from the lowest QoS class first, only climbing to a higher class for whatever the lower classes cannot cover
    • The algorithm that picks the concrete victims is flow 6.11
    • For each victim, record an event and kill the pod
    func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(admitPod *v1.Pod, insufficientResources admissionRequirementList) error {
    	podsToPreempt, err := getPodsToPreempt(admitPod, c.getPodsFunc(), insufficientResources) // pick victims by QoS class (bestEffort first, then burstable, then guaranteed)
    	if err != nil {
    		return fmt.Errorf("preemption: error finding a set of pods to preempt: %v", err)
    	}
    	for _, pod := range podsToPreempt { // evict each selected pod
    		c.recorder.Eventf(pod, v1.EventTypeWarning, events.PreemptContainer, message)
    		klog.V(3).InfoS("Preempting pod to free up resources", "pod", klog.KObj(pod), "podUID", pod.UID, "insufficientResources", insufficientResources)
    		err := c.killPodFunc(pod, true, nil, func(status *v1.PodStatus) { // call the kill function to remove the pod
    			status.Phase = v1.PodFailed
    			status.Reason = events.PreemptContainer
    			status.Message = message
    		})
    		if err != nil {
    			klog.ErrorS(err, "Failed to evict pod", "pod", klog.KObj(pod))
    
    			continue
    		}
    		// cumulative count of preemptions initiated
    		if len(insufficientResources) > 0 {
    			metrics.Preemptions.WithLabelValues(insufficientResources[0].resourceName.String()).Inc()
    		} else {
    			metrics.Preemptions.WithLabelValues("").Inc()
    		}
    		klog.InfoS("Pod evicted successfully", "pod", klog.KObj(pod))
    	}
    	return nil
    }
    
    
    // pick the pods to preempt
    func getPodsToPreempt(pod *v1.Pod, pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) {
    	bestEffortPods, burstablePods, guaranteedPods := sortPodsByQOS(pod, pods) // split the running pods by QoS class
    
    	unableToMeetRequirements := requirements.subtract(append(append(bestEffortPods, burstablePods...), guaranteedPods...)...) // would even evicting every pod fail to free enough?
    	if len(unableToMeetRequirements) > 0 {
    		return nil, fmt.Errorf("no set of running pods found to reclaim resources: %v", unableToMeetRequirements.toString())
    	}
    	// pick victims per class, preferring the lowest class; higher classes are only used for whatever the lower ones cannot cover (flow 6.11)
    	guaranteedToEvict, err := getPodsToPreemptByDistance(guaranteedPods, requirements.subtract(append(bestEffortPods, burstablePods...)...)) // guaranteed pods cover only what the two lower classes cannot
    	if err != nil {
    		return nil, err
    	}
    	burstableToEvict, err := getPodsToPreemptByDistance(burstablePods, requirements.subtract(append(bestEffortPods, guaranteedToEvict...)...))
    	if err != nil {
    		return nil, err
    	}
    
    	bestEffortToEvict, err := getPodsToPreemptByDistance(bestEffortPods, requirements.subtract(append(burstableToEvict, guaranteedToEvict...)...))
    	if err != nil {
    		return nil, err
    	}
    	return append(append(bestEffortToEvict, burstableToEvict...), guaranteedToEvict...), nil
    }
    
    

    6.11 Picking the best pods to evict

    • requirements holds the unmet requirements (for example, if both cpu and memory are short, len(requirements) == 2)
    • bestDistance starts at len(requirements)+1 as the value to beat; bestPodIndex is the index of the best candidate so far
    • Iterate over all pods and compute each one's distance (smaller is better). A pod with a smaller distance than bestDistance becomes the new best, and bestDistance/bestPodIndex are updated; when a later pod ties the best distance, the one requesting fewer resources wins
    • After the scan, subtract the best pod's requests from the unmet requirements to see whether evicting just that pod satisfies everything
    • If it does, requirements becomes empty, the loop ends, and the pod is returned for eviction
    • If not, the best pod is recorded as a victim, removed from the pods slice, and the scan repeats to find the next one (a sketch of the distance helper follows the code)
    func getPodsToPreemptByDistance(pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) {
    	podsToEvict := []*v1.Pod{}
    
    	for len(requirements) > 0 { // there are still unmet requirements
    		if len(pods) == 0 {
    			return nil, fmt.Errorf("no set of running pods found to reclaim resources: %v", requirements.toString())
    		}
    
    		bestDistance := float64(len(requirements) + 1) // value to beat
    		bestPodIndex := 0                              // index of the best candidate so far
    
    		for i, pod := range pods { // scan every pod
    			dist := requirements.distance(pod) // how far this pod is from covering the shortfall
    			if dist < bestDistance || (bestDistance == dist && smallerResourceRequest(pod, pods[bestPodIndex])) { // smaller distance wins; on a tie, the pod requesting fewer resources wins
    				bestDistance = dist // new value to beat
    				bestPodIndex = i    // new best candidate
    			}
    		}
    		requirements = requirements.subtract(pods[bestPodIndex]) // subtract the winner's requests; if everything is now covered, the loop ends
    		podsToEvict = append(podsToEvict, pods[bestPodIndex])    // record the winner as a victim
    		// if one pod is not enough, drop it from the slice and pick another
    		pods[bestPodIndex] = pods[len(pods)-1] // overwrite the winner with the last pod (the winner is already recorded)
    		pods = pods[:len(pods)-1]              // shrink the slice by one
    	}
    	return podsToEvict, nil
    }
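
    The distance helper is not shown in this article. A reconstruction from memory (treat it as a sketch rather than the exact upstream code, including the resource.GetResourceRequest helper it calls): for every unmet requirement it adds the squared fraction that would still be missing after evicting the candidate pod, so a distance of 0 means this single pod frees enough of everything.

    func (a admissionRequirementList) distance(pod *v1.Pod) float64 {
    	dist := float64(0)
    	for _, req := range a {
    		// how much of this requirement would remain after reclaiming the pod's request
    		// (GetResourceRequest is assumed to sum the pod's requests for this resource)
    		remaining := float64(req.quantity - resource.GetResourceRequest(pod, req.resourceName))
    		if remaining > 0 {
    			dist += math.Pow(remaining/float64(req.quantity), 2)
    		}
    	}
    	return dist
    }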
    

    7. Once all of the checks above are done, any failure is returned as an error and the pod is rejected. If nothing is wrong, execution reaches dispatchWork, which creates the pod through the same flow described in "kubelet source code analysis: deleting a pod (part 1)".

  • Original article: https://blog.csdn.net/qq_35679620/article/details/128135130