• kubelet Source Code Analysis: Startup


    Preface

    In the previous chapter we looked at what each kubelet component does and how it is created. In this chapter we will see how these components are started and how they interact with one another.

    kubelet.Run()

    // Run starts the kubelet reacting to config updates
    func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    	if kl.logServer == nil {
    		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    	}
    	if kl.kubeClient == nil {
    		klog.InfoS("No API server defined - no node status update will be sent")
    	}
    
    	// Start the cloud provider sync manager
    	if kl.cloudResourceSyncManager != nil {
    		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
    	}
    	// Create the directories kubelet needs, start the image manager, and start the OOM watcher
    	if err := kl.initializeModules(); err != nil {
    		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
    		klog.ErrorS(err, "Failed to initialize internal modules")
    		os.Exit(1)
    	}
    
    	// Start volume manager
    	// Mainly attaches and mounts the volumes of pods scheduled onto this node.
    	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
    
    	if kl.kubeClient != nil {
    		// Introduce some small jittering to ensure that over time the requests won't start
    		// accumulating at approximately the same time from the set of nodes due to priority and
    		// fairness effect.
    		// Periodically update the node status
    		go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
    		// Quickly update the node status once at startup,
    		// which also calls updateRuntimeUp once
    		go kl.fastStatusUpdateOnce()
    
    		// start syncing lease
    		go kl.nodeLeaseController.Run(wait.NeverStop)
    	}
    	// Periodically refresh the container runtime state (runtimeState / runtimeReady),
    	// and on the first run start:
    	// cadvisor
    	// StatsProvider
    	// containerManager
    	// evictionManager
    	// containerLogManager
    	// pluginManager
    	// shutdownManager
    	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
    
    	// Set up iptables util rules
    	if kl.makeIPTablesUtilChains {
    		kl.initNetworkUtil()
    	}
    
    	// Start component sync loops.
    	// The status manager pushes pod status updates to the apiserver
    	kl.statusManager.Start()
    
    	// Start syncing RuntimeClasses if enabled.
    	if kl.runtimeClassManager != nil {
    		kl.runtimeClassManager.Start(wait.NeverStop)
    	}
    
    	// Start the pod lifecycle event generator.
    	kl.pleg.Start()
    	// Start processing events; this is the core processing loop of the kubelet
    	kl.syncLoop(updates, kl)
    }
    
    • 70
    1. initializeModules also starts the image manager, which refreshes the image cache every 5 minutes and deletes images that should be removed.
    2. kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop) watches pods; when a new pod event arrives, it reads the pod's volume configuration and performs the attach and mount operations.
    3. syncNodeStatus pushes the node status to the apiserver, calling the functions registered via setNodeStatusFuncs to collect the status of each component (see the periodic-loop sketch after this list).
    4. updateRuntimeUp starts the managers listed in the comment above (they are started only once, on the first run).
    5. kl.statusManager.Start() starts the status manager, which mainly pushes pod status to the apiserver.
    6. kl.pleg.Start() starts the pod lifecycle event generator, which fetches the pod's container states via the CRI, compares them with the previously recorded states, and sends an event to syncLoop when they differ.
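
    To make the timing side of Run() concrete, here is a minimal, self-contained sketch (not kubelet source) of the two wait helpers used above. syncNodeStatus and updateRuntimeUp are stand-in placeholders, and 10*time.Second stands in for kl.nodeStatusUpdateFrequency; only wait.JitterUntil and wait.Until are real k8s.io/apimachinery APIs.

    package main

    import (
    	"fmt"
    	"time"

    	"k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
    	stopCh := make(chan struct{})

    	syncNodeStatus := func() { fmt.Println("sync node status") }   // placeholder body
    	updateRuntimeUp := func() { fmt.Println("update runtime up") } // placeholder body

    	// Jittered period (factor 0.04) so a fleet of nodes does not all report
    	// their status at the same instant; mirrors the JitterUntil call in Run().
    	go wait.JitterUntil(syncNodeStatus, 10*time.Second, 0.04, true, stopCh)

    	// Fixed 5s period, mirroring go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop).
    	go wait.Until(updateRuntimeUp, 5*time.Second, stopCh)

    	time.Sleep(30 * time.Second)
    	close(stopCh)
    }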

    syncLoopIteration

    syncLoop calls the syncLoopIteration method.
    From the code we can see that syncLoopIteration multiplexes the following event sources:

    • configCh (pod config), which triggers pod add, update, delete, and similar events
    • plegCh, which triggers events based on pod state changes observed by the PLEG
    • syncCh, which syncs pods waiting for sync
    • the liveness, readiness, and startup probes, whose results trigger events
    • housekeepingCh, which triggers periodic cleanup
    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    	select {
    	case u, open := <-configCh:
    		// Update from a config source; dispatch it to the right handler
    		// callback.
    		if !open {
    			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
    			return false
    		}
    
    		switch u.Op {
    		case kubetypes.ADD:
    			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
    			// After restarting, kubelet will get all existing pods through
    			// ADD as if they are new pods. These pods will then go through the
    			// admission process and *may* be rejected. This can be resolved
    			// once we have checkpointing.
    			handler.HandlePodAdditions(u.Pods)
    		case kubetypes.UPDATE:
    			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
    			handler.HandlePodUpdates(u.Pods)
    		case kubetypes.REMOVE:
    			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
    			handler.HandlePodRemoves(u.Pods)
    		case kubetypes.RECONCILE:
    			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
    			handler.HandlePodReconcile(u.Pods)
    		case kubetypes.DELETE:
    			klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
    			// DELETE is treated as a UPDATE because of graceful deletion.
    			handler.HandlePodUpdates(u.Pods)
    		case kubetypes.SET:
    			// TODO: Do we want to support this?
    			klog.ErrorS(nil, "Kubelet does not support snapshot update")
    		default:
    			klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
    		}
    
    		kl.sourcesReady.AddSource(u.Source)
    
    	case e := <-plegCh:
    		if e.Type == pleg.ContainerStarted {
    			// record the most recent time we observed a container start for this pod.
    			// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
    			// to make sure we don't miss handling graceful termination for containers we reported as having started.
    			kl.lastContainerStartedTime.Add(e.ID, time.Now())
    		}
    		if isSyncPodWorthy(e) {
    			// PLEG event for a pod; sync it.
    			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
    				klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
    				handler.HandlePodSyncs([]*v1.Pod{pod})
    			} else {
    				// If the pod no longer exists, ignore the event.
    				klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
    			}
    		}
    
    		if e.Type == pleg.ContainerDied {
    			if containerID, ok := e.Data.(string); ok {
    				kl.cleanUpContainersInPod(e.ID, containerID)
    			}
    		}
    	case <-syncCh:
    		// Sync pods waiting for sync
    		podsToSync := kl.getPodsToSync()
    		if len(podsToSync) == 0 {
    			break
    		}
    		klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjs(podsToSync))
    		handler.HandlePodSyncs(podsToSync)
    	case update := <-kl.livenessManager.Updates():
    		if update.Result == proberesults.Failure {
    			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
    		}
    	case update := <-kl.readinessManager.Updates():
    		ready := update.Result == proberesults.Success
    		kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
    
    		status := ""
    		if ready {
    			status = "ready"
    		}
    		handleProbeSync(kl, update, handler, "readiness", status)
    	case update := <-kl.startupManager.Updates():
    		started := update.Result == proberesults.Success
    		kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
    
    		status := "unhealthy"
    		if started {
    			status = "started"
    		}
    		handleProbeSync(kl, update, handler, "startup", status)
    	case <-housekeepingCh:
    		if !kl.sourcesReady.AllReady() {
    			// If the sources aren't ready or volume manager has not yet synced the states,
    			// skip housekeeping, as we may accidentally delete pods from unready sources.
    			klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
    		} else {
    			start := time.Now()
    			klog.V(4).InfoS("SyncLoop (housekeeping)")
    			if err := handler.HandlePodCleanups(); err != nil {
    				klog.ErrorS(err, "Failed cleaning pods")
    			}
    			duration := time.Since(start)
    			if duration > housekeepingWarningDuration {
    				klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
    			}
    			klog.V(4).InfoS("SyncLoop (housekeeping) end")
    		}
    	}
    	return true
    }
    

    HandlePodAdditions

    Now let's assume a pod is added and walk through the concrete flow.
    When a pod is added, a podConfig event is triggered, and that event calls the HandlePodAdditions method (the excerpt below is slightly abridged).

    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    	start := kl.clock.Now()
    	sort.Sort(sliceutils.PodsByCreationTime(pods))
    	for _, pod := range pods {
    		// Pods already known to the pod manager, snapshotted before the new pod
    		// is registered; used below for the admission check.
    		existingPods := kl.podManager.GetPods()
    		kl.podManager.AddPod(pod)
    		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
    			activePods := kl.filterOutInactivePods(existingPods)
    			// Check if we can admit the pod; if not, reject it.
    			// Pre-creation checks happen here:
    			// e.g. verify that the node is healthy (if not, the pod is not admitted),
    			// and reserve CPU, memory, and device resources; if the request fails, admission fails.
    			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
    				kl.rejectPod(pod, reason, message)
    				continue
    			}
    		}
    		// The pod was admitted: look up its mirror pod (if any), then dispatch the pod
    		// to a pod worker, whose run loop creates the containers.
    		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
    		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
    	}
    }
    

    SyncPod

    Because kubelet puts the pod add/update/delete logic into a single method, SyncPod contains a large amount of state-checking logic. We will skip that here and go straight to container creation.
    Its logic consists mainly of these steps:
    // 1. Compute sandbox and container changes.
    // 2. Kill pod sandbox if necessary.
    // 3. Kill any containers that should not be running.
    // 4. Create sandbox if necessary.
    // 5. Create ephemeral containers.
    // 6. Create init containers.
    // 7. Create normal containers.

    This is also where the container's configuration is assembled, including the container log directory and the directories the container needs.
    Finally, the startContainer method is called to create and run the container. A condensed sketch of this flow follows.
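
    The sketch below is a condensed, hypothetical outline of that flow, not the actual kubeGenericRuntimeManager.SyncPod; the podActions struct and its fields are invented for illustration, and the real implementation derives these decisions from the pod spec and the CRI-reported pod status.

    package main

    import "fmt"

    // podActions captures what a sync pass decided to do (hypothetical, simplified).
    type podActions struct {
    	KillPod           bool     // the sandbox must be recreated, so kill everything first
    	CreateSandbox     bool     // a new sandbox is needed
    	ContainersToKill  []string // containers that should not be running
    	ContainersToStart []string // ephemeral/init/normal containers to (re)start
    }

    func main() {
    	// Step 1: compute sandbox and container changes (stubbed result here).
    	actions := podActions{CreateSandbox: true, ContainersToStart: []string{"init-a", "app"}}

    	// Steps 2 and 3: kill the sandbox / any containers that should not be running.
    	if actions.KillPod {
    		fmt.Println("kill pod sandbox and all of its containers")
    	}
    	for _, c := range actions.ContainersToKill {
    		fmt.Println("kill container", c)
    	}

    	// Step 4: create the sandbox if necessary (network namespace, log directory, ...).
    	if actions.CreateSandbox {
    		fmt.Println("create pod sandbox")
    	}

    	// Steps 5-7: start ephemeral, init, then normal containers via startContainer.
    	for _, c := range actions.ContainersToStart {
    		fmt.Println("startContainer:", c)
    	}
    }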

    startContainer

    As for how the container itself is actually created, that logic lives in the corresponding CRI implementation.

    func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
    	container := spec.container
    
    	// Step 1: pull the image.
    	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
    	if err != nil {
    		s, _ := grpcstatus.FromError(err)
    		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
    		return msg, err
    	}
    
    	// Step 2: create the container.
    	// For a new container, the RestartCount should be 0
    	restartCount := 0
    	containerStatus := podStatus.FindContainerStatusByName(container.Name)
    	if containerStatus != nil {
    		restartCount = containerStatus.RestartCount + 1
    	} else {
    		// The container runtime keeps state on container statuses and
    		// what the container restart count is. When nodes are rebooted
    		// some container runtimes clear their state which causes the
    		// restartCount to be reset to 0. This causes the logfile to
    		// start at 0.log, which either overwrites or appends to the
    		// already existing log.
    		//
    		// We are checking to see if the log directory exists, and find
    		// the latest restartCount by checking the log name -
    		// {restartCount}.log - and adding 1 to it.
    		logDir := BuildContainerLogsDirectory(pod.Namespace, pod.Name, pod.UID, container.Name)
    		restartCount, err = calcRestartCountByLogDir(logDir)
    		if err != nil {
    			klog.InfoS("Log directory exists but could not calculate restartCount", "logDir", logDir, "err", err)
    		}
    	}
    
    	target, err := spec.getTargetID(podStatus)
    	if err != nil {
    		s, _ := grpcstatus.FromError(err)
    		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
    		return s.Message(), ErrCreateContainerConfig
    	}
    
    	containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, podIPs, target)
    	if cleanupAction != nil {
    		defer cleanupAction()
    	}
    	if err != nil {
    		s, _ := grpcstatus.FromError(err)
    		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
    		return s.Message(), ErrCreateContainerConfig
    	}
    
    	err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig)
    	if err != nil {
    		s, _ := grpcstatus.FromError(err)
    		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Internal PreCreateContainer hook failed: %v", s.Message())
    		return s.Message(), ErrPreCreateHook
    	}
    
    	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
    	if err != nil {
    		s, _ := grpcstatus.FromError(err)
    		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
    		return s.Message(), ErrCreateContainer
    	}
    	err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
    	if err != nil {
    		s, _ := grpcstatus.FromError(err)
    		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", s.Message())
    		return s.Message(), ErrPreStartHook
    	}
    	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, fmt.Sprintf("Created container %s", container.Name))
    
    	// Step 3: start the container.
    	err = m.runtimeService.StartContainer(containerID)
    	if err != nil {
    		s, _ := grpcstatus.FromError(err)
    		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", s.Message())
    		return s.Message(), kubecontainer.ErrRunContainer
    	}
    	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, fmt.Sprintf("Started container %s", container.Name))
    
    	// Symlink container logs to the legacy container log location for cluster logging
    	// support.
    	// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
    	containerMeta := containerConfig.GetMetadata()
    	sandboxMeta := podSandboxConfig.GetMetadata()
    	legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
    		sandboxMeta.Namespace)
    	containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
    	// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
    	// Because if containerLog path does not exist, only dangling legacySymlink is created.
    	// This dangling legacySymlink is later removed by container gc, so it does not make sense
    	// to create it in the first place. it happens when journald logging driver is used with docker.
    	if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
    		if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
    			klog.ErrorS(err, "Failed to create legacy symbolic link", "path", legacySymlink,
    				"containerID", containerID, "containerLogPath", containerLog)
    		}
    	}
    
    	// Step 4: execute the post start hook.
    	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
    		kubeContainerID := kubecontainer.ContainerID{
    			Type: m.runtimeName,
    			ID:   containerID,
    		}
    		msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
    		if handlerErr != nil {
    			klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod),
    				"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
    			m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
    			if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil); err != nil {
    				klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),
    					"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
    			}
    			return msg, ErrPostStartHook
    		}
    	}
    
    	return "", nil
    }
    
  • Original article: https://blog.csdn.net/a1023934860/article/details/128199893