• 5. informer源码分析-概要分析


    k8s client-go k8s informers 实现了持续获取集群的所有资源对象监听集群的资源对象变化功能,并在本地维护了全量资源对象的内存缓存,以减少对 apiserver、对 etcd 的请求压力。Informers 在启动的时候会首先在客户端调用 List 接口来获取全量的对象集合,然后通过 Watch 接口来获取增量的对象,然后更新本地缓存

    1. k8s informer 概述

    我们都知道可以使用 k8s 的 Clientset 来获取所有的原生资源对象,那么怎么能持续的获取集群的所有资源对象,或监听集群的资源对象数据的变化呢?这里不需要轮询去不断执行 List 操作,而是调用 Watch 接口,即可监听资源对象的变化,当资源对象发生变化,客户端即可通过 Watch 接口收到资源对象的变化。

    Watch 接口虽然可以直接使用,但一般情况下很少直接使用,因为往往由于集群中的资源较多,我们需要自己在客户端去维护一套缓存,而这个维护成本比较大。

    也是因为如此,client-go 提供了自己的实现机制,Informers 应运而生。informers 实现了持续获取集群的所有资源对象、监听集群的资源对象变化功能,并在本地维护了全量资源对象的内存缓存,以减少对 apiserver、对 etcd 的请求压力。Informers 在启动的时候会首先在客户端调用 List 接口来获取全量的对象集合,然后通过 Watch 接口来获取增量的对象,然后更新本地缓存

    此外 informers 也有很强的健壮性,当长期运行的 watch 连接中断时,informers 会尝试拉起一个新的 watch 请求来恢复连接,在不丢失任何事件的情况下恢复事件流。另外,informers 还可以配置一个重新同步的周期参数,每间隔该周期,informers 就会重新 List 全量数据

    在 informers 的使用上,通常每个 GroupVersionResource(GVR)只实例化一个 informers,但有时候我们在一个应用中往往会在多个地方对同一种资源对象都有 informer 的需求,所以就有了共享 informer,即 SharedInformerFactory。所以可以通过使用 SharedInformerFactory 来实例化 informers,这样本地内存缓存就只有一份,通知机制也只有一套,大大提高了效率,减少了资源浪费。

    1.1 k8s informer 架构

    在这里插入图片描述

    1.2 k8s informer 包含部件

    k8s client-go informer 主要包括以下部件:

    • Reflector:Reflector 从 kube-apiserver 中 list&watch 资源对象,然后调用 DeltaFIFO 的 Add/Update/Delete/Replace 方法将资源对象及其变化包装成 Delta 并将其丢到 DeltaFIFO 中;
    • DeltaFIFO:DeltaFIFO 中存储着一个 map 和一个 queue,即map[object key]Deltas 以及 object key 的 queue,Deltas 为 Delta 的切片类型,Delta 装有对象及对象的变化类型(Added/Updated/Deleted/Sync) ,Reflector 负责 DeltaFIFO 的输入,Controller 负责处理 DeltaFIFO 的输出;
    • Controller:Controller 从 DeltaFIFO 的 queue 中 pop 一个 object key 出来,并获取其关联的 Deltas 出来进行处理,遍历 Deltas,根据对象的变化更新 Indexer 中的本地内存缓存,并通知 Processor,相关对象有变化事件发生;
    • Processor:Processor 根据对象的变化事件类型,调用相应的 ResourceEventHandler 来处理对象的变化;
    • Indexer:Indexer 中有 informer 维护的指定资源对象的相对于 etcd 数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对 apiserver、对 etcd 的请求压力;
    • ResourceEventHandler:用户根据自身处理逻辑需要,注册自定义的的 ResourceEventHandler,当对象发生变化时,将触发调用对应类型的 ResourceEventHandler 来做处理。

    根据 informer 架构,对 k8s informer 的分析将分为以下几部分进行,本篇为概要分析:
    (1)informer概要分析;
    (2)informer之初始化与启动分析;
    (3)informer之Reflector分析;
    (4)informer之DeltaFIFO分析;
    (5)informer之Controller&Processor分析;
    (6)informer之Indexer分析;


    2. informer 使用示例代码

    使用大致过程如下:
    (1)构建与 kube-apiserver 通信的 config 配置;
    (2)初始化与 apiserver 通信的 clientset;
    (3)利用 clientset 初始化 shared informer factory 以及 pod informer;
    (4)注册 informer 的自定义 ResourceEventHandler;
    (5)启动 shared informer factory,开始 informer 的 list & watch 操作;
    (6)等待 informer 从 kube-apiserver 同步资源完成,即 informer 的 list 操作获取的对象都存入到 informer 中的 indexer 本地缓存中;
    (7)创建 lister,可以从 informer 中的 indexer 本地缓存中获取对象;

    func main() {
        // 自定义与kube-apiserver通信的config配置
        master := "192.168.1.10" // apiserver url
        kubeconfig := "/.kube/config"
        config, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
        if err != nil {
    		klog.Fatalf("Failed to create config: %v", err)
    	}
    	// 或使用k8s serviceAccount机制与kube-apiserver通信
    	// config, err = rest.InClusterConfig()
        
        // 初始化与apiserver通信的clientset
        clientset, err := kubernetes.NewForConfig(config)
    	if err != nil {
    		klog.Fatalf("Failed to create client: %v", err)
    	}
    	
    	// 初始化shared informer factory以及pod informer
    	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    	podInformer := factory.Core().V1().Pods()
    	informer := podInformer.Informer()
    	
    	// 注册informer的自定义ResourceEventHandler
    	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    		AddFunc:    xxx,
    		UpdateFunc: xxx,
    		DeleteFunc: xxx,
    	})
    	
    	// 启动shared informer factory,开始informer的list & watch操作
    	stopper := make(chan struct{})
    	go factory.Start(stopper)
    	
    	// 等待informer从kube-apiserver同步资源完成,即informer的list操作获取的对象都存入到informer中的indexer本地缓存中 
    	// 或者调用factory.WaitForCacheSync(stopper)
    	if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
    		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
    		return
    	}
    	
    	// 创建lister
    	podLister := podInformer.Lister()
    	// 从informer中的indexer本地缓存中获取对象
    	podList, err := podLister.List(labels.Everything())
    	if err != nil {
    		fmt.Println(err)
    	}
    	
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    以上只是对 K8s informer 做了简单的介绍,以及简单的写了一下如何使用 informer 的示例代码,后面将开始对 informer 的各个部件做进一步的源码分析。

    从图中可以看出,k8s informer主要包括以下几个部分:

    2.1.Reflector

    (1)Reflector 从 kube-apiserver 中 list 资源对象列表,然后调用 DeltaFIFO 的 Replace 方法将 object 包装成 Sync/Deleted 类型的 Delta 丢进 DeltaFIFO 中;

    (2)Reflector 从 kube-apiserver 中 watch 资源对象的变化,然后调用 DeltaFIFO 的 Add/Update/Delete 方法将 object 包装成 Added/Updated/Deleted 类型的 Delta 丢到 DeltaFIFO 中;

    2.2.DeltaFIFO

    DeltaFIFO 中存储着一个 map 和一个 queue;

    (1)其中 queue 可以看成是一个先进先出队列,一个 object 进入 DeltaFIFO 中,会判断 queue 中是否已经存在该 object key,不存在则添加到队尾;

    (2)map 即 map[object key]Deltas,是 object key 和 Deltas 的映射,Deltas 是 Delta 的切片类型,Delta 中存储着 DeltaType 和 object;另外,Deltas 最末尾的两个 Deleted 类型的 Delta 会被去重;

    DeltaType 有4种,分别是 Added、Updated、Deleted、Sync

    2.3.Controller

    Controller 从 DeltaFIFO 的 queue 中 pop 一个 object key 出来,并从 DeltaFIFO 的 map 中获取其对应的 Deltas 出来进行处理,遍历 Deltas,根据 object 的变化类型更新 Indexer 本地缓存,并通知 Processor 相关对象有变化事件发生:

    (1)如果 DeltaType 是 Deleted,则调用 Indexer 的 Delete 方法,将 Indexer 本地缓存中的 object 删除,并构造 deleteNotification struct,通知 Processor 做处理;

    (2)如果 DeltaType 是 Added/Updated/Sync,调用 Indexer 的 Get 方法从 Indexer 本地缓存中获取该对象,存在则调用 Indexer 的 Update 方法来更新 Indexer 缓存中的该对象,随后构造 updateNotification struct,通知 Processor 做处理;如果 Indexer 中不存在该对象,则调用 Indexer 的 Add 方法将该对象存入本地缓存中,并构造 addNotification struct,通知 Processor 做处理;

    2.4.Processor

    Processor 根据 Controller 的通知,即根据对象的变化事件类型(addNotification、updateNotification、deleteNotification),调用相应的 ResourceEventHandler(addFunc、updateFunc、deleteFunc)来处理对象的变化。

    2.5.Indexer

    Indexer 中有 informer 维护的指定资源对象的相对于 etcd 数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对 apiserver、对etcd 的请求压力。

    informer 所维护的缓存依赖于 threadSafeMap 结构体中的 items 属性,其本质上是一个用 map 构建的键值对,资源对象都存在 items 这个 map 中,key 为资源对象的 namespace/name 组成,value 为资源对象本身,这些构成了 informer 的本地缓存。

    Indexer 除了维护了一份本地内存缓存外,还有一个很重要的功能,便是索引功能了。索引的目的就是为了快速查找,比如我们需要查找某个 node 节点上的所有 pod、查找某个命名空间下的所有 pod 等,利用到索引,可以实现快速查找。关于索引功能,则依赖于 threadSafeMap 结构体中的 indexers 与 indices 属性。

    2.6.ResourceEventHandler

    用户根据自身处理逻辑需要,注册自定义的的 ResourceEventHandler,当对象发生变化时,将触发调用对应类型的 ResourceEventHandler 来做处理。

  • 相关阅读:
    项目中如何配置 Maven 为国内源
    Appuploader 常见错误及解决方法
    python爬虫之正则表达式解析实战
    【机器学习】数据驱动方法在电网稳定分析应用浅谈
    六款 Linux 常用远程连接神器,你知道几个?
    boost之实用工具
    python / pyside6 + pymysql 实现简单的个人资金管理系统
    2023高频前端面试题-TCP
    使用Node.js开发一个文件上传功能
    【摸鱼神器】UI库秒变低代码工具——表单篇(一)设计
  • 原文地址:https://blog.csdn.net/qq_32468785/article/details/136330622