kubelet源码 删除pod pod_workers.go(三)
上篇讲完了UpdatePod函数,本篇接着分析managePodLoop函数。
UpdatePod负责更新pod的状态流转,也就是给pod打上相应的状态标记;
managePodLoop函数则负责根据这些标记对pod进行实际的处理。
1. managePodLoop函数的主体是一个for循环,不断从podUpdates管道中接收该pod的更新(podWork),直到管道被关闭。
// managePodLoop is the per-pod worker loop: it processes podWork items received
// from podUpdates until that channel is closed. The loop body is elided in this
// excerpt; the numbered steps below walk through it piece by piece.
func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
	// lastSyncTime is when the previous sync finished (set in step 7, read in step 6).
	var lastSyncTime time.Time
	// podStarted becomes true once the pod passes the start gate in step 2.
	var podStarted bool
	for update := range podUpdates {
		//****** (loop body: see steps 2-13 below)
	}
}
2. 这里的校验只针对静态pod,非静态pod会直接放行。只有被允许启动的静态pod才会继续往下走。如果pod的同步状态不存在,或者已经被请求终止,则调用completeUnstartedTerminated进行清理并直接return,结束该pod的worker goroutine,不再参与循环。如果pod还在等待启动(例如有同名的静态pod正在运行),则只需跳过本次循环(continue)。(详见流程3、4、5)
if !podStarted { // start gate: checked until the pod has been allowed to start once
	canStart, canEverStart := p.allowPodStart(pod) // step 3; always (true, true) for non-static pods
	if !canEverStart { // the pod can never start, so finish its worker without syncing
		p.completeUnstartedTerminated(pod) // step 5: drop pending updates, mark the status finished
		if start := update.Options.StartTime; !start.IsZero() { // record how long this update took
			metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
		}
		klog.V(4).InfoS("Processing pod event done", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
		return // leave managePodLoop: this pod's worker stops
	}
	if !canStart { // not allowed yet, e.g. another static pod with the same full name holds the slot
		klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(pod), "podUID", pod.UID)
		continue // skip this update; allowPodStart has already requeued the pod with back-off
	}
	podStarted = true // later updates skip the gate
}
3. 流程2代码块第2行调用的allowPodStart,用于判断这个pod是否允许启动。
func (p *podWorkers) allowPodStart(pod *v1.Pod) (canStart bool, canEverStart bool) { // reports whether pod may start now (canStart) and whether it could ever start (canEverStart)
	if !kubetypes.IsStaticPod(pod) { // only static pods are gated
		return true, true
	}
	p.podLock.Lock()
	defer p.podLock.Unlock()
	status, ok := p.podSyncStatuses[pod.UID]
	if !ok { // no sync status means no worker should exist: it can never start
		klog.ErrorS(nil, "Pod sync status does not exist, the worker should not be running", "pod", klog.KObj(pod), "podUID", pod.UID)
		return false, false
	}
	if status.IsTerminationRequested() { // already asked to terminate: never start it
		return false, false
	}
	if !p.allowStaticPodStart(status.fullname, pod.UID) { // another pod with this full name is running or ahead in the wait queue
		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor)) // retry after a jittered back-off
		status.working = false // clear the working flag while the pod waits (the same flag completeWorkQueueNext clears)
		return false, true
	}
	return true, true
}
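4. 流程3第15行调用的allowStaticPodStart。这个函数针对的是多个静态pod全名(fullname)相同的场景:同一全名在同一时间只允许一个pod启动。如果已经有同名pod启动了,是自己则返回true,是其他pod则返回false。如果还没有同名pod启动,则按waitingToStartStaticPodsByFullname中的排队顺序检查,跳过已不存在、已请求终止或已终止的pod。排在最前面的有效pod不是自己,则返回false;否则把自己从等待队列中移除,记为已启动并返回true。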
// allowStaticPodStart reports whether the static pod uid may start, given that
// several static pods can share the same fullname and only one of them may be
// started at a time. The started pod per fullname is tracked in
// startedStaticPodsByFullname; pods waiting for a fullname are queued, in
// order, in waitingToStartStaticPodsByFullname.
//
// It returns true, and records uid as the started pod for fullname, when uid
// already holds the fullname or when no other live pod is ahead of it in the
// wait queue. It returns false when another pod holds the fullname or is ahead
// of uid in the queue.
//
// It reads and writes shared maps without locking; its caller in this file
// (allowPodStart) holds p.podLock while calling it.
func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool {
	startedUID, started := p.startedStaticPodsByFullname[fullname]
	if started {
		// Some pod already holds this fullname: only that same pod may start.
		return startedUID == uid
	}
	// Walk the wait queue in order, ignoring entries whose pods are gone,
	// terminating, or terminated.
	waitingPods := p.waitingToStartStaticPodsByFullname[fullname]
	for i, waitingUID := range waitingPods {
		status, ok := p.podSyncStatuses[waitingUID]
		if !ok || status.IsTerminationRequested() || status.IsTerminated() {
			continue
		}
		if waitingUID != uid {
			// The first live waiter is another pod and it goes first: drop the
			// dead entries in front of it and keep uid waiting.
			p.waitingToStartStaticPodsByFullname[fullname] = waitingPods[i:]
			return false
		}
		// uid is the first live waiter: remove it (and any dead entries before
		// it) from the queue.
		waitingPods = waitingPods[i+1:]
		break
	}
	// Store the remaining queue, or drop the key once it is empty. If every
	// queued entry was dead (or the queue was empty), the loop exits without
	// trimming and the queue is stored unchanged.
	if len(waitingPods) != 0 {
		p.waitingToStartStaticPodsByFullname[fullname] = waitingPods
	} else {
		delete(p.waitingToStartStaticPodsByFullname, fullname)
	}
	// Nobody is ahead of uid: it now holds the fullname.
	p.startedStaticPodsByFullname[fullname] = uid
	return true
}
5. 流程2代码块第4行调用的completeUnstartedTerminated。当这个静态pod永远无法启动时(同步状态不存在,或已经收到终止/删除请求),清除该pod的更新管道及未投递的更新,并把它的同步状态标记为已结束(finished)。
// completeUnstartedTerminated finishes the worker for a pod that was never
// allowed to start. Under p.podLock it closes and forgets the pod's update
// channel and undelivered update, then, if the pod still has a sync status,
// marks it finished, not working and terminated now, and releases the
// static-pod fullname slot if this pod held it.
func (p *podWorkers) completeUnstartedTerminated(pod *v1.Pod) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	uid := pod.UID
	klog.V(4).InfoS("Pod never started and the worker can now stop", "pod", klog.KObj(pod), "podUID", uid)
	p.cleanupPodUpdates(uid)

	status, found := p.podSyncStatuses[uid]
	if !found {
		return
	}

	// Unexpected timestamp states are only logged here, never corrected
	// (terminatedAt is overwritten unconditionally just below).
	if status.terminatingAt.IsZero() {
		klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", uid)
	}
	if !status.terminatedAt.IsZero() {
		klog.V(4).InfoS("Pod worker is complete and had terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", uid)
	}

	status.finished, status.working = true, false
	status.terminatedAt = time.Now()

	// Free the static-pod fullname slot if this pod owned it.
	if owner := p.startedStaticPodsByFullname[status.fullname]; owner == uid {
		delete(p.startedStaticPodsByFullname, status.fullname)
	}
}
// Step 6: load the pod's runtime status before syncing it.
// NOTE(review): the `return err` below implies this excerpt runs inside a
// helper/closure that returns error; managePodLoop itself (step 1) returns
// nothing — confirm against the full source.
var status *kubecontainer.PodStatus
var err error
switch {
case update.Options.RunningPod != nil:
	// The update carries a RunningPod: no cached status is read, so status stays nil.
default:
	// Read a status from the pod cache that is newer than the previous sync
	// (presumably blocks until one is available — confirm GetNewerThan semantics).
	status, err = p.podCache.GetNewerThan(pod.UID, lastSyncTime)
}
if err != nil {
	// Report the failure as a FailedSync warning event on the pod, then stop this sync.
	p.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
	return err
}
// Per-pod context handed to the sync functions in step 7.
ctx := p.contextForWorker(pod.UID)
7. 根据本次更新的类型(WorkType),调用对应的同步函数处理pod:
// Dispatch on the kind of work requested by this update.
switch {
case update.WorkType == TerminatedPodWork:
	// Containers are already stopped: run the final terminated-pod sync.
	err = p.syncTerminatedPodFn(ctx, pod, status)
case update.WorkType == TerminatingPodWork:
	// Stop the pod, honoring an optional grace-period override from KillPodOptions.
	var gracePeriod *int64
	if opt := update.Options.KillPodOptions; opt != nil {
		gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
	}
	podStatusFn := p.acknowledgeTerminating(pod)
	err = p.syncTerminatingPodFn(ctx, pod, status, update.Options.RunningPod, gracePeriod, podStatusFn)
default:
	// Regular sync. isTerminal (declared outside this excerpt) reports whether
	// the pod has reached a terminal state; step 8 acts on it.
	isTerminal, err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
}
// Remember when this sync finished; step 6 asks the cache for a newer status next time.
lastSyncTime = time.Now()
return err
8. 上面调用的几个同步函数(实现在kubelet.go中)已经完成了对容器和沙箱的处理,接下来根据同步结果推进pod的状态流转:
// Advance the pod's lifecycle according to the sync result.
switch {
case err == context.Canceled:
	// The sync was cancelled through ctx; only log it.
	klog.V(2).InfoS("Sync exited with context cancellation error", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
case err != nil:
	// The sync failed; log it. The retry delay is chosen later in completeWork.
	klog.ErrorS(err, "Error syncing pod, skipping", "pod", klog.KObj(pod), "podUID", pod.UID)
case update.WorkType == TerminatedPodWork:
	// Fully terminated: finish the worker (step 9) and leave the loop.
	p.completeTerminated(pod)
	if start := update.Options.StartTime; !start.IsZero() {
		metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
	}
	klog.V(4).InfoS("Processing pod event done", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
	return
case update.WorkType == TerminatingPodWork:
	if update.Options.RunningPod != nil {
		// The update carried a RunningPod: finish via completeTerminatingRuntimePod
		// and leave the loop without a terminated phase.
		p.completeTerminatingRuntimePod(pod)
		if start := update.Options.StartTime; !start.IsZero() {
			metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
		}
		klog.V(4).InfoS("Processing pod event done", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
		return
	}
	// Containers stopped: queue the terminated phase (step 10).
	p.completeTerminating(pod)
	// completeWork requeues with zero delay on a phase transition.
	phaseTransition = true
case isTerminal:
	// The pod finished on its own: queue the terminating phase (step 11).
	klog.V(4).InfoS("Pod is terminal", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
	p.completeSync(pod)
	phaseTransition = true
}
9. 流程8中TerminatedPodWork分支调用的completeTerminated:pod已经彻底终止,关闭该pod的更新管道,并把worker标记为结束。
// completeTerminated is called once a pod has fully terminated. Under
// p.podLock it closes and forgets the pod's update channel and undelivered
// update, then, if the pod still has a sync status, marks it finished and not
// working and releases the static-pod fullname slot if this pod held it.
// terminatingAt and terminatedAt are only checked here: a missing value is
// logged, never filled in.
func (p *podWorkers) completeTerminated(pod *v1.Pod) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	uid := pod.UID
	klog.V(4).InfoS("Pod is complete and the worker can now stop", "pod", klog.KObj(pod), "podUID", uid)
	p.cleanupPodUpdates(uid)

	status, found := p.podSyncStatuses[uid]
	if !found {
		return
	}

	if status.terminatingAt.IsZero() {
		klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", uid)
	}
	if status.terminatedAt.IsZero() {
		klog.V(4).InfoS("Pod worker is complete but did not have terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", uid)
	}

	status.finished, status.working = true, false

	// Free the static-pod fullname slot if this pod owned it.
	if owner := p.startedStaticPodsByFullname[status.fullname]; owner == uid {
		delete(p.startedStaticPodsByFullname, status.fullname)
	}
}
//completeTerminated(以及流程5的completeUnstartedTerminated)中调用的cleanupPodUpdates
// cleanupPodUpdates closes and forgets the update channel registered for uid,
// if any, and discards any update that was queued but never delivered.
// Both callers in this file hold p.podLock when calling it.
func (p *podWorkers) cleanupPodUpdates(uid types.UID) {
	updates, registered := p.podUpdates[uid]
	if registered {
		// Presumably the channel managePodLoop ranges over; closing it ends that loop.
		close(updates)
	}
	delete(p.podUpdates, uid)
	delete(p.lastUndeliveredWorkUpdate, uid)
}
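10. 流程8中TerminatingPodWork分支调用的completeTerminating。
这里和上面的流程基本一样:记录terminatedAt,关闭并清空notifyPostTerminating中的管道。重点在最后一步:把该pod的lastUndeliveredWorkUpdate(最近一次未投递的更新)设置为TerminatedPodWork类型的更新。这样下次循环处理到这个pod时,就会进入彻底删除(Terminated)的流程。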
// completeTerminating is called once all of the pod's containers have been
// stopped (a TerminatingPodWork sync succeeded). Under p.podLock it stamps
// terminatedAt, closes and clears the notifyPostTerminating channels, clears
// statusPostTerminating, and stores a TerminatedPodWork update as the pod's
// next undelivered work so that the worker's next iteration runs final cleanup.
func (p *podWorkers) completeTerminating(pod *v1.Pod) {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	klog.V(4).InfoS("Pod terminated all containers successfully", "pod", klog.KObj(pod), "podUID", pod.UID)
	if status, ok := p.podSyncStatuses[pod.UID]; ok {
		// terminatingAt is expected to be set already; a missing value is only logged.
		if status.terminatingAt.IsZero() {
			klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
		}
		status.terminatedAt = time.Now()
		// Closing a channel wakes every receiver; presumably these are waiters
		// for termination to finish. They are dropped afterwards.
		for _, ch := range status.notifyPostTerminating {
			close(ch)
		}
		status.notifyPostTerminating = nil
		status.statusPostTerminating = nil
	}
	// Overwrite any pending update: the next work delivered to the worker
	// (by completeWorkQueueNext) is the terminated phase.
	p.lastUndeliveredWorkUpdate[pod.UID] = podWork{
		WorkType: TerminatedPodWork,
		Options: UpdatePodOptions{
			Pod: pod,
		},
	}
}
11. 流程8中isTerminal分支调用的completeSync,与上面大同小异:pod的生命周期自然结束后,记录terminatingAt,并把下一次待处理的更新设置为TerminatingPodWork,让pod进入终止流程。
// completeSync is called when a regular sync reports that the pod is terminal,
// i.e. its lifecycle ended on its own. Under p.podLock it records when
// termination began (only the first time), flags the status as terminating,
// and stores a TerminatingPodWork update as the pod's next undelivered work.
func (p *podWorkers) completeSync(pod *v1.Pod) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	uid := pod.UID
	klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "pod", klog.KObj(pod), "podUID", uid)

	if status, found := p.podSyncStatuses[uid]; found {
		// terminatingAt must be set only once; a second attempt is logged and ignored.
		if !status.terminatingAt.IsZero() {
			klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "pod", klog.KObj(pod), "podUID", uid)
		} else {
			status.terminatingAt = time.Now()
		}
		status.startedTerminating = true
	}

	// The worker's next iteration moves the pod into the terminating phase.
	next := podWork{WorkType: TerminatingPodWork, Options: UpdatePodOptions{Pod: pod}}
	p.lastUndeliveredWorkUpdate[uid] = next
}
12. 收尾工作:调用completeWork,根据本次处理结果把pod重新放入工作队列,并记录本次处理的耗时指标。
p.completeWork(pod, phaseTransition, err) // requeue the pod and hand off any pending update (step 13)
if start := update.Options.StartTime; !start.IsZero() { // record how long this update took, labelled by update type
	metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
}
//上面第一行调用的completeWork函数
// completeWork requeues the pod after a sync and then hands off its next piece
// of work via completeWorkQueueNext. The requeue delay depends on the outcome:
//   - phase transition: zero, so the next phase is processed right away;
//   - successful sync: the resync interval, jittered;
//   - error mentioning NetworkNotReadyErrorMsg: the transient-error back-off, jittered;
//   - any other error: the regular back-off period, jittered.
func (p *podWorkers) completeWork(pod *v1.Pod, phaseTransition bool, syncErr error) {
	var delay time.Duration
	switch {
	case phaseTransition:
		// Keep the zero delay.
	case syncErr == nil:
		delay = wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor)
	case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
		delay = wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor)
	default:
		delay = wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor)
	}
	p.workQueue.Enqueue(pod.UID, delay)
	p.completeWorkQueueNext(pod.UID)
}
13. completeWorkQueueNext:如果该pod还有未投递的更新,就把它发送到该pod的更新管道,交给pod的worker goroutine继续处理;否则把working置为false,表示当前没有待处理的工作。
// completeWorkQueueNext hands the pod's next piece of work to its worker, if
// there is one. Under p.podLock it either sends the pod's undelivered update
// into the pod's update channel and forgets it, or, when nothing is pending,
// clears the working flag on the pod's sync status.
func (p *podWorkers) completeWorkQueueNext(uid types.UID) {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
		// NOTE(review): this send happens while podLock is held, so it must not
		// block; presumably the per-pod channel is buffered with room for this
		// update — confirm.
		p.podUpdates[uid] <- workUpdate
		delete(p.lastUndeliveredWorkUpdate, uid)
		return
	}
	// podSyncStatuses holds pointers, so indexing a missing uid directly would
	// yield nil and panic on the field write; only update an existing status.
	if status, ok := p.podSyncStatuses[uid]; ok {
		status.working = false
	}
}