• kubelet源码分析 创建/更新/调谐 pod


    kubelet源码 更新/协调pod

    (原文此处有一张图片,未能保留)

    这篇写的就是更新、调谐、同步时触发的函数变化。
    一、调谐(Reconcile)

    写过CRD的应该对这个函数很熟悉,当CRD有变化的时候或每过2s都是通过触发这个函数进行比对,调谐pod的状态
    但是这个调谐和CRD的不一样,这里只调谐pod的Ready condition,以及清理被驱逐的pod的容器
    kubectl apply -f demo.yaml
    1.类似于kubelet删除pod中的流程2

    		case kubetypes.RECONCILE:
    			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
    			handler.HandlePodReconcile(u.Pods)
    
    • 1
    • 2
    • 3

    这里触发了reconcile函数

    2.这个函数主要三个流程步骤

    • 先调用podManager.UpdatePod,用最新的pod对象更新podManager中的缓存(对每个pod都会执行,并不只针对静态pod)
    • 判断这个pod的Ready condition是否需要调谐(流程3)
    • 判断这个pod是否已被驱逐,如果是,则清理它的容器(流程8)
    // HandlePodReconcile handles RECONCILE updates from the config sources.
    // For each pod it stores the latest pod object in podManager and, if the
    // pod's Ready condition disagrees with the value recomputed from its
    // containers and readiness gates, dispatches a SyncPodSync to the pod workers.
    func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
    	start := kl.clock.Now()
    	for _, pod := range pods {
    		// Done for every pod, not only static pods.
    		kl.podManager.UpdatePod(pod)
    
    		if status.NeedToReconcilePodReadiness(pod) {
    			mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
    			kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
    		}
          ****// step 8 (elided here): containers of evicted pods are cleaned up
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.验证是否需要调谐

    • 第二行查看是否设置了readinessGates,如果没设置,直接返回。
    • 第5行开始是检测设定这个条件的实际状态(流程4)
    • 第6行是从pod当前的conditions中取出Ready类型的condition(流程6),第7行再把它和第5行算出的实际状态进行比较。这里可以简单说一下,如果用户添加了readinessGates,那readinessGates对应的condition必须都为True,否则Ready类型的condition会变为False
    • 如果运行中的和实际的条件不符,则是需要调谐
    func NeedToReconcilePodReadiness(pod *v1.Pod) bool { // reports whether the pod's recorded Ready condition is stale
    	if len(pod.Spec.ReadinessGates) == 0 { // only pods that declare readinessGates are reconciled here
    		return false
    	}
    	podReadyCondition := GeneratePodReadyCondition(&pod.Spec, pod.Status.Conditions, pod.Status.ContainerStatuses, pod.Status.Phase) // Ready condition recomputed from the current status
    	i, curCondition := podutil.GetPodConditionFromList(pod.Status.Conditions, v1.PodReady) // Ready condition currently recorded on the pod
    	if i >= 0 && (curCondition.Status != podReadyCondition.Status || curCondition.Message != podReadyCondition.Message) { // a pod with no recorded Ready condition (i < 0) is not reconciled
    		return true
    	}
    	return false
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    4.验证一下实际状态

    • 前四行是获得状态,后续会验证状态(流程5)
    • 从GenerateContainersReadyCondition函数开始
    • 如果containerStatuses(容器状态)为nil,说明还没有任何容器状态,直接返回status为False(原因UnknownContainerStatuses)
    • 遍历spec中的每个容器:如果找到了它的状态,但!containerStatus.Ready(容器未就绪),则记入unreadyContainers
    • 如果在containerStatuses中找不到该容器的状态,则记入unknownContainers(状态未知),同样视为未ready
    • 第27行,如果pod已经运行完成了(源码中会显示Succeeded,kubectl会显示Completed)并且没有状态未知的容器,则返回未就绪,原因是PodCompleted
    • 第34行开始,是把状态未知和未就绪的容器汇总一下,拼接成一个字符串。如果这个字符串不为空,则返回的是没有ready
    • 如果没有任何错误,返回的是ready
    func GeneratePodReadyCondition(spec *v1.PodSpec, conditions []v1.PodCondition, containerStatuses []v1.ContainerStatus, podPhase v1.PodPhase) v1.PodCondition { // computes the pod's Ready condition
    	containersReady := GenerateContainersReadyCondition(spec, containerStatuses, podPhase) // ContainersReady condition, see GenerateContainersReadyCondition
    	******* // remainder elided in this excerpt; the full body is listed in step 5
    }
    
    
    func GenerateContainersReadyCondition(spec *v1.PodSpec, containerStatuses []v1.ContainerStatus, podPhase v1.PodPhase) v1.PodCondition { // computes the ContainersReady condition from spec containers and their statuses
    	if containerStatuses == nil { // no statuses reported at all: not ready, reason UnknownContainerStatuses
    		return v1.PodCondition{
    			Type:   v1.ContainersReady,
    			Status: v1.ConditionFalse,
    			Reason: UnknownContainerStatuses,
    		}
    	}
    	unknownContainers := []string{}
    	unreadyContainers := []string{}
    	for _, container := range spec.Containers { // only spec.Containers are inspected; each is classified as unready or as having no status
    		if containerStatus, ok := podutil.GetContainerStatus(containerStatuses, container.Name); ok {
    			if !containerStatus.Ready {
    				unreadyContainers = append(unreadyContainers, container.Name)
    			}
    		} else {
    			unknownContainers = append(unknownContainers, container.Name)
    		}
    	}
    
    	if podPhase == v1.PodSucceeded && len(unknownContainers) == 0 { // a completed pod is reported not ready with reason PodCompleted
    		return v1.PodCondition{
    			Type:   v1.ContainersReady,
    			Status: v1.ConditionFalse,
    			Reason: PodCompleted,
    		}
    	}
    	unreadyMessages := []string{} // human-readable reasons that become the condition's Message
    	if len(unknownContainers) > 0 {
    		unreadyMessages = append(unreadyMessages, fmt.Sprintf("containers with unknown status: %s", unknownContainers))
    	}
    	if len(unreadyContainers) > 0 {
    		unreadyMessages = append(unreadyMessages, fmt.Sprintf("containers with unready status: %s", unreadyContainers))
    	}
    	unreadyMessage := strings.Join(unreadyMessages, ", ")
    	if unreadyMessage != "" {
    		return v1.PodCondition{
    			Type:    v1.ContainersReady,
    			Status:  v1.ConditionFalse,
    			Reason:  ContainersNotReady,
    			Message: unreadyMessage,
    		}
    	}
    
    	return v1.PodCondition{ // every spec container has a status and is Ready
    		Type:   v1.ContainersReady,
    		Status: v1.ConditionTrue,
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    5.重新回到上面的函数,获取了容器实际状态后,要做返回值判断了

    • 第4行,如果容器未能ready,则返回pod未ready
    • 如果容器都ready了,需要验证用户自定义的readinessGates了
    • 第14行开始循环它,如果未找到,则输入错误,如果找到了但是未ready,也是输入错误
    • 如果错误不为空,则返回pod未ready
    • 如果错误为空,代表pod是ready状态的
    func GeneratePodReadyCondition(spec *v1.PodSpec, conditions []v1.PodCondition, containerStatuses []v1.ContainerStatus, podPhase v1.PodPhase) v1.PodCondition { // Ready is True only if ContainersReady is True and every readiness gate condition is True
    	containersReady := GenerateContainersReadyCondition(spec, containerStatuses, podPhase)
    	if containersReady.Status != v1.ConditionTrue { // propagate ContainersReady's status/reason/message as the Ready condition
    		return v1.PodCondition{
    			Type:    v1.PodReady,
    			Status:  containersReady.Status,
    			Reason:  containersReady.Reason,
    			Message: containersReady.Message,
    		}
    	}
    
    	unreadyMessages := []string{}
    	for _, rg := range spec.ReadinessGates { // each gate must name an existing pod condition whose status is True
    		_, c := podutil.GetPodConditionFromList(conditions, rg.ConditionType)
    		if c == nil {
    			unreadyMessages = append(unreadyMessages, fmt.Sprintf("corresponding condition of pod readiness gate %q does not exist.", string(rg.ConditionType)))
    		} else if c.Status != v1.ConditionTrue {
    			unreadyMessages = append(unreadyMessages, fmt.Sprintf("the status of pod readiness gate %q is not \"True\", but %v", string(rg.ConditionType), c.Status))
    		}
    	}
    
    	if len(unreadyMessages) != 0 {
    		unreadyMessage := strings.Join(unreadyMessages, ", ")
    		return v1.PodCondition{
    			Type:    v1.PodReady,
    			Status:  v1.ConditionFalse,
    			Reason:  ReadinessGatesNotReady,
    			Message: unreadyMessage,
    		}
    	}
    
    	return v1.PodCondition{
    		Type:   v1.PodReady,
    		Status: v1.ConditionTrue,
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    6.这里比较简单,在conditions中找到指定类型(流程3里传入的是v1.PodReady)的condition,返回它的下标和指针;找不到则返回-1和nil。然后返回流程3的最后一步

    // GetPodConditionFromList returns the index of, and a pointer to, the first
    // condition in conditions whose Type equals conditionType, or (-1, nil) if
    // there is none. The returned pointer refers to the slice element itself,
    // so writes through it modify conditions.
    func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
    	if conditions == nil {
    		return -1, nil
    	}
    	for i := range conditions {
    		if conditions[i].Type == conditionType {
    			return i, &conditions[i]
    		}
    	}
    	return -1, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    7.这是流程3的代码,如果需要调谐,则进入到dispatchWork函数,然后触发了podWorkers.UpdatePod。之后的就可以参考kubelet源码 删除pod(二)

    		if status.NeedToReconcilePodReadiness(pod) {
    			mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
    			kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
    		}
    
    // dispatchWork hands pod to the pod workers via podWorkers.UpdatePod,
    // passing along the sync type, mirror pod and start time. The rest of the
    // function is elided in this excerpt.
    func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    	// Run the sync in an async worker.
    	kl.podWorkers.UpdatePod(UpdatePodOptions{
    		Pod:        pod,
    		MirrorPod:  mirrorPod,
    		UpdateType: syncType,
    		StartTime:  start,
    	})
         *****
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    8.接下来判断pod是否已被驱逐(eviction.PodIsEvicted,即pod处于Failed阶段且原因为Evicted);如果是,就从podCache中取出该pod的状态,删除其中的容器

    		if eviction.PodIsEvicted(pod.Status) {
    			if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
    				kl.containerDeletor.deleteContainersInPod("", podStatus, true)
    			}
    		}
    
    • 1
    • 2
    • 3
    • 4
    • 5

    二、更新

    1.同上原理

    	case kubetypes.UPDATE:
    			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
    			handler.HandlePodUpdates(u.Pods)
    
    • 1
    • 2
    • 3

    2.因为触发的是updates函数,所以和删除是一个道理,参考kubelet源码 删除pod(二)

    三、移除(remove)

    1.同上原理。触发后,会在缓存(podManager)中删掉pod,到pod_workers.go的UpdatePod处理的时候这个pod已经Finished了,所以不会继续走后续流程。Finished的具体判断可以参考之前的删除pod文章

    		case kubetypes.REMOVE:
    			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
    			handler.HandlePodRemoves(u.Pods)
    
    	// HandlePodRemoves handles REMOVE updates: each pod is deleted from
    	// podManager; mirror pods are passed to handleMirrorPod, and all other
    	// pods are handed to deletePod so the pod workers kill them.
    	func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
    	start := kl.clock.Now()
    	for _, pod := range pods {
    		kl.podManager.DeletePod(pod)
    		if kubetypes.IsMirrorPod(pod) {
    			kl.handleMirrorPod(pod, start)
    			continue
    		}
    		// Deletion is allowed to fail because the periodic cleanup routine
    		// will trigger deletion again.
    		if err := kl.deletePod(pod); err != nil {
    			klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
    		}
    	}
    }
    
    
    	// deletePod asks the pod workers to kill pod (UpdateType SyncPodKill).
    	// It returns an error, without dispatching anything, when pod is nil or
    	// when not all config sources are ready yet (sourcesReady.AllReady()).
    	func (kl *Kubelet) deletePod(pod *v1.Pod) error {
    	if pod == nil {
    		return fmt.Errorf("deletePod does not allow nil pod")
    	}
    	if !kl.sourcesReady.AllReady() {
    		// If the sources aren't ready, skip deletion, as we may accidentally delete pods
    		// for sources that haven't reported yet.
    		return fmt.Errorf("skipping delete because sources aren't ready yet")
    	}
    	klog.V(3).InfoS("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
    	kl.podWorkers.UpdatePod(UpdatePodOptions{
    		Pod:        pod,
    		UpdateType: kubetypes.SyncPodKill,
    	})
    	// We leave the volume/directory cleanup to the periodic cleanup routine.
    	return nil
    }
    
    
    //直接触发这里,已经停止了,所以后续不会走流程了
    pod_workers.go
    	if status.IsFinished() {
    		klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
    		return
    	}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    四、存活探针(livenessManager)

    1.只有存活探针失败的时候,才会触发。
    2.触发handleProbeSync函数后,会查到pod,并且记录状态,然后触发HandlePodSyncs函数最后和上面一样执行dispatchWork函数,等pod_workers.go执行完后到kubelet.go的syncpod函数真正去管理pod(后续会介绍)
    3.后续再处理的时候,会重新调用CRI重启容器

    	case update := <-kl.livenessManager.Updates():
    		if update.Result == proberesults.Failure {
    			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
    		}
    		
    // handleProbeSync re-syncs the pod referenced by a prober update by calling
    // handler.HandlePodSyncs. Updates for pods that podManager no longer knows
    // about are ignored. probe and status are used only in log output.
    func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
    	// We should not use the pod from manager, because it is never updated after initialization.
    	pod, ok := kl.podManager.GetPodByUID(update.PodUID)
    	if !ok {
    		// If the pod no longer exists, ignore the update.
    		klog.V(4).InfoS("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update)
    		return
    	}
    	klog.V(1).InfoS("SyncLoop (probe)", "probe", probe, "status", status, "pod", klog.KObj(pod))
    	handler.HandlePodSyncs([]*v1.Pod{pod})
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    五、就绪探针(readinessManager)

    • 更新一下状态缓存,然后再触发handleProbeSync函数,后续和上面四一样
    	case update := <-kl.readinessManager.Updates():
    		ready := update.Result == proberesults.Success
    		kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)//更新状态缓存
    
    		status := ""
    		if ready {
    			status = "ready"
    		}
    		handleProbeSync(kl, update, handler, "readiness", status)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    六、启动探针(startupManager)

    case update := <-kl.startupManager.Updates():
    		started := update.Result == proberesults.Success
    		kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)//更新状态缓存
    
    		status := "unhealthy"
    		if started {
    			status = "started"
    		}
    		handleProbeSync(kl, update, handler, "startup", status)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
  • 相关阅读:
    Docker快速极简配置nginx实现不同域名访问分流
    基于spring boot开发的快递管理系统开题报告
    前端list.push,封装多个对象
    Postman 如何获取请求结果并设置到请求头中
    SpringBoot之缓存篇
    Stable Diffusion代码简介
    Fiddler 系列教程(二) Composer创建和发送HTTP Request跟手机抓包
    vue面试相关知识
    121. 买卖股票的最佳时机 --力扣 --JAVA
    猿创征文| Unity高级开发面向对象编程知识总结
  • 原文地址:https://blog.csdn.net/qq_35679620/article/details/128033725