The kubelet is the node's guardian: it manages the lifecycle of the applications running on the node and keeps the node healthy.
Every node runs a kubelet service process, listening on port 10250 by default. It periodically reports the node's resource usage to the master and monitors node and container resources through cAdvisor. Node management consists mainly of node self-registration and node status updates:

The --register-node flag determines whether the kubelet registers itself with the API Server (self-registration mode). The kubelet then periodically sends node status updates to the API Server, which writes the information into etcd.
The kubelet accepts pod manifests from several sources:

File: files under the configuration directory given by the --config startup parameter, i.e. static pods (default /etc/kubernetes/manifests/). The directory is re-checked every 20 seconds (configurable).
HTTP endpoint (URL): set with the --manifest-url startup parameter; the endpoint is checked every 20 seconds (configurable).
API Server: the kubelet watches the pod objects stored in etcd through the API Server and syncs the pod manifests assigned to it.
HTTP Server: the kubelet also listens for HTTP requests and serves a simple API for submitting new pod manifests.
From these sources the kubelet watches for pod state changes, i.e. pod update or add events. Each change is captured in an UpdatePodOptions and handed to a pod worker, which runs syncPod. syncPod then executes computePodActions to work out what needs to be done for this pod: it compares the desired spec against the container processes already started on the node; if the pod is new its containers are created, and if they already exist they are deleted or updated as needed. The resulting actions are carried out through the CRI, which starts or deletes the pod's containers.
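The real computePodActions lives in the kubelet's container runtime manager; the following is only a hedged sketch of that comparison logic, with simplified, hypothetical types (containerSpec, runningContainer, podActions) invented for illustration.

package main

import "fmt"

// Simplified stand-ins for the kubelet's internal types (hypothetical).
type containerSpec struct {
	Name  string
	Image string
}

type runningContainer struct {
	Name  string
	Image string
}

type podActions struct {
	CreateSandbox     bool
	ContainersToStart []containerSpec
	ContainersToKill  []runningContainer
}

// computePodActions compares the desired containers from the pod spec with
// the containers currently running in the sandbox and decides what to do.
func computePodActions(desired []containerSpec, running []runningContainer, sandboxExists bool) podActions {
	actions := podActions{CreateSandbox: !sandboxExists}

	runningByName := map[string]runningContainer{}
	for _, c := range running {
		runningByName[c.Name] = c
	}

	for _, want := range desired {
		got, ok := runningByName[want.Name]
		switch {
		case !ok:
			// New container: start it.
			actions.ContainersToStart = append(actions.ContainersToStart, want)
		case got.Image != want.Image:
			// Spec changed: kill the old container and start a new one.
			actions.ContainersToKill = append(actions.ContainersToKill, got)
			actions.ContainersToStart = append(actions.ContainersToStart, want)
		}
		delete(runningByName, want.Name)
	}

	// Anything left over is no longer in the spec and should be killed.
	for _, c := range runningByName {
		actions.ContainersToKill = append(actions.ContainersToKill, c)
	}
	return actions
}

func main() {
	actions := computePodActions(
		[]containerSpec{{Name: "nginx", Image: "nginx:1.25"}},
		[]runningContainer{{Name: "nginx", Image: "nginx:1.24"}},
		true,
	)
	fmt.Printf("%+v\n", actions)
}

The real version also has to handle init containers, restart policy, sandbox changes and more, but the basic shape of the decision is the same.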
PLEG (Pod Lifecycle Event Generator) reports pod status. It keeps a local pod cache and periodically issues a relist against the CRI to fetch the list of pods currently running on the node; the differences are turned into pod lifecycle events and the resulting status is reported to the apiserver.

Note: two failure modes follow directly from this. If the runtime dies, relist fails, pod status can no longer be reported, Kubernetes eventually considers the node unhealthy, and pods are evicted. And if the node has too many container records (for example a large number of exited containers), relist keeps walking through all of them and takes too long; pod status is not returned within the deadline, PLEG times out, the node status becomes Unknown, and pods are evicted.
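To make the relist behaviour concrete, here is a hedged sketch of the idea, not the real PLEG code; the Runtime interface, PodRecord and Event types are invented for illustration. The point to notice is that every relist has to enumerate everything the runtime knows about, so its cost grows with the number of (possibly exited) containers, and a dead runtime means no events at all.

// Package plegsketch is a hypothetical, stripped-down illustration of the
// relist idea behind the kubelet's PLEG; it is not the real implementation.
package plegsketch

import (
	"context"
	"time"
)

// PodRecord is a simplified view of what a runtime returns when asked to
// list everything it knows about.
type PodRecord struct {
	UID   string
	State string // e.g. "running", "exited"
}

// Runtime stands in for the CRI: ListPods must walk every sandbox and
// container, including exited ones, so its cost grows with container count.
type Runtime interface {
	ListPods(ctx context.Context) ([]PodRecord, error)
}

// Event is a pod lifecycle event derived from two consecutive relists.
type Event struct {
	UID      string
	OldState string
	NewState string
}

// Relist asks the runtime for the current pod list, diffs it against the
// local cache, updates the cache, and returns the resulting events. If the
// runtime is down the call fails and no events are produced, which is why a
// dead runtime eventually makes the node look unhealthy.
func Relist(ctx context.Context, rt Runtime, cache map[string]string) ([]Event, error) {
	pods, err := rt.ListPods(ctx)
	if err != nil {
		return nil, err
	}
	var events []Event
	for _, p := range pods {
		if old := cache[p.UID]; old != p.State {
			events = append(events, Event{UID: p.UID, OldState: old, NewState: p.State})
			cache[p.UID] = p.State
		}
	}
	return events, nil
}

// RelistLoop runs Relist on a fixed period, mirroring how PLEG polls the
// runtime. A single slow relist (e.g. tens of thousands of exited
// containers) delays every following iteration and health check.
func RelistLoop(ctx context.Context, rt Runtime, period time.Duration, sink func(Event)) {
	cache := map[string]string{}
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			events, err := Relist(ctx, rt, cache)
			if err != nil {
				continue // runtime unavailable; nothing to report
			}
			for _, e := range events {
				sink(e)
			}
		}
	}
}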
Updates coming from the pod config sources are delivered to the sync loop wrapped in kubetypes.PodUpdate:

type PodUpdate struct {
	Pods   []*v1.Pod    // the pods affected by this update
	Op     PodOperation // ADD, UPDATE, REMOVE, RECONCILE, DELETE, ...
	Source string       // which config source produced it: file, http or api
}
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.InfoS("Starting kubelet main sync loop")
// The syncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
for {
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
// syncTicker and housekeepingTicker
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
// plegCh delivers pod change events observed on the node; if a pod's
// containers exit, the pod has to be updated back toward its desired state.
plegCh := kl.pleg.Watch()
In Kubernetes, the kubelet is an agent process running on every node that manages the lifecycle of the pods on that node. One of its main responsibilities is to monitor container state and make sure containers run as desired, and it uses a number of goroutines and timers to do this. syncTicker and housekeepingTicker are two important timers in the sync loop.

syncTicker: fires every second and only wakes the loop to check whether any pod workers are due for a sync. The per-pod resync interval itself is governed by the kubelet's sync frequency (--sync-frequency, roughly one minute by default), so most ticks find nothing to do. When pods are due, HandlePodSyncs re-runs syncPod for them so that the running state is reconciled with the desired state.

housekeepingTicker: fires every housekeepingPeriod (a couple of seconds in the kubelet source) and triggers HandlePodCleanups, which cleans up the leftovers of pods that no longer exist on the node: orphaned pod workers, probe workers, pod directories and similar bookkeeping, keeping the node's resource usage under control. (Image and container garbage collection run in their own loops, not here.)
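As a rough illustration of the relationship between the one-second ticker and the sync frequency, here is a hedged sketch of what getPodsToSync boils down to; the podState type is hypothetical, and the real function also consults the pod workers and an internal work queue.

package syncsketch

import "time"

// podState is a hypothetical record of when a pod was last synced.
type podState struct {
	Name     string
	LastSync time.Time
}

// podsToSync returns the pods whose last successful sync happened more than
// syncFrequency ago. The sync loop wakes up every second (syncTicker), but a
// pod is only re-synced once its own interval has elapsed, e.g.
// podsToSync(pods, time.Minute, time.Now()).
func podsToSync(pods []podState, syncFrequency time.Duration, now time.Time) []podState {
	var due []podState
	for _, p := range pods {
		if now.Sub(p.LastSync) >= syncFrequency {
			due = append(due, p)
		}
	}
	return due
}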
To sum up: plegCh (pod state changes observed on the node, which need to be handled to bring pods back to their desired state), syncTicker, housekeepingTicker (containers that need cleaning up, including deleting dead containers) and updates are all channels whose events end up handling pods. Note that the updates channel carries pod changes coming from the config sources (including the apiserver); the difference from syncTicker is that updates delivers changes to the desired state, while syncTicker merely wakes the loop to resync pods the kubelet already knows about.

The job of syncLoopIteration is therefore: select on all of the channels that can trigger pod handling, and whenever something arrives, dispatch it to the appropriate SyncHandler callback so that the pod reaches its desired state.
// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1. configCh: a channel to read config events from
// 2. handler: the SyncHandler to dispatch pods to
// 3. syncCh: a channel to read periodic sync events from
// 4. housekeepingCh: a channel to read housekeeping events from
// 5. plegCh: a channel to read PLEG updates from
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// - configCh: dispatch the pods for the config change to the appropriate
// handler callback for the event type
// - plegCh: update the runtime cache; sync pod
// - syncCh: sync all pods waiting for sync
// - housekeepingCh: trigger cleanup of pods
// - health manager: sync pods that have failed or in which one or more
// containers have failed health checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
return false
}
switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}
kl.sourcesReady.AddSource(u.Source)
case e := <-plegCh:
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
}
}
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjs(podsToSync))
handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
handleProbeSync(kl, update, handler, "liveness", "unhealthy")
}
case update := <-kl.readinessManager.Updates():
ready := update.Result == proberesults.Success
kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
status := ""
if ready {
status = "ready"
}
handleProbeSync(kl, update, handler, "readiness", status)
case update := <-kl.startupManager.Updates():
started := update.Result == proberesults.Success
kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
status := "unhealthy"
if started {
status = "started"
}
handleProbeSync(kl, update, handler, "startup", status)
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
} else {
start := time.Now()
klog.V(4).InfoS("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
klog.ErrorS(err, "Failed cleaning pods")
}
duration := time.Since(start)
if duration > housekeepingWarningDuration {
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
}
klog.V(4).InfoS("SyncLoop (housekeeping) end")
}
}
return true
}
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
HandlePodAdditions(pods []*v1.Pod)
HandlePodUpdates(pods []*v1.Pod)
HandlePodRemoves(pods []*v1.Pod)
HandlePodReconcile(pods []*v1.Pod)
HandlePodSyncs(pods []*v1.Pod)
HandlePodCleanups() error
}
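The comment notes that the interface exists for testability; as a hedged illustration (this recorder is hypothetical and mirrors the interface above, it is not taken from the kubelet's own tests), a fake implementation only needs to record what it was asked to do:

package handlersketch

import v1 "k8s.io/api/core/v1"

// recordingHandler is a hypothetical SyncHandler-style fake that records
// which pods were passed to each callback; useful for unit-testing code
// that drives a sync handler, such as a sync loop.
type recordingHandler struct {
	added, updated, removed, reconciled, synced []*v1.Pod
	cleanups                                    int
}

func (h *recordingHandler) HandlePodAdditions(pods []*v1.Pod) { h.added = append(h.added, pods...) }
func (h *recordingHandler) HandlePodUpdates(pods []*v1.Pod)   { h.updated = append(h.updated, pods...) }
func (h *recordingHandler) HandlePodRemoves(pods []*v1.Pod)   { h.removed = append(h.removed, pods...) }
func (h *recordingHandler) HandlePodReconcile(pods []*v1.Pod) { h.reconciled = append(h.reconciled, pods...) }
func (h *recordingHandler) HandlePodSyncs(pods []*v1.Pod)     { h.synced = append(h.synced, pods...) }
func (h *recordingHandler) HandlePodCleanups() error          { h.cleanups++; return nil }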
There are several SyncHandler callbacks: add, delete, update and so on. Below I'll pick the add path to walk through:
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
existingPods := kl.podManager.GetPods()
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
if kubetypes.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// Only go through the admission process if the pod is not requested
// for termination by another part of the kubelet. If the pod is already
// using resources (previously admitted), the pod worker is going to be
// shutting it down. If the pod hasn't started yet, we know that when
// the pod worker is invoked it will also avoid setting up the pod, so
// we simply avoid doing any work.
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
activePods := kl.filterOutInactivePods(existingPods)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}
}
The most important call here is kl.dispatchWork, which hands the pod to a pod worker to actually create it:
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
StartTime: start,
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
My environment runs containerd, so I'll use containerd as the example for the pod startup flow. When the kubelet starts a pod it actually starts several container processes. Take an nginx pod: besides the nginx main container there is also a pause container, which in Kubernetes is called the sandbox container. pause is a process that sleeps forever, never exits on its own, and consumes essentially no resources.
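The real pause binary is a tiny C program shipped with Kubernetes (which, when it runs as PID 1 of a shared PID namespace, also reaps zombie processes); the Go program below is only a hedged sketch of the same idea: set up signal handling, then block forever.

package main

import (
	"os"
	"os/signal"
	"syscall"
)

// A pause-like process: do nothing, never exit on its own, and terminate
// cleanly on SIGINT/SIGTERM. Everything else in the pod can then join the
// namespaces (network, IPC, ...) that were created around this process.
func main() {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs // block until asked to stop
}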
Why is a sandbox container needed at all?

1. Container technology relies on namespaces, cgroups and a root filesystem. Once started, a container can be associated with a network namespace and thereby get its own network configuration. The business container itself may not be stable, and if the network had to be reconfigured every time that container exited, it would put pressure on the system and be inefficient. The sandbox container provides a stable base: the pod's network is built on top of it, so other containers can exit and restart without changing the pod's network (or other shared) configuration.

2. Some container processes need the network to be ready before they start, for example Java processes or processes that must fetch a token from a third party. That calls for an extra container that brings the network up first; the main containers then simply join the sandbox's network namespace. (A sketch of the corresponding CRI call order follows.)
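Against containerd this ordering shows up as a sequence of CRI calls. The sketch below uses the CRI v1 Go API (k8s.io/cri-api) but leaves out most required fields (SandboxConfig on the container, Linux options, mounts, ports and so on), so treat it as an outline of the call order rather than a working client; the metadata values are made up.

package crisketch

import (
	"context"

	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

// startPod sketches the CRI call sequence the kubelet drives through
// containerd: create the sandbox (which brings up the pause container and
// the pod's network namespace) first, then create and start the regular
// containers inside that sandbox.
func startPod(ctx context.Context, rt runtimeapi.RuntimeServiceClient) error {
	// 1. RunPodSandbox: containerd starts the pause container and the CNI
	//    plugin sets up the pod network against it.
	sandbox, err := rt.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
		Config: &runtimeapi.PodSandboxConfig{
			Metadata: &runtimeapi.PodSandboxMetadata{
				Name:      "nginx",
				Namespace: "default",
				Uid:       "example-uid",
			},
		},
	})
	if err != nil {
		return err
	}

	// 2. CreateContainer: the business container is created inside the
	//    sandbox and shares its network namespace.
	created, err := rt.CreateContainer(ctx, &runtimeapi.CreateContainerRequest{
		PodSandboxId: sandbox.PodSandboxId,
		Config: &runtimeapi.ContainerConfig{
			Metadata: &runtimeapi.ContainerMetadata{Name: "nginx"},
			Image:    &runtimeapi.ImageSpec{Image: "nginx:latest"},
		},
	})
	if err != nil {
		return err
	}

	// 3. StartContainer: only now does the main process actually run, with
	//    the network already in place.
	_, err = rt.StartContainer(ctx, &runtimeapi.StartContainerRequest{
		ContainerId: created.ContainerId,
	})
	return err
}

RunPodSandbox is where containerd starts the pause container and invokes the CNI plugin, which is exactly why the main container can assume the network is already up by the time StartContainer runs.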
Here is a more detailed diagram of the pod startup flow.
Before starting the pod, the kubelet verifies once more that the Pod can really run on this node. In practice it re-runs a set of the most basic scheduling checks, called GeneralPredicates, such as "are the requested resources available" and "do any ports conflict", as a second confirmation on the kubelet side.
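A hedged sketch of what that second check amounts to; the types are simplified and hypothetical (the real kubelet goes through lifecycle.PodAdmitHandler and the scheduler's predicate helpers), but the shape is the same: sum the requests of the pods already admitted, compare against the node's allocatable resources, and look for host-port conflicts.

package admitsketch

// Hypothetical, simplified inputs for a kubelet-side admission check.
type podRequest struct {
	MilliCPU  int64
	MemoryMiB int64
	HostPorts []int32
}

type nodeAllocatable struct {
	MilliCPU  int64
	MemoryMiB int64
}

// canAdmit re-runs the basic scheduling checks on the node itself: are the
// requested resources still available once all active pods are accounted
// for, and do any requested host ports collide with ports already in use?
func canAdmit(node nodeAllocatable, active []podRequest, pod podRequest) (bool, string) {
	var usedCPU, usedMem int64
	usedPorts := map[int32]bool{}
	for _, p := range active {
		usedCPU += p.MilliCPU
		usedMem += p.MemoryMiB
		for _, port := range p.HostPorts {
			usedPorts[port] = true
		}
	}
	if usedCPU+pod.MilliCPU > node.MilliCPU {
		return false, "insufficient cpu"
	}
	if usedMem+pod.MemoryMiB > node.MemoryMiB {
		return false, "insufficient memory"
	}
	for _, port := range pod.HostPorts {
		if usedPorts[port] {
			return false, "host port conflict"
		}
	}
	return true, ""
}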