In the previous chapter we looked at what each kubelet component does and how it is created. In this chapter we will look at how the kubelet starts each of these components and how the components interact with one another.
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
klog.InfoS("No API server defined - no node status update will be sent")
}
// Start the cloud provider sync manager
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}
// Create the directories the kubelet needs, start the image manager, and start the OOM watcher
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.ErrorS(err, "Failed to initialize internal modules")
os.Exit(1)
}
// Start volume manager
// Mainly responsible for mounting the volumes of pods scheduled onto this node.
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// Introduce some small jittering to ensure that over time the requests won't start
// accumulating at approximately the same time from the set of nodes due to priority and
// fairness effect.
// Periodically update the node status
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
// Immediately update the node status once
// and call updateRuntimeUp once
go kl.fastStatusUpdateOnce()
// start syncing lease
go kl.nodeLeaseController.Run(wait.NeverStop)
}
// Periodically refresh the container runtime state (runtimeState / runtimeReady).
// On the first successful run this also starts the runtime-dependent modules:
// cadvisor, StatsProvider, containerManager, evictionManager,
// containerLogManager, pluginManager and shutdownManager.
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Set up iptables util rules
if kl.makeIPTablesUtilChains {
kl.initNetworkUtil()
}
// Start component sync loops.
// Status manager: syncs pod status to the API server
kl.statusManager.Start()
// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
kl.runtimeClassManager.Start(wait.NeverStop)
}
// Start the pod lifecycle event generator (PLEG).
kl.pleg.Start()
// Start handling events; this is the core processing loop of the kubelet
kl.syncLoop(updates, kl)
}
syncLoop runs the main loop and calls syncLoopIteration on every pass.
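For context, here is a condensed sketch of syncLoop itself (the runtime-health check, exponential backoff and sync-loop monitoring of the real method are omitted):
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	// The sync ticker wakes the loop up to check for pod workers that need re-syncing;
	// the housekeeping ticker drives periodic cleanup; plegCh delivers container
	// lifecycle events observed by the PLEG.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	for {
		// Keep iterating until syncLoopIteration reports that the config channel is closed.
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
	}
}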
From the code below we can see that syncLoopIteration selects over the following event channels:
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
return false
}
switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}
kl.sourcesReady.AddSource(u.Source)
case e := <-plegCh:
if e.Type == pleg.ContainerStarted {
// record the most recent time we observed a container start for this pod.
// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
// to make sure we don't miss handling graceful termination for containers we reported as having started.
kl.lastContainerStartedTime.Add(e.ID, time.Now())
}
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
}
}
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjs(podsToSync))
handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
handleProbeSync(kl, update, handler, "liveness", "unhealthy")
}
case update := <-kl.readinessManager.Updates():
ready := update.Result == proberesults.Success
kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
status := ""
if ready {
status = "ready"
}
handleProbeSync(kl, update, handler, "readiness", status)
case update := <-kl.startupManager.Updates():
started := update.Result == proberesults.Success
kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
status := "unhealthy"
if started {
status = "started"
}
handleProbeSync(kl, update, handler, "startup", status)
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
} else {
start := time.Now()
klog.V(4).InfoS("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
klog.ErrorS(err, "Failed cleaning pods")
}
duration := time.Since(start)
if duration > housekeepingWarningDuration {
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
}
klog.V(4).InfoS("SyncLoop (housekeeping) end")
}
}
return true
}
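The three probe branches above all funnel into handleProbeSync, which looks up the pod that owns the probe result and asks the handler to re-sync it. The helper is essentially the following (log messages simplified):
func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
	// Look up the pod by UID; if it no longer exists, ignore the update.
	pod, ok := kl.podManager.GetPodByUID(update.PodUID)
	if !ok {
		klog.V(4).InfoS("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update)
		return
	}
	klog.V(1).InfoS("SyncLoop (probe)", "probe", probe, "status", status, "pod", klog.KObj(pod))
	handler.HandlePodSyncs([]*v1.Pod{pod})
}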
Now let's assume a pod has been added and walk through the concrete flow.
When a pod is added, the pod config source emits an ADD update, which the sync loop dispatches to the HandlePodAdditions method.
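For reference, the update that arrives on configCh is a kubetypes.PodUpdate. For a newly added pod coming from the API server it looks roughly like this (newPod is a hypothetical *v1.Pod used only for illustration):
// The config sources (file, http, apiserver) are merged and emitted on the
// updates channel that Run passes down to syncLoop/syncLoopIteration.
update := kubetypes.PodUpdate{
	Op:     kubetypes.ADD,             // operation type: ADD/UPDATE/REMOVE/RECONCILE/DELETE
	Source: kubetypes.ApiserverSource, // which config source produced the update
	Pods:   []*v1.Pod{newPod},         // newPod: hypothetical pod affected by this update
}
HandlePodAdditions then processes each pod in the update: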
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
// Snapshot the pods known before this one is added, then register the new pod
// with the pod manager (the kubelet's view of desired pods).
existingPods := kl.podManager.GetPods()
kl.podManager.AddPod(pod)
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
activePods := kl.filterOutInactivePods(existingPods)
// Check if we can admit the pod; if not, reject it.
// Some admission checks happen before creation, for example:
// verify that the node is healthy enough to run the pod (if not, the pod is rejected),
// and reserve CPU, memory and device resources (if allocation fails, admission fails).
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
// Admission succeeded: hand the pod to a pod worker, which runs and creates the containers.
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}
}
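dispatchWork is where control leaves the sync loop: it hands the pod to the pod workers, which perform the actual sync (including container creation) asynchronously, one worker per pod. It is roughly the following (based on the kubelet source of the same era):
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Run the sync in an async pod worker; the worker eventually calls syncPod.
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// Record the number of containers for newly created pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}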
Because the kubelet funnels pod additions, updates and deletions through a single sync method, that method contains a large amount of state-checking logic. We will skip over those checks and go straight to container creation.
Its logic consists mainly of the following steps:
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Create normal containers.
At this point the container configuration is generated, along with the container log directory and the other directories the container needs.
Finally, startContainer is called to create and start the container.
How the container is actually created from there is up to the corresponding CRI implementation.
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
container := spec.container
// Step 1: pull the image.
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
return msg, err
}
// Step 2: create the container.
// For a new container, the RestartCount should be 0
restartCount := 0
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus != nil {
restartCount = containerStatus.RestartCount + 1
} else {
// The container runtime keeps state on container statuses and
// what the container restart count is. When nodes are rebooted
// some container runtimes clear their state which causes the
// restartCount to be reset to 0. This causes the logfile to
// start at 0.log, which either overwrites or appends to the
// already existing log.
//
// We are checking to see if the log directory exists, and find
// the latest restartCount by checking the log name -
// {restartCount}.log - and adding 1 to it.
logDir := BuildContainerLogsDirectory(pod.Namespace, pod.Name, pod.UID, container.Name)
restartCount, err = calcRestartCountByLogDir(logDir)
if err != nil {
klog.InfoS("Log directory exists but could not calculate restartCount", "logDir", logDir, "err", err)
}
}
target, err := spec.getTargetID(podStatus)
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
return s.Message(), ErrCreateContainerConfig
}
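// Generate the CRI ContainerConfig for this container: command, env, mounts,
// devices, labels/annotations and the container log path.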
containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, podIPs, target)
if cleanupAction != nil {
defer cleanupAction()
}
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
return s.Message(), ErrCreateContainerConfig
}
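// Internal lifecycle hook that runs before the container is created; for example,
// the CPU and memory managers can pin resources by mutating the container config here.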
err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig)
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Internal PreCreateContainer hook failed: %v", s.Message())
return s.Message(), ErrPreCreateHook
}
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
return s.Message(), ErrCreateContainer
}
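// Internal lifecycle hook that runs after the container is created but before it
// is started, e.g. so the resource managers can track the new container ID.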
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", s.Message())
return s.Message(), ErrPreStartHook
}
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, fmt.Sprintf("Created container %s", container.Name))
// Step 3: start the container.
err = m.runtimeService.StartContainer(containerID)
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", s.Message())
return s.Message(), kubecontainer.ErrRunContainer
}
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, fmt.Sprintf("Started container %s", container.Name))
// Symlink container logs to the legacy container log location for cluster logging
// support.
// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
containerMeta := containerConfig.GetMetadata()
sandboxMeta := podSandboxConfig.GetMetadata()
legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
sandboxMeta.Namespace)
containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
// Because if containerLog path does not exist, only dangling legacySymlink is created.
// This dangling legacySymlink is later removed by container gc, so it does not make sense
// to create it in the first place. it happens when journald logging driver is used with docker.
if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
klog.ErrorS(err, "Failed to create legacy symbolic link", "path", legacySymlink,
"containerID", containerID, "containerLogPath", containerLog)
}
}
// Step 4: execute the post start hook.
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
kubeContainerID := kubecontainer.ContainerID{
Type: m.runtimeName,
ID: containerID,
}
msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil); err != nil {
klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
}
return msg, ErrPostStartHook
}
}
return "", nil
}