添加pod
1.触发HandlePodAdditions函数进行创建
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodAdditions(u.Pods)
// HandlePodAdditions is the SyncHandler callback for pods added from a config
// source. Pods are admitted in creation-time order so earlier pods claim
// resources first; mirror pods bypass admission entirely.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
		// Snapshot the existing pods BEFORE registering the new one, so the
		// admission check only sees pods that were already present.
		existingPods := kl.podManager.GetPods()
		kl.podManager.AddPod(pod)
		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
			// Only still-active pods count against the node's resources.
			// NOTE: fixed to call filterOutInactivePods — the helper defined
			// in this file — instead of the nonexistent filterOutTerminatedPods.
			activePods := kl.filterOutInactivePods(existingPods)
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
	}
}
2.把现有的pod进行过滤,去掉已终止/已到达终态的pod,留下仍然活跃、占用资源的pod。
// filterOutInactivePods returns only the pods that still occupy node
// resources: pods already known to be terminated, and admitted pods that have
// reached a terminal phase with no pending termination request, are dropped.
func (kl *Kubelet) filterOutInactivePods(pods []*v1.Pod) []*v1.Pod {
	active := make([]*v1.Pod, 0, len(pods))
	for _, pod := range pods {
		switch {
		case kl.podWorkers.IsPodKnownTerminated(pod.UID):
			// Fully terminated; no longer consumes resources.
		case kl.isAdmittedPodTerminal(pod) && !kl.podWorkers.IsPodTerminationRequested(pod.UID):
			// Terminal phase and no worker is tearing it down.
		default:
			active = append(active, pod)
		}
	}
	return active
}
3.校验是否能在node上创建
canAdmitPod函数会依次调用各个准入处理器(admitHandlers)进行校验,这里情况比较多,不全部贴代码,大概是验证一下pod的可创建性,有以下几种函数
// canAdmitPod runs every registered admit handler against the candidate pod,
// given the pods already occupying node resources. On the first rejection it
// returns false with that handler's reason and message.
func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
	for _, handler := range kl.admitHandlers {
		result := handler.Admit(attrs)
		if !result.Admit {
			return false, result.Reason, result.Message
		}
	}
	return true, "", ""
}
4.校验pod的几个重要函数。这个函数主要检查node是否处于资源压力(驱逐)状态,从而决定是否准入该pod
pkg/kubelet/eviction/eviction_manager.go
// Admit rejects pods while the node is under an eviction-relevant pressure
// condition. Critical pods are always admitted; when memory pressure is the
// only condition, non-best-effort pods (and best-effort pods tolerating the
// memory-pressure taint) are admitted as well.
func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	m.RLock()
	defer m.RUnlock()

	// No active node conditions: everything is admitted.
	if len(m.nodeConditions) == 0 {
		return lifecycle.PodAdmitResult{Admit: true}
	}
	// Critical pods are admitted regardless of pressure.
	if kubelettypes.IsCriticalPod(attrs.Pod) {
		return lifecycle.PodAdmitResult{Admit: true}
	}

	// When memory pressure is the ONLY condition, relax the rejection.
	if len(m.nodeConditions) == 1 && hasNodeCondition(m.nodeConditions, v1.NodeMemoryPressure) {
		if v1qos.GetPodQOS(attrs.Pod) != v1.PodQOSBestEffort {
			return lifecycle.PodAdmitResult{Admit: true}
		}
		memoryPressureTaint := v1.Taint{
			Key:    v1.TaintNodeMemoryPressure,
			Effect: v1.TaintEffectNoSchedule,
		}
		if v1helper.TolerationsTolerateTaint(attrs.Pod.Spec.Tolerations, &memoryPressureTaint) {
			return lifecycle.PodAdmitResult{Admit: true}
		}
	}

	klog.InfoS("Failed to admit pod to node", "pod", klog.KObj(attrs.Pod), "nodeCondition", m.nodeConditions)
	return lifecycle.PodAdmitResult{
		Admit:   false,
		Reason:  Reason,
		Message: fmt.Sprintf(nodeConditionMessageFmt, m.nodeConditions),
	}
}
4.1.获得pod的QOS等级。官方文档说是每个容器的limits和requests都需要相等才是Guaranteed,其实是判断所有容器的总和
// GetPodQOS computes the pod's QOS class. Although the official docs describe
// per-container requests/limits equality for Guaranteed, this implementation
// compares the SUMMED requests and limits across all containers (init +
// regular):
//   - BestEffort: no cpu/memory requests and no limits anywhere
//   - Guaranteed: every container sets both cpu and memory limits, and the
//     summed requests equal the summed limits per resource
//   - Burstable: everything else
func GetPodQOS(pod *v1.Pod) v1.PodQOSClass {
requests := v1.ResourceList{}
limits := v1.ResourceList{}
zeroQuantity := resource.MustParse("0")
isGuaranteed := true
allContainers := []v1.Container{}
// Init containers and regular containers are pooled together.
allContainers = append(allContainers, pod.Spec.Containers...)
allContainers = append(allContainers, pod.Spec.InitContainers...)
for _, container := range allContainers {
for name, quantity := range container.Resources.Requests {
// Only the supported QoS compute resources (cpu/memory) count.
if !isSupportedQoSComputeResource(name) {
continue
}
// Cmp returns 0 on equal, 1 on greater; this keeps only values > 0.
if quantity.Cmp(zeroQuantity) == 1 {
// Copy the value before accumulating.
delta := quantity.DeepCopy()
// First occurrence of this resource name (cpu/memory): store directly.
if _, exists := requests[name]; !exists {
requests[name] = delta
} else {
// Already present: add the running total into delta, store it back.
delta.Add(requests[name])
requests[name] = delta
}
}
}
qosLimitsFound := sets.NewString()
for name, quantity := range container.Resources.Limits {
if !isSupportedQoSComputeResource(name) {
continue
}
if quantity.Cmp(zeroQuantity) == 1 {
qosLimitsFound.Insert(string(name))
delta := quantity.DeepCopy()
if _, exists := limits[name]; !exists {
limits[name] = delta
} else {
delta.Add(limits[name])
limits[name] = delta
}
}
}
// THIS container must set both cpu and memory limits; missing either one
// demotes the whole pod from Guaranteed.
if !qosLimitsFound.HasAll(string(v1.ResourceMemory), string(v1.ResourceCPU)) {
isGuaranteed = false
}
}
// Neither requests nor limits set anywhere: the weakest class.
if len(requests) == 0 && len(limits) == 0 {
return v1.PodQOSBestEffort
}
if isGuaranteed {
for name, req := range requests {
// Any summed request that differs from its summed limit means Burstable.
if lim, exists := limits[name]; !exists || lim.Cmp(req) != 0 {
isGuaranteed = false
break
}
}
}
if isGuaranteed &&
len(requests) == len(limits) {
return v1.PodQOSGuaranteed
}
return v1.PodQOSBurstable
}
5.验证sysctls。有的容器需要修改系统内核,这里需要对准许修改的内核做校验
// Admit validates the pod's declared sysctls against the allowlist. Pods
// declaring no sysctls are admitted immediately; a single forbidden sysctl
// rejects the pod.
func (w *patternAllowlist) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	pod := attrs.Pod
	if pod.Spec.SecurityContext == nil || len(pod.Spec.SecurityContext.Sysctls) == 0 {
		return lifecycle.PodAdmitResult{Admit: true}
	}

	// Host namespaces affect which sysctls are namespaced for this pod.
	hostNet := pod.Spec.HostNetwork
	hostIPC := pod.Spec.HostIPC
	for _, sysctl := range pod.Spec.SecurityContext.Sysctls {
		if err := w.validateSysctl(sysctl.Name, hostNet, hostIPC); err != nil {
			return lifecycle.PodAdmitResult{
				Admit:   false,
				Reason:  ForbiddenReason,
				Message: fmt.Sprintf("forbidden sysctl: %v", err),
			}
		}
	}
	return lifecycle.PodAdmitResult{Admit: true}
}
6.验证资源是否足够
// Admit is the predicate-based admission check. It reconstructs a scheduler
// NodeInfo from the currently admitted pods, runs the scheduler's general
// predicates (resources, affinity, node name, ports, taints) against the
// candidate pod, gives critical pods a chance to recover via preemption, and
// finally validates OS label/field compatibility.
func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult {
node, err := w.getNodeAnyWayFunc()
if err != nil {
klog.ErrorS(err, "Cannot get Node info")
return PodAdmitResult{
Admit: false,
Reason: "InvalidNodeInfo",
Message: "Kubelet cannot get node info.",
}
}
admitPod := attrs.Pod // the pod being added
pods := attrs.OtherPods // pods already occupying node resources
nodeInfo := schedulerframework.NewNodeInfo(pods...) // fold the existing pods' usage into a NodeInfo (flow 6.1)
nodeInfo.SetNode(node) // attach the running node and its allocatable resources
if err = w.pluginResourceUpdateFunc(nodeInfo, attrs); err != nil { // refresh device-plugin resources; currently this always returns nil
message := fmt.Sprintf("Update plugin resources failed due to %v, which is unexpected.", err)
klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "message", message)
return PodAdmitResult{
Admit: false,
Reason: "UnexpectedAdmissionError",
Message: message,
}
}
podWithoutMissingExtendedResources := removeMissingExtendedResources(admitPod, nodeInfo) // drop extended-resource requests the node does not advertise
reasons := generalFilter(podWithoutMissingExtendedResources, nodeInfo) // run the scheduler's general predicates
fit := len(reasons) == 0
if !fit { // some predicate failed, e.g. cpu exhausted
reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons) // critical (static) pods may recover by preempting other pods (flow 6.9)
fit = len(reasons) == 0 && err == nil
if err != nil {
message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err)
klog.InfoS("Failed to admit pod, unexpected error while attempting to recover from admission failure", "pod", klog.KObj(admitPod), "err", err)
return PodAdmitResult{
Admit: fit,
Reason: "UnexpectedAdmissionError",
Message: message,
}
}
}
if !fit {
var reason string
var message string
if len(reasons) == 0 {
// Defensive: should be unreachable since !fit with err == nil implies
// at least one reason remains — kept to fail loudly if that changes.
message = fmt.Sprint("GeneralPredicates failed due to unknown reason, which is unexpected.")
klog.InfoS("Failed to admit pod: GeneralPredicates failed due to unknown reason, which is unexpected", "pod", klog.KObj(admitPod))
return PodAdmitResult{
Admit: fit,
Reason: "UnknownReason",
Message: message,
}
}
// If there are failed predicates, we only return the first one as a reason.
r := reasons[0]
switch re := r.(type) { // classify the failure type
case *PredicateFailureError: // non-resource failure (affinity, node name, ports, taints, ...)
reason = re.PredicateName
message = re.Error()
klog.V(2).InfoS("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
case *InsufficientResourceError: // resource shortage, e.g. not enough cpu
reason = fmt.Sprintf("OutOf%s", re.ResourceName)
message = re.Error()
klog.V(2).InfoS("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
default:
reason = "UnexpectedPredicateFailureType"
message = fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", r)
klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "err", message)
}
return PodAdmitResult{ // report the failure reason and message
Admit: fit,
Reason: reason,
Message: message,
}
}
if rejectPodAdmissionBasedOnOSSelector(admitPod, node) { // the kubernetes.io/os label, when set, must match the node's label
return PodAdmitResult{
Admit: false,
Reason: "PodOSSelectorNodeLabelDoesNotMatch",
Message: "Failed to admit pod as the `kubernetes.io/os` label doesn't match node label",
}
}
if rejectPodAdmissionBasedOnOSField(admitPod) { // pod.Spec.OS, when set, must match the node's OS
return PodAdmitResult{
Admit: false,
Reason: "PodOSNotSupported",
Message: "Failed to admit pod as the OS field doesn't match node OS",
}
}
return PodAdmitResult{
Admit: true,
}
}
6.1.加入资源和亲和性
// AddPodInfo accounts the pod's resource requests, ports, PVC references and
// affinity metadata into this NodeInfo and bumps its generation.
func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) {
	// res holds the aggregated requests; non0CPU/non0Mem are the
	// non-zero-adjusted cpu/memory totals (see calculateResource).
	res, non0CPU, non0Mem := calculateResource(podInfo.Pod)
	n.Requested.MilliCPU += res.MilliCPU
	n.Requested.Memory += res.Memory
	n.Requested.EphemeralStorage += res.EphemeralStorage
	// Lazily allocate the scalar-resource map on first use.
	if len(res.ScalarResources) > 0 && n.Requested.ScalarResources == nil {
		n.Requested.ScalarResources = map[v1.ResourceName]int64{}
	}
	for name, quantity := range res.ScalarResources {
		n.Requested.ScalarResources[name] += quantity
	}
	n.NonZeroRequested.MilliCPU += non0CPU
	n.NonZeroRequested.Memory += non0Mem
	n.Pods = append(n.Pods, podInfo)
	if podWithAffinity(podInfo.Pod) {
		n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo)
	}
	if podWithRequiredAntiAffinity(podInfo.Pod) {
		n.PodsWithRequiredAntiAffinity = append(n.PodsWithRequiredAntiAffinity, podInfo)
	}
	n.updateUsedPorts(podInfo.Pod, true)
	n.updatePVCRefCounts(podInfo.Pod, true)
	n.Generation = nextGeneration()
}
6.2更新node的插件管理
// UpdatePluginResources adjusts the node's allocatable device-plugin
// resources, but only for pods that already have devices allocated by the
// device manager; all other pods need no adjustment.
func (m *ManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	if !m.podDevices.hasPod(string(attrs.Pod.UID)) {
		return nil
	}
	m.sanitizeNodeAllocatable(node)
	return nil
}
// sanitizeNodeAllocatable raises the node's allocatable scalar resources to
// at least the number of devices already allocated, so devices that were
// already granted keep fitting in the accounting.
func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulerframework.NodeInfo) {
	var patched *schedulerframework.Resource
	current := node.Allocatable
	if current.ScalarResources == nil {
		current.ScalarResources = make(map[v1.ResourceName]int64)
	}
	m.mutex.Lock()
	defer m.mutex.Unlock()
	for resource, devices := range m.allocatedDevices {
		inUse := devices.Len()
		// Skip resources whose advertised capacity already covers the usage.
		if quant, ok := current.ScalarResources[v1.ResourceName(resource)]; ok && int(quant) >= inUse {
			continue
		}
		// Clone lazily, only once a correction is actually required.
		if patched == nil {
			patched = current.Clone()
		}
		patched.ScalarResources[v1.ResourceName(resource)] = int64(inUse)
	}
	if patched != nil {
		node.Allocatable = patched
	}
}
6.3过滤一下node不接受的资源请求
// removeMissingExtendedResources returns a deep copy of pod whose container
// requests omit extended resources the node does not advertise, so admission
// is not failed on resources the node has never registered.
func removeMissingExtendedResources(pod *v1.Pod, nodeInfo *schedulerframework.NodeInfo) *v1.Pod {
	podCopy := pod.DeepCopy()
	for i, container := range pod.Spec.Containers {
		kept := make(v1.ResourceList)
		for name, quantity := range container.Resources.Requests {
			if v1helper.IsExtendedResourceName(name) {
				// Extended resources absent from the node are dropped.
				if _, found := nodeInfo.Allocatable.ScalarResources[name]; !found {
					continue
				}
			}
			kept[name] = quantity
		}
		podCopy.Spec.Containers[i].Resources.Requests = kept
	}
	return podCopy
}
// IsExtendedResourceName reports whether name denotes an extended resource:
// it must not be a native resource, must not already carry the requests.
// prefix, and must form a valid qualified name once that prefix is applied.
func IsExtendedResourceName(name v1.ResourceName) bool {
	if IsNativeResource(name) || strings.HasPrefix(string(name), v1.DefaultResourceRequestsPrefix) {
		return false
	}
	// The quota system tracks extended resources as "requests.<name>"; that
	// composite must itself be a legal qualified name.
	quotaName := fmt.Sprintf("%s%s", v1.DefaultResourceRequestsPrefix, string(name))
	return len(validation.IsQualifiedName(string(quotaName))) == 0
}
6.4校验资源是否足够
// generalFilter runs the scheduler's admission predicates against the pod and
// converts each failure into a kubelet PredicateFailureReason. Non-static
// pods are additionally checked against the node's NoExecute taints.
func generalFilter(pod *v1.Pod, nodeInfo *schedulerframework.NodeInfo) []PredicateFailureReason {
admissionResults := scheduler.AdmissionCheck(pod, nodeInfo, true) // resources / affinity / node name / ports
var reasons []PredicateFailureReason
for _, r := range admissionResults { // each entry represents one failure
if r.InsufficientResource != nil { // resource shortage (static-pod preemption keys off this error type)
reasons = append(reasons, &InsufficientResourceError{
ResourceName: r.InsufficientResource.ResourceName,
Requested: r.InsufficientResource.Requested,
Used: r.InsufficientResource.Used,
Capacity: r.InsufficientResource.Capacity,
})
} else { // any other failure (ports, node name, affinity)
reasons = append(reasons, &PredicateFailureError{r.Name, r.Reason})
}
}
if !types.IsStaticPod(pod) { // non-static pods must tolerate the node's NoExecute taints
_, isUntolerated := corev1.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
return t.Effect == v1.TaintEffectNoExecute
})
if isUntolerated {
reasons = append(reasons, &PredicateFailureError{tainttoleration.Name, tainttoleration.ErrReasonNotMatch})
}
}
return reasons
}
6.5 验证node是否可以创建这个pod
// AdmissionCheck verifies the node can host the pod. It checks, in order:
// resource fit, required node affinity, node name, and host ports. When
// includeAllFailures is true every failing check is collected; otherwise the
// first failure short-circuits.
func AdmissionCheck(pod *v1.Pod, nodeInfo *framework.NodeInfo, includeAllFailures bool) []AdmissionResult {
var admissionResults []AdmissionResult
// Resource fit: one result entry per insufficient resource.
insufficientResources := noderesources.Fits(pod, nodeInfo)
if len(insufficientResources) != 0 {
for i := range insufficientResources {
admissionResults = append(admissionResults, AdmissionResult{InsufficientResource: &insufficientResources[i]})
}
if !includeAllFailures {
return admissionResults
}
}
// Required node affinity / nodeSelector must match this node.
if matches, _ := corev1nodeaffinity.GetRequiredNodeAffinity(pod).Match(nodeInfo.Node()); !matches {
admissionResults = append(admissionResults, AdmissionResult{Name: nodeaffinity.Name, Reason: nodeaffinity.ErrReasonPod})
if !includeAllFailures {
return admissionResults
}
}
// spec.nodeName, when set, must name this node.
if !nodename.Fits(pod, nodeInfo) {
admissionResults = append(admissionResults, AdmissionResult{Name: nodename.Name, Reason: nodename.ErrReason})
if !includeAllFailures {
return admissionResults
}
}
// Requested host ports must still be free on the node.
if !nodeports.Fits(pod, nodeInfo) {
admissionResults = append(admissionResults, AdmissionResult{Name: nodeports.Name, Reason: nodeports.ErrReason})
if !includeAllFailures {
return admissionResults
}
}
return admissionResults
}
6.6 验证资源是否足够
// fitsRequest compares the pod's aggregated resource requests against the
// node's remaining capacity (allocatable minus already-requested) and returns
// one InsufficientResource entry per dimension that does not fit.
func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo, ignoredExtendedResources, ignoredResourceGroups sets.String) []InsufficientResource {
insufficientResources := make([]InsufficientResource, 0, 4)
allowedPodNumber := nodeInfo.Allocatable.AllowedPodNumber
if len(nodeInfo.Pods)+1 > allowedPodNumber { // would exceed the node's max pod count
insufficientResources = append(insufficientResources, InsufficientResource{
ResourceName: v1.ResourcePods,
Reason: "Too many pods",
Requested: 1,
Used: int64(len(nodeInfo.Pods)),
Capacity: int64(allowedPodNumber),
})
}
// The pod requests nothing at all: only the pod-count check applies.
if podRequest.MilliCPU == 0 &&
podRequest.Memory == 0 &&
podRequest.EphemeralStorage == 0 &&
len(podRequest.ScalarResources) == 0 {
return insufficientResources
}
// Enough cpu left?
if podRequest.MilliCPU > (nodeInfo.Allocatable.MilliCPU - nodeInfo.Requested.MilliCPU) {
insufficientResources = append(insufficientResources, InsufficientResource{
ResourceName: v1.ResourceCPU,
Reason: "Insufficient cpu",
Requested: podRequest.MilliCPU,
Used: nodeInfo.Requested.MilliCPU,
Capacity: nodeInfo.Allocatable.MilliCPU,
})
}
// Enough memory left?
if podRequest.Memory > (nodeInfo.Allocatable.Memory - nodeInfo.Requested.Memory) {
insufficientResources = append(insufficientResources, InsufficientResource{
ResourceName: v1.ResourceMemory,
Reason: "Insufficient memory",
Requested: podRequest.Memory,
Used: nodeInfo.Requested.Memory,
Capacity: nodeInfo.Allocatable.Memory,
})
}
// Enough ephemeral storage left?
if podRequest.EphemeralStorage > (nodeInfo.Allocatable.EphemeralStorage - nodeInfo.Requested.EphemeralStorage) {
insufficientResources = append(insufficientResources, InsufficientResource{
ResourceName: v1.ResourceEphemeralStorage,
Reason: "Insufficient ephemeral-storage",
Requested: podRequest.EphemeralStorage,
Used: nodeInfo.Requested.EphemeralStorage,
Capacity: nodeInfo.Allocatable.EphemeralStorage,
})
}
for rName, rQuant := range podRequest.ScalarResources {
// Skip extended resources that admission is configured to ignore,
// either individually or by resource-group prefix.
if v1helper.IsExtendedResourceName(rName) {
var rNamePrefix string
if ignoredResourceGroups.Len() > 0 {
rNamePrefix = strings.Split(string(rName), "/")[0]
}
if ignoredExtendedResources.Has(string(rName)) || ignoredResourceGroups.Has(rNamePrefix) {
continue
}
}
// Enough of this scalar (e.g. device-plugin) resource left?
if rQuant > (nodeInfo.Allocatable.ScalarResources[rName] - nodeInfo.Requested.ScalarResources[rName]) {
insufficientResources = append(insufficientResources, InsufficientResource{
ResourceName: rName,
Reason: fmt.Sprintf("Insufficient %v", rName),
Requested: podRequest.ScalarResources[rName],
Used: nodeInfo.Requested.ScalarResources[rName],
Capacity: nodeInfo.Allocatable.ScalarResources[rName],
})
}
}
return insufficientResources
}
6.7 验证亲和性(验证标签逻辑简单且乱,就是key和val的对比。不多介绍)
func GetRequiredNodeAffinity(pod *v1.Pod) RequiredNodeAffinity {
var selector labels.Selector
if len(pod.Spec.NodeSelector) > 0 { //标签选择器
selector = labels.SelectorFromSet(pod.Spec.NodeSelector)
}
//亲和性必须性炎症
var affinity *LazyErrorNodeSelector
if pod.Spec.Affinity != nil &&
pod.Spec.Affinity.NodeAffinity != nil &&
pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
affinity = NewLazyErrorNodeSelector(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
}
return RequiredNodeAffinity{labelSelector: selector, nodeSelector: affinity}
}
// Match reports whether node satisfies both the pod's nodeSelector labels and
// its required node affinity terms.
func (s RequiredNodeAffinity) Match(node *v1.Node) (bool, error) {
	// The plain label selector never errors; it simply matches or not.
	if s.labelSelector != nil && !s.labelSelector.Matches(labels.Set(node.Labels)) {
		return false, nil
	}
	if s.nodeSelector == nil {
		return true, nil
	}
	return s.nodeSelector.Match(node)
}
6.8 如果不是静态pod。验证容忍度
// FindMatchingUntoleratedTaint returns the first taint — among those accepted
// by inclusionFilter — that none of the tolerations tolerates, plus whether
// such a taint exists.
func FindMatchingUntoleratedTaint(taints []v1.Taint, tolerations []v1.Toleration, inclusionFilter taintsFilterFunc) (v1.Taint, bool) {
	for _, candidate := range getFilteredTaints(taints, inclusionFilter) {
		if !TolerationsTolerateTaint(tolerations, &candidate) {
			return candidate, true
		}
	}
	return v1.Taint{}, false
}
// getFilteredTaints returns the taints accepted by inclusionFilter; a nil
// filter accepts every taint.
func getFilteredTaints(taints []v1.Taint, inclusionFilter taintsFilterFunc) []v1.Taint {
	if inclusionFilter == nil {
		return taints
	}
	filtered := []v1.Taint{}
	for i := range taints {
		if inclusionFilter(&taints[i]) {
			filtered = append(filtered, taints[i])
		}
	}
	return filtered
}
6.9如果是静态pod,特殊处理一下
// HandleAdmissionFailure lets critical pods recover from an admission
// failure: when the only obstacles are insufficient resources, it evicts
// other pods to free them. Non-critical pods keep their original failure
// reasons unchanged.
func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []lifecycle.PredicateFailureReason) ([]lifecycle.PredicateFailureReason, error) {
	if !kubetypes.IsCriticalPod(admitPod) {
		return failureReasons, nil
	}
	// Split the failures into resource shortages (recoverable by preemption)
	// and everything else (ports, affinity, ... — not recoverable).
	nonResourceReasons := []lifecycle.PredicateFailureReason{}
	resourceReasons := []*admissionRequirement{}
	for _, reason := range failureReasons {
		insufficient, ok := reason.(*lifecycle.InsufficientResourceError)
		if !ok {
			nonResourceReasons = append(nonResourceReasons, reason)
			continue
		}
		resourceReasons = append(resourceReasons, &admissionRequirement{
			resourceName: insufficient.ResourceName,
			quantity:     insufficient.GetInsufficientAmount(),
		})
	}
	if len(nonResourceReasons) > 0 {
		// Preemption cannot fix these; report them unchanged.
		return nonResourceReasons, nil
	}
	// Evict other pods to satisfy the resource requirements (flow 6.10).
	err := c.evictPodsToFreeRequests(admitPod, admissionRequirementList(resourceReasons))
	return nil, err
}
6.10 驱逐pod
// evictPodsToFreeRequests evicts other pods so the critical admitPod can fit,
// using the candidate selection in getPodsToPreempt. Failure to evict an
// individual pod is logged and skipped, not fatal.
func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(admitPod *v1.Pod, insufficientResources admissionRequirementList) error {
podsToPreempt, err := getPodsToPreempt(admitPod, c.getPodsFunc(), insufficientResources) // choose victims, lowest QOS class first (flow 6.10)
if err != nil {
return fmt.Errorf("preemption: error finding a set of pods to preempt: %v", err)
}
for _, pod := range podsToPreempt { // evict each chosen pod
c.recorder.Eventf(pod, v1.EventTypeWarning, events.PreemptContainer, message)
klog.V(3).InfoS("Preempting pod to free up resources", "pod", klog.KObj(pod), "podUID", pod.UID, "insufficientResources", insufficientResources)
err := c.killPodFunc(pod, true, nil, func(status *v1.PodStatus) { // kill the pod, marking it Failed with a preemption reason
status.Phase = v1.PodFailed
status.Reason = events.PreemptContainer
status.Message = message
})
if err != nil {
klog.ErrorS(err, "Failed to evict pod", "pod", klog.KObj(pod))
continue
}
// Cumulative count of preemptions, labeled by the first insufficient resource.
if len(insufficientResources) > 0 {
metrics.Preemptions.WithLabelValues(insufficientResources[0].resourceName.String()).Inc()
} else {
metrics.Preemptions.WithLabelValues("").Inc()
}
klog.InfoS("Pod evicted successfully", "pod", klog.KObj(pod))
}
return nil
}
// getPodsToPreempt returns the set of pods to evict so that the critical pod
// fits, preferring to evict from the lowest QOS class that can satisfy the
// outstanding requirements.
func getPodsToPreempt(pod *v1.Pod, pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) {
bestEffortPods, burstablePods, guaranteedPods := sortPodsByQOS(pod, pods) // bucket candidates by QOS class
unableToMeetRequirements := requirements.subtract(append(append(bestEffortPods, burstablePods...), guaranteedPods...)...) // requirements that even evicting every candidate could not satisfy
if len(unableToMeetRequirements) > 0 {
return nil, fmt.Errorf("no set of running pods found to reclaim resources: %v", unableToMeetRequirements.toString())
}
// Evict from the lowest class possible: guaranteed pods are only chosen for
// what bestEffort+burstable cannot cover, burstable only for what bestEffort
// plus the chosen guaranteed pods cannot cover, and so on. Within a class,
// getPodsToPreemptByDistance picks the best candidates (flow 6.11).
guaranteedToEvict, err := getPodsToPreemptByDistance(guaranteedPods, requirements.subtract(append(bestEffortPods, burstablePods...)...))
if err != nil {
return nil, err
}
burstableToEvict, err := getPodsToPreemptByDistance(burstablePods, requirements.subtract(append(bestEffortPods, guaranteedToEvict...)...))
if err != nil {
return nil, err
}
bestEffortToEvict, err := getPodsToPreemptByDistance(bestEffortPods, requirements.subtract(append(burstableToEvict, guaranteedToEvict...)...))
if err != nil {
return nil, err
}
return append(append(bestEffortToEvict, burstableToEvict...), guaranteedToEvict...), nil
}
6.11 寻找最佳的驱逐pod的算法
// getPodsToPreemptByDistance greedily picks pods whose eviction best reduces
// the outstanding resource requirements, breaking ties in favor of the pod
// with the smaller total resource request.
func getPodsToPreemptByDistance(pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) {
podsToEvict := []*v1.Pod{}
for len(requirements) > 0 { // loop until every requirement is covered
if len(pods) == 0 {
return nil, fmt.Errorf("no set of running pods found to reclaim resources: %v", requirements.toString())
}
bestDistance := float64(len(requirements) + 1) // sentinel larger than any real distance
bestPodIndex := 0 // index of the current best candidate
for i, pod := range pods { // scan every remaining candidate
dist := requirements.distance(pod) // presumably measures how much would remain unmet after evicting this pod — see admissionRequirementList.distance
if dist < bestDistance || (bestDistance == dist && smallerResourceRequest(pod, pods[bestPodIndex])) { // smaller distance wins; ties go to the pod requesting fewer resources
bestDistance = dist
bestPodIndex = i
}
}
requirements = requirements.subtract(pods[bestPodIndex]) // remove what the chosen pod covers; the loop ends once nothing remains
podsToEvict = append(podsToEvict, pods[bestPodIndex]) // record the chosen pod for eviction
// Remove the chosen pod from the candidate list (swap-with-last) in case
// a single eviction is not enough and another round is needed.
pods[bestPodIndex] = pods[len(pods)-1]
pods = pods[:len(pods)-1]
}
return podsToEvict, nil
}
7.上面的验证都完成了:如果有不符合的,返回错误并结束;如果没有问题,就走到dispatchWork,执行后续步骤去创建pod。下一篇:kubelet源码分析 删除pod(一)