本文中含有k8s的一个bug,我也正在努力提交PR,不过会不会被merge就不清楚了。
pod_workers.go是主要处理pod变化的文件,在1.22版本后对这个文件进行了比较大的修改。把属于pod处理的工作都放在了这个文件里。并且对pod分段处理,如审查、标记状态、处理状态等。
1、options为pod的一些基本信息。runningPod是运行中的pod,如果runningPod存在,并且pod配置不存在,则代表是孤儿pod,只能进行删除。如果pod和runningPod都存在,则代表都会被更新,所以只保留pod即可。

2.对整个结构体进行加锁,避免污染数据。
根据uid取出当前pod状态。12行中,如果pod不是孤儿pod,并且状态是失败或者已完成,则记录它的状态。13行是从本地缓存中获得运行时的pod状态(不是这次要更新的状态)。14行的函数判断的是pod下的容器是否仍处于运行状态,在流程3中介绍。
// Take the pod worker lock; the deferred Unlock releases it when the
// enclosing function returns, so everything below runs under the lock.
p.podLock.Lock()
defer p.podLock.Unlock()
// Capture a single timestamp so every field set below shares the same instant.
now := time.Now()
// Look up the sync status tracked for this UID; create one if none exists yet.
status, ok := p.podSyncStatuses[uid]
if !ok {
klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KObj(pod), "podUID", pod.UID)
// Default status for a pod seen for the first time: only the sync time and
// the pod's full name are recorded.
status = &podSyncStatus{
syncedAt: now,
fullname: kubecontainer.GetPodFullName(pod),
}
// If the pod is not runtime-only and its phase is already Failed or
// Succeeded, consult the pod cache. When the cached status can be read and
// isPodStatusCacheTerminal accepts it, replace the default status with one
// that is already marked as terminating, terminated and finished.
// A cache read error is ignored and the default status above is kept.
if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) {
if statusCache, err := p.podCache.Get(pod.UID); err == nil {
if isPodStatusCacheTerminal(statusCache) {
status = &podSyncStatus{
terminatedAt: now,
terminatingAt: now,
syncedAt: now,
startedTerminating: true,
finished: true,
fullname: kubecontainer.GetPodFullName(pod),
}
}
}
}
// Remember the status (default or terminal) for subsequent updates of this UID.
p.podSyncStatuses[uid] = status
}
3.判断pod是否已经停止了。遍历容器查看是否有运行中的容器,还要判断sandbox(代码中的"sb")是否有处于就绪状态的。如果两者的数量都为0,则代表pod已经运行完成。然后代码2中的30行,将状态存到pod的状态流的map中。
// isPodStatusCacheTerminal reports whether the given cached pod status shows
// nothing left running: it returns true only when no container status is in
// the ContainerStateRunning state and no sandbox status is in the
// PodSandboxState_SANDBOX_READY state. An empty status therefore counts as
// terminal.
func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool {
	// Every entry is visited (no early exit) so evaluation matches a plain
	// count over both slices.
	containerRunning := false
	for _, c := range status.ContainerStatuses {
		if c.State == kubecontainer.ContainerStateRunning {
			containerRunning = true
		}
	}

	sandboxReady := false
	for _, sandbox := range status.SandboxStatuses {
		if sandbox.State == runtimeapi.PodSandboxState_SANDBOX_READY {
			sandboxReady = true
		}
	}

	return !(containerRunning || sandboxReady)
}
4.第一行判断该pod是否已经被请求终止。如果已经被请求终止,并且这次的更新又是创建,则标记pod需要重新启动。这个场景一般是静态pod才会出现,通常是具有相同UID。如果是此情况则后续会重新启动。
第9行判断pod是否已经处理完成。如果已完成,就不归pod_workers管了,后续housekeeping会去清除它。
// Termination was already requested for this UID. If this update is a
// create, only record that a restart was requested and stop here; the log
// message states the pod will be reconciled later.
if status.IsTerminationRequested() {
if options.UpdateType == kubetypes.SyncPodCreate {
status.restartRequested = true
klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
}
// Once the status is marked finished, no further updates are processed here.
if status.IsFinished() {
klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
特殊说明:下面流程的5、6需要用到这几个函数,在这里标注一下。
// IsWorking returns the value of the working flag.
func (s *podSyncStatus) IsWorking() bool {
	return s.working
}

// IsTerminationRequested returns true once terminatingAt has been set to a
// non-zero time.
func (s *podSyncStatus) IsTerminationRequested() bool {
	return !s.terminatingAt.IsZero()
}

// IsTerminationStarted returns the value of the startedTerminating flag.
func (s *podSyncStatus) IsTerminationStarted() bool {
	return s.startedTerminating
}

// IsTerminated returns true once terminatedAt has been set to a non-zero time.
func (s *podSyncStatus) IsTerminated() bool {
	return !s.terminatedAt.IsZero()
}

// IsFinished returns the value of the finished flag.
func (s *podSyncStatus) IsFinished() bool {
	return s.finished
}

// IsEvicted returns the value of the evicted flag.
func (s *podSyncStatus) IsEvicted() bool {
	return s.evicted
}

// IsDeleted returns the value of the deleted flag.
func (s *podSyncStatus) IsDeleted() bool {
	return s.deleted
}
5.上面的一些特殊场景的处理都做完了,到这开始对当前pod进行标记状态了。
// becameTerminating is set when this update is the one that moves the pod
// into termination (i.e. termination had not been requested before).
var becameTerminating bool
if !status.IsTerminationRequested() {
// The first matching case wins, so the order of the cases is significant.
switch {
case isRuntimePod:
// Runtime-only pod: treated as orphaned and torn down.
klog.V(4).InfoS("Pod is orphaned and must be torn down", "pod", klog.KObj(pod), "podUID", pod.UID)
status.deleted = true // true means the pod has also been deleted on the apiserver
status.terminatingAt = now // time at which the deletion starts
becameTerminating = true // the pod has just entered the deletion flow
case pod.DeletionTimestamp != nil:
// The pod object carries a deletion timestamp: graceful deletion.
klog.V(4).InfoS("Pod is marked for graceful deletion, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
status.deleted = true
status.terminatingAt = now
becameTerminating = true
case pod.Status.Phase == v1.PodFailed, pod.Status.Phase == v1.PodSucceeded:
// Terminal phase: teardown begins, but the pod is not marked deleted.
klog.V(4).InfoS("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
status.terminatingAt = now
becameTerminating = true
case options.UpdateType == kubetypes.SyncPodKill:
// Explicit kill request; it is recorded as an eviction only when the kill
// options say so.
if options.KillPodOptions != nil && options.KillPodOptions.Evict {
klog.V(4).InfoS("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
status.evicted = true
} else {
klog.V(4).InfoS("Pod is being removed by the kubelet, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
}
status.terminatingAt = now
becameTerminating = true
}
}
6.workType标记这个pod生命周期的状态(sync同步、terminating终止中、terminated清理)。
wasGracePeriodShortened代表是否缩短优雅删除时间(例:第一次优雅删除(grace period)时间是30s,第二次是10s,则代表缩短)。这个官方存在bug,无法成功缩短优雅删除时间,后面会介绍bug原因。
// Choose the kind of work to hand to the pod worker based on how far the
// pod has progressed, and normalise options.KillPodOptions accordingly.
var workType PodWorkType
var wasGracePeriodShortened bool
switch {
case status.IsTerminated():
// Already terminated. A runtime-only kill request is ignored at this point.
if isRuntimePod {
klog.V(3).InfoS("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
workType = TerminatedPodWork
// Close any completion channel right away, then drop the kill options.
if options.KillPodOptions != nil {
if ch := options.KillPodOptions.CompletedCh; ch != nil {
close(ch)
}
}
options.KillPodOptions = nil
case status.IsTerminationRequested():
// Termination requested but not yet complete.
workType = TerminatingPodWork
// Guarantee non-nil kill options so the fields below can be accessed.
if options.KillPodOptions == nil {
options.KillPodOptions = &KillPodOptions{}
}
// Queue the caller's completion channel and status function on the status
// instead of acting on them here.
if ch := options.KillPodOptions.CompletedCh; ch != nil {
status.notifyPostTerminating = append(status.notifyPostTerminating, ch)
}
if fn := options.KillPodOptions.PodStatusFunc; fn != nil {
status.statusPostTerminating = append(status.statusPostTerminating, fn)
}
// Compute the effective grace period, store it on the status, and pass it
// on through the kill options override.
gracePeriod, gracePeriodShortened := calculateEffectiveGracePeriod(status, pod, options.KillPodOptions)
wasGracePeriodShortened = gracePeriodShortened
status.gracePeriod = gracePeriod
options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod
default:
// Neither terminated nor terminating: ordinary sync work. Any kill options
// are discarded after closing their completion channel.
workType = SyncPodWork
if options.KillPodOptions != nil {
if ch := options.KillPodOptions.CompletedCh; ch != nil {
close(ch)
}
options.KillPodOptions = nil
}
}
7.优雅删除时间只能缩短,如果不是缩短则不用更改时间。最终的优雅删除时间不能小于1秒。
// calculateEffectiveGracePeriod derives the grace period (in seconds) to use
// for terminating pod, starting from the value already stored on status.
//
// Candidates are applied in order: pod.DeletionGracePeriodSeconds, then
// options.PodTerminationGracePeriodSecondsOverride (when options is non-nil).
// A candidate replaces the current value only if the current value is 0
// (treated as unset) or the candidate is smaller. If the value is still 0
// afterwards, pod.Spec.TerminationGracePeriodSeconds is used when set. The
// result is never less than 1.
//
// The second return value is true when status already held a non-zero grace
// period and the computed value differs from it.
func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options *KillPodOptions) (int64, bool) {
	previous := status.gracePeriod
	effective := previous

	// lower applies one optional candidate under the "unset or smaller" rule.
	lower := func(candidate *int64) {
		if candidate == nil {
			return
		}
		if effective == 0 || *candidate < effective {
			effective = *candidate
		}
	}

	lower(pod.DeletionGracePeriodSeconds)
	if options != nil {
		lower(options.PodTerminationGracePeriodSecondsOverride)
	}

	// Fall back to the pod spec when nothing produced a non-zero value.
	if effective == 0 && pod.Spec.TerminationGracePeriodSeconds != nil {
		effective = *pod.Spec.TerminationGracePeriodSeconds
	}

	// Enforce the minimum of one second.
	if effective < 1 {
		effective = 1
	}

	changed := previous != 0 && previous != effective
	return effective, changed
}
8.到此,pod的status状态记录就都完成了,该准备一下pod的期望状态然后进行处理了。
9、完成这个函数的工作。此函数的任务就是对pod进行一些审查,并且对status状态链路进行更新,然后标记一下状态
// NOTE(review): work, podUpdates and outCh are not defined in this excerpt;
// presumably they are built earlier in the enclosing function — confirm
// against the full source.

// No work is in flight for this pod: mark it as working, send the work item
// on the podUpdates channel and return.
if !status.IsWorking() {
status.working = true
podUpdates <- work
return
}
// Work is already in flight. If an earlier update is still undelivered and
// has an earlier non-zero start time, carry that start time over to the new
// work item.
if undelivered, ok := p.lastUndeliveredWorkUpdate[pod.UID]; ok {
if !undelivered.Options.StartTime.IsZero() && undelivered.Options.StartTime.Before(work.Options.StartTime) {
work.Options.StartTime = undelivered.Options.StartTime
}
}
// Keep only the most recent undelivered work item for this UID.
p.lastUndeliveredWorkUpdate[pod.UID] = work
// If the pod just became terminating, or its grace period was shortened,
// and a cancel function is registered, cancel the sync currently in progress.
if (becameTerminating || wasGracePeriodShortened) && status.cancelFn != nil {
klog.V(3).InfoS("Cancelling current pod sync", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", work.WorkType)
status.cancelFn()
return
}
// Run managePodLoop on outCh in its own goroutine; runtime.HandleCrash is
// deferred inside that goroutine.
// NOTE(review): as excerpted this follows the dispatch logic above; its
// position in the full function may differ — confirm.
go func() {
defer runtime.HandleCrash()
p.managePodLoop(outCh)
}()