• Kubernetes控制平面组件:Kubelet


    一、介绍

    kubelet是node的守护神,维护了node上应用的生命周期,让node可以正常运行。

    每个节点上都运行一个 kubelet 服务进程,默认监听 10250 端口。

    • 接收并执行 master 发来的指令;
    • 管理 Pod 及 Pod 中的容器;
    • 每个 kubelet 进程会在 API Server 上注册节点自身信息,定期向 master 节点汇报节点的资源使用情况,并通过cAdvisor监控节点和容器的资源。

    1、节点管理

    节点管理主要是节点自注册节点状态更新

    • kubelet 可以通过设置启动参数 --register-node 来确定是否向 API Server 注册自己自注册模式;
    • 如果 kubelet 没有选择自注册模式,则需要用户自己配置 Node 资源信息,同时需要告知 kubelet集群上的 APIServer 的位置;
    • kubelet 在启动时通过 API Server 注册节点信息,并定时向 API Server 发送节点新消息,APIServer在接收到新消息后,将信息写入 etcd

    2、获取 Pod 清单方式

    • 文件:启动参数 --config 指定的配置目录下的文件即static pod。(默认/etc/Kubernetes/manifests/)。该文件每20秒重新检查一次(可配置)。
    • HTTP endpoint (URL):启动参数–manifest-url 设置。每20 秒检查一次这个endpoint(可配置)。
    • API Server:通过 API Server 监听 etcd 目录,同步 Pod 清单。
    • HTTP Server:kubelet 侦听 HTTP 请求,并响应简单的 API 以提交新的 Pod 清单。

    二、kubelet架构

    在这里插入图片描述

    • API:10250负责和客户端交互,10255负责只读,10248负责自身的探活。
      在这里插入图片描述
    • ProbeManager:livenessProbe,readnessProbe,startupProbe,由kubelet发起负责node上的pod的探活。
    • OOMWatcher:监听进程是否触发了OOM并上报给kubelet。
    • GPUManager:管理node上的GPU。
    • CAdvisor:内嵌于kubelet,基于cgroup获取node上运行的应用资源情况。
    • DiskSpaceManager:检测pod的磁盘空间大小。
    • StatusManager:管理node的状态。
    • EvictionManager:检测node资源使用情况,当到达一定水位时,它会按照既定策略把低优先级的pod驱逐掉。
    • Volume Manager:负责pod的存储。
    • Image GC:扫描node上不活跃的镜像,把这些镜像删除掉。
    • Container GC:清除Exited状态的容器。
    • ImageManager:负责镜像管理。
    • CertificateManager:kubelet是可以签证书的,负责证书管理。
    • syncLoop:watch api对象。
    • PodWorker:当syncLoop接收到pod变更通知,PodWorker就会去处理这些事件。比如看pod是否启动,如果没有就会把pod通过CRI启动。
    • CRI:容器运行时接口。可以通过kubelet的启动命令指定。
      🔥 docker shim(kubelet内置):基于docker,目前1.20版本之后已被k8s废弃,不再内置。
      🔥 remote container runtime:基于containerd或者cri-o。

    三、kubelet管理Pod的核心流程

    在这里插入图片描述

    • 通过apiserevr监听pod的状态变化即 pod update或者add事件。
    • sycnLoop接收到事件后,会把事件存到UpdatePodOptions中。
    • 不同的worker都会从队列中获取pod变更事件的清单
    • 针对每一个pod都会进行syncPod,然后会执行computePodActions,即对于这个pod需要执行什么行为。它会对比node上已经启动的容器进程,如果pod是新的,就会create,如果已经存在了,就会delete或者update。
    • 通过CRI启动或者删除pod。
    • PLEG上报pod的状态信息。维护了一个本地的pod cache,定期的向CRI发起一个relist的操作来获取当前node上的正在运行的pod清单。最后再通过pod lifecycle events上报给apiserver。

    注意:
    这里可以清晰的看到,如果runtime挂了,relist操作就会失败,pod状态无法上报,k8s就会认为这个node挂了,导致pod驱逐

    如果node上的容器进程过多,比如exited状态容器过多,relist就会一直遍历这些容器,导致耗时过长,没有在规定的时间内返回pod状态信息,PLEG就会超时,导致node状态变为Unknown,pod驱逐。

    1、Sync Loop源码解析
    type PodUpdate struct {
    	Pods   []*v1.Pod
    	Op     PodOperation
    	Source string
    }
    
    // syncLoop is the main loop for processing changes. It watches for changes from
    // three channels (file, apiserver, and http) and creates a union of them. For
    // any new change seen, will run a sync against desired state and running state. If
    // no changes are seen to the configuration, will synchronize the last known desired
    // state every sync-frequency seconds. Never returns.
    func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    	klog.InfoS("Starting kubelet main sync loop")
    	// The syncTicker wakes up kubelet to checks if there are any pod workers
    	// that need to be sync'd. A one-second period is sufficient because the
    	// sync interval is defaulted to 10s.
    	syncTicker := time.NewTicker(time.Second)
    	defer syncTicker.Stop()
    	housekeepingTicker := time.NewTicker(housekeepingPeriod)
    	defer housekeepingTicker.Stop()
    	plegCh := kl.pleg.Watch()
    
    	for {
    		kl.syncLoopMonitor.Store(kl.clock.Now())
    		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
    			break
    		}
    		kl.syncLoopMonitor.Store(kl.clock.Now())
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    (1)syncTicker、housekeepingTicker、plegCh解析
    // syncTicker、housekeepingTicker
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    
    // 监听plegCh获取到node上pod变更信息,如果pod退出,要基于期望值进行update操作
    plegCh := kl.pleg.Watch()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在 Kubernetes 中,kubelet 是一个在每个节点上运行的代理进程,用于管理该节点上的 Pod 生命周期。kubelet 的主要职责之一是监控容器的状态,并确保它们按照期望的方式运行。在这个过程中,kubelet 使用了许多不同的 goroutine 和计时器来执行不同的任务。

    其中,syncTickerhousekeepingTicker 是 kubelet 中两个重要的计时器

    • syncTicker:定期从 API server 同步 Pod 和容器的状态信息。默认情况下,该计时器每隔 1 分钟执行一次,可通过命令行参数 --sync-frequency 来修改间隔时间。该计时器的任务是通过调用 syncPod() 函数来确保 Pod 和容器的状态信息是最新的,如果发现有状态不一致的情况,则触发 Pod 的重新调度或容器的重启操作。

    • housekeepingTicker:定期执行一些与节点资源管理相关的任务,例如清理过期的镜像和容器、更新容器日志文件、检查节点存储空间等。默认情况下,该计时器每隔 1 分钟执行一次,可通过命令行参数 --housekeeping-interval 来修改间隔时间。该计时器的任务是通过调用 housekeeping() 函数来执行的,该函数主要负责对 kubelet 管理的各种资源进行清理和管理,确保节点的资源使用情况处于可控范围内。

    (2)syncLoop的核心syncLoopIteration解析

    首先,plegCh(节点上的pod状态变更,需要handlepod,从而达到期望状态)、syncTickerhousekeepingTicker(有container需要清理,也需要delete container)、updates都是需要handle pod的channel,注意updates channel也是从apiserver接收更新的pod,我不清楚syncTicker和updates channel有什么不一样

    因此syncLoopIteration的工作职责: select从上面会引起handle pod的各个channel里面取对象,取到了,就使用相应的SyncHandler处理,使其达到期望状态。

    // syncLoopIteration reads from various channels and dispatches pods to the
    // given handler.
    //
    // Arguments:
    // 1.  configCh:       a channel to read config events from
    // 2.  handler:        the SyncHandler to dispatch pods to
    // 3.  syncCh:         a channel to read periodic sync events from
    // 4.  housekeepingCh: a channel to read housekeeping events from
    // 5.  plegCh:         a channel to read PLEG updates from
    
    // With that in mind, in truly no particular order, the different channels
    // are handled as follows:
    //
    //   - configCh: dispatch the pods for the config change to the appropriate
    //     handler callback for the event type
    //   - plegCh: update the runtime cache; sync pod
    //   - syncCh: sync all pods waiting for sync
    //   - housekeepingCh: trigger cleanup of pods
    //   - health manager: sync pods that have failed or in which one or more
    //     containers have failed health checks
    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    	select {
    	case u, open := <-configCh:
    		// Update from a config source; dispatch it to the right handler
    		// callback.
    		if !open {
    			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
    			return false
    		}
    
    		switch u.Op {
    		case kubetypes.ADD:
    			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
    			// After restarting, kubelet will get all existing pods through
    			// ADD as if they are new pods. These pods will then go through the
    			// admission process and *may* be rejected. This can be resolved
    			// once we have checkpointing.
    			handler.HandlePodAdditions(u.Pods)
    		case kubetypes.UPDATE:
    			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
    			handler.HandlePodUpdates(u.Pods)
    		case kubetypes.REMOVE:
    			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
    			handler.HandlePodRemoves(u.Pods)
    		case kubetypes.RECONCILE:
    			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
    			handler.HandlePodReconcile(u.Pods)
    		case kubetypes.DELETE:
    			klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
    			// DELETE is treated as a UPDATE because of graceful deletion.
    			handler.HandlePodUpdates(u.Pods)
    		case kubetypes.SET:
    			// TODO: Do we want to support this?
    			klog.ErrorS(nil, "Kubelet does not support snapshot update")
    		default:
    			klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
    		}
    
    		kl.sourcesReady.AddSource(u.Source)
    
    	case e := <-plegCh:
    		if isSyncPodWorthy(e) {
    			// PLEG event for a pod; sync it.
    			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
    				klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
    				handler.HandlePodSyncs([]*v1.Pod{pod})
    			} else {
    				// If the pod no longer exists, ignore the event.
    				klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
    			}
    		}
    
    		if e.Type == pleg.ContainerDied {
    			if containerID, ok := e.Data.(string); ok {
    				kl.cleanUpContainersInPod(e.ID, containerID)
    			}
    		}
    	case <-syncCh:
    		// Sync pods waiting for sync
    		podsToSync := kl.getPodsToSync()
    		if len(podsToSync) == 0 {
    			break
    		}
    		klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjs(podsToSync))
    		handler.HandlePodSyncs(podsToSync)
    	case update := <-kl.livenessManager.Updates():
    		if update.Result == proberesults.Failure {
    			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
    		}
    	case update := <-kl.readinessManager.Updates():
    		ready := update.Result == proberesults.Success
    		kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
    
    		status := ""
    		if ready {
    			status = "ready"
    		}
    		handleProbeSync(kl, update, handler, "readiness", status)
    	case update := <-kl.startupManager.Updates():
    		started := update.Result == proberesults.Success
    		kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
    
    		status := "unhealthy"
    		if started {
    			status = "started"
    		}
    		handleProbeSync(kl, update, handler, "startup", status)
    	case <-housekeepingCh:
    		if !kl.sourcesReady.AllReady() {
    			// If the sources aren't ready or volume manager has not yet synced the states,
    			// skip housekeeping, as we may accidentally delete pods from unready sources.
    			klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
    		} else {
    			start := time.Now()
    			klog.V(4).InfoS("SyncLoop (housekeeping)")
    			if err := handler.HandlePodCleanups(); err != nil {
    				klog.ErrorS(err, "Failed cleaning pods")
    			}
    			duration := time.Since(start)
    			if duration > housekeepingWarningDuration {
    				klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
    			}
    			klog.V(4).InfoS("SyncLoop (housekeeping) end")
    		}
    	}
    	return true
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    (3)SyncHandler解析
    // SyncHandler is an interface implemented by Kubelet, for testability
    type SyncHandler interface {
    	HandlePodAdditions(pods []*v1.Pod)
    	HandlePodUpdates(pods []*v1.Pod)
    	HandlePodRemoves(pods []*v1.Pod)
    	HandlePodReconcile(pods []*v1.Pod)
    	HandlePodSyncs(pods []*v1.Pod)
    	HandlePodCleanups() error
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    syncHandler有很多种,有add、delete、update等,我接下来选取add来讲解:

    // HandlePodAdditions is the callback in SyncHandler for pods being added from
    // a config source.
    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    	start := kl.clock.Now()
    	sort.Sort(sliceutils.PodsByCreationTime(pods))
    	for _, pod := range pods {
    		existingPods := kl.podManager.GetPods()
    		// Always add the pod to the pod manager. Kubelet relies on the pod
    		// manager as the source of truth for the desired state. If a pod does
    		// not exist in the pod manager, it means that it has been deleted in
    		// the apiserver and no action (other than cleanup) is required.
    		kl.podManager.AddPod(pod)
    
    		if kubetypes.IsMirrorPod(pod) {
    			kl.handleMirrorPod(pod, start)
    			continue
    		}
    
    		// Only go through the admission process if the pod is not requested
    		// for termination by another part of the kubelet. If the pod is already
    		// using resources (previously admitted), the pod worker is going to be
    		// shutting it down. If the pod hasn't started yet, we know that when
    		// the pod worker is invoked it will also avoid setting up the pod, so
    		// we simply avoid doing any work.
    		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
    			// We failed pods that we rejected, so activePods include all admitted
    			// pods that are alive.
    			activePods := kl.filterOutInactivePods(existingPods)
    
    			// Check if we can admit the pod; if not, reject it.
    			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
    				kl.rejectPod(pod, reason, message)
    				continue
    			}
    		}
    		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
    		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    其中最重要的是kl.dispatchWork,这个就是使用podworker去创建pod了。

    func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    	// Run the sync in an async worker.
    	kl.podWorkers.UpdatePod(UpdatePodOptions{
    		Pod:        pod,
    		MirrorPod:  mirrorPod,
    		UpdateType: syncType,
    		StartTime:  start,
    	})
    	// Note the number of containers for new pods.
    	if syncType == kubetypes.SyncPodCreate {
    		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    四、pod启动流程(全是重点)

    1、启动pod全流程(从kubectl create到得到pod)

    我环境用的是containerd,这里也以containerd为例介绍pod的启动流程。
    在这里插入图片描述

    • 用户发送创建pod请求到apiserver。
    • apiserver收到请求后会把对象存储到etcd中。存储完成后会返回给apiserver。
    • scheduler 是watch apiserver的,watch到这个pod创建事件,找到一个合适的node完成调度,绑定pod。
    • apiserver再把pod的信息存储到etcd中。
    • kubelet watch到node上绑定的pod,首先会去启动SandboxContainer。
    • containerd调用cni的插件去创建pod,即配置网络。
    • cni插件把pod的ip返回给containerd,再返回给kubelet。
    • kubelet调用containerd拉取镜像,创建主容器,启动主容器。
    • kubelet上报pod状态给apiserver。
    • apiserver再把pod状态存储到etcd中。

    2、SandboxContainer

    kubelet启动容器的时候,启动的是多个容器进程。

    比如这里的nginx pod,不仅有一个nginx主容器,还有一个pause容器,这个pause容器,在k8s中就叫做SandboxContainer。pause是一个永远sleep,不会退出,不消耗任何资源的进程

    为什么要有SandboxContainer?
    1、容器技术依靠的是namespace,cgroup,rootfs。容器启动后可以和某个网络namespace关联,就可以有独立的网络配置。业务容器本身进程可能不稳定,如果每一次容器退出都需要重新配置网络,就会对系统产生一定的压力,效率也不高。SandboxContainer就可以提供一个稳定的底座,网络就基于这个容器来做的,这样其他容器退出也不会更改其网络存储等配置。

    2、某些容器进程启动是需要网络就绪的,比如java进程,或者某些需要获取第三方token的进程。这样就需要一个额外的容器来提前启动网络。把主容器的网络namespace挂载在这个sandbox即可。

    3、启动pod流程(更详细函数描述kubelet部分的流程)

    这里是更详细的pod启动流程图。

    • checkAdmit:再次验证该 Pod 是否确实能够运行在该节点上。实际上就是把一组叫作 GeneralPredicates 的、最基本的调度算法,比如:“资源是否可用”“端口是否冲突”等再执行一遍,作为 kubelet 端的二次确认。
    • Check network plugin status:如果cni挂了,pod是起不来的。
    • Update cgroup: 配置pod的cgroup。
    • makePodDataDirs: 创建pod数据目录。
    • WaitForAttachAndMount: 等待pod存储就绪,这里就通过CSI来attach and mount pod的存储目录。
    • syncPod:计算sandbox和容器的变化,如果发生了变化,就需要重新启动容器了。即上文中的ComputePodActions。
    • 如果是创建pod:首先会生成sandbox的配置文件,然后创建数据目录,然后再创建sandbox。
    • sandbox通过grpc接口调用CRI创建pod,过程中再调用CNI创建网络。
      其他流程都是一致的。
      在这里插入图片描述
  • 相关阅读:
    ubunbtu下基于c++实现MQTT客户端通信
    Linux权限管理— 文件特殊权限Sticky BIT
    Android Jetpack之ViewModel、LiveData
    网络安全——关于防火墙
    CentOS 环境下部署 vsftpd
    egg.js学习记录
    如何游戏出海代运营、游戏出海代投
    如何用postman实现接口自动化测试
    七夕看什么电影好?爬取电影评分并存入csv文件
    Zookeeper系列——2Zookeeper应用及常用命令
  • 原文地址:https://blog.csdn.net/weixin_44571270/article/details/126693768