• 【k8s源码篇之Informer篇2】理解Informer内部的运行逻辑


    参考

    架构

    Informer 和 Controller

    • 便于理解的架构图
    • 这里 Indexer「索引」 和 Local Store 「缓存」 是分开表示的
      • 在源码级别,基本上是一起实现的,一个结构体内涵盖

    在这里插入图片描述

    Informer 简要架构

    • 源码级简要理解

    (三)Kubernetes 源码剖析之学习Informer机制_golang_03

    Informer 详细架构

    • 源码级详细理解

    img

    带着问题去思考

    编写 informer 时的 AddEventHandler 如何作用?

    1. 可以看出这三个函数都与【资源的变化处理】有关
    2. 因此我们下一步将查看与【资源的变化处理】相关的函数
    	studentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    		AddFunc: controller.enqueueStudent, // 下面有这个函数解释,就是先将对象同步到缓存中,再放入 workqueue 队列
    		UpdateFunc: func(old, new interface{}) {
    			oldStudent := old.(*bolingcavalryv1.Student)
    			newStudent := new.(*bolingcavalryv1.Student)
    			if oldStudent.ResourceVersion == newStudent.ResourceVersion {
                    //版本一致,就表示没有实际更新的操作,立即返回
    				return
    			}
    			controller.enqueueStudent(new)
    		},
    		DeleteFunc: controller.enqueueStudentForDelete,
    	})
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    HandleDeltas

    1. 因为【DeltasFiFO】记录着【资源的变化】,通过查看源码,我们定位到此函数【HandleDeltas】
    2. 通过代码得知
      • 首先根据事件类型,更新缓存和索引
      • 之后将事件分发,进行处理,涉及到s.processor.distribute 函数
    // k8s.io/client-go/tools/cache/shared_informer.go
    
    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    	s.blockDeltas.Lock()
    	defer s.blockDeltas.Unlock()
    
    	// from oldest to newest
    	for _, d := range obj.(Deltas) {
    		switch d.Type {
        // 资源的同步、添加、更新四件
    		case Sync, Added, Updated:
    			isSync := d.Type == Sync
    			s.cacheMutationDetector.AddObject(d.Object)
    			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
            // 更新缓存和索引
    				if err := s.indexer.Update(d.Object); err != nil {
    					return err
    				}
            // 将事件分发,进行处理
    				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
    			} else {
            // 下面有详细解析
    				if err := s.indexer.Add(d.Object); err != nil {
    					return err
    				}
    				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
    			}
        // 资源的删除时间
    		case Deleted:
          // 更新缓存和索引
    			if err := s.indexer.Delete(d.Object); err != nil {
    				return err
    			}
          // 将事件分发,进行处理
    			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    		}
    	}
    	return nil
    }
    
    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    
    type SharedIndexInformer interface {
       SharedInformer
       // AddIndexers add indexers to the informer before it starts.
       AddIndexers(indexers Indexers) error
       GetIndexer() Indexer
    }
      
    type sharedIndexInformer struct {
       // 索引和 缓存 store
       indexer    Indexer
       controller Controller
       // 处理函数,将是重点
       processor *sharedProcessor
       // 检测 cache 是否有变化,一把用作调试,默认是关闭的
       cacheMutationDetector MutationDetector
       // 构造 Reflector 需要
       listerWatcher ListerWatcher
       // 目标类型,给 Reflector 判断资源类型
       objectType runtime.Object
       // Reflector 进行重新同步周期
       resyncCheckPeriod time.Duration
       // 如果使用者没有添加 Resync 时间,则使用这个默认的重新同步周期
       defaultEventHandlerResyncPeriod time.Duration
       clock                           clock.Clock
       // 两个 bool 表达了三个状态:controller 启动前、已启动、已停止
       started, stopped bool
       startedLock      sync.Mutex
       // 当 Pop 正在消费队列,此时新增的 listener 需要加锁,防止消费混乱
       blockDeltas sync.Mutex
       // Watch 返回 err 的回调函数
       watchErrorHandler WatchErrorHandler
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    distribute 处理分发函数

    img

    • 首先考虑为什么要有 distribute 分发函数,而是为什么sharedProcessor处理器叫”共享“?

      • 是因为ShareInformer ,不知道你是否还记得每创建一个Informer都是基于ShareInformerFactory

        • k8s自定义controller三部曲之三:编写controller代码

        • //  这个是 Students 的控制器运行的 main 函数
          //  其中涉及共享 Informer 工厂函数
          func main() {
            // 省略若干行代码 ...
            
            // 关注 Student 资源的  「共享 Informer 制造工厂studentInformerFactory」
          	studentInformerFactory := informers.NewSharedInformerFactory(studentClient, time.Second*30)
          
            // 得到controller
            // 利用  「共享 Informer 制造工厂studentInformerFactory」生产出或获取已有的 「Student 某版本的 共享Informer 实例」
            // 共享 Informer,只是共享了 Reflector(ListAndWatch 对Apiserver 监控),以及缓存 Indexer 和索引 Local Store
            // 目的是:减少同类型多个 Informer
          	controller := NewController(kubeClient, studentClient,
          		studentInformerFactory.Bolingcavalry().V1().Students())
          
              //启动informer
          	go studentInformerFactory.Start(stopCh)
          }
          
          
          // NewController returns a new student controller
          func NewController(
            // 省略若干行代码 ...
          	controller := &Controller{
          		kubeclientset:    kubeclientset,
          		studentclientset: studentclientset,
          		studentsLister:   studentInformer.Lister(),
          		studentsSynced:   studentInformer.Informer().HasSynced,
          		workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Students"),
          		recorder:         recorder,
          	}
          
            // 基于共享 Informer ,添加自己的 Informer 处理逻辑
            // 这部分可以理解为 每个 Controller 独立的 Informer 部分
          	studentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
          		AddFunc: controller.enqueueStudent,
          		UpdateFunc: func(old, new interface{}) {
          			oldStudent := old.(*bolingcavalryv1.Student)
          			newStudent := new.(*bolingcavalryv1.Student)
          			if oldStudent.ResourceVersion == newStudent.ResourceVersion {
                          //版本一致,就表示没有实际更新的操作,立即返回
          				return
          			}
          			controller.enqueueStudent(new)
          		},
          		DeleteFunc: controller.enqueueStudentForDelete,
          	})
          
          	return controller
          }
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
          • 26
          • 27
          • 28
          • 29
          • 30
          • 31
          • 32
          • 33
          • 34
          • 35
          • 36
          • 37
          • 38
          • 39
          • 40
          • 41
          • 42
          • 43
          • 44
          • 45
          • 46
          • 47
          • 48
          • 49
          • 50
        • 其实他们共享着 —— 【Refactor】【Indexer】【Local Store】 —— 为了减少 Apiserver 的访问压力,及节约存储(因为都是关注着一种资源)

      • 但是一个资源可能会有「多个 Informer 实例」监听着,而且「每个 Informer 的处理逻辑都不是一致的(AddFunc、DeleteFunc、UpdateFunc)」,因此如何通知这些 Informer 处理呢?

        • 答案很简单 —— []*processorListener 数组(也叫切片)
        • 每一个 Informer 的处理逻辑 —— 封装为一个 processorListener
        • 当有【资源变化 Delta】产生时,便会通过 range,通知到所有的【processorListener】,即【 Informer 的处理逻辑 (AddFunc、DeleteFunc、UpdateFunc)】
        • 通过对下面源码的阅读便可理解
    // k8s.io/client-go/tools/cache/shared_informer.go
    
    type sharedProcessor struct {
    	listenersStarted bool
    	listenersLock    sync.RWMutex
      // 因为各 listeners 设置的 resyncPeriod 可能不一致
      // 所以将没有设置(resyncPeriod = 0) 的归为 listeners 组,将设置了 resyncPeriod 的归到 syncingListeners 组;
    	listeners        []*processorListener
      // 如果某个 listener 在多个地方(sharedIndexInformer.resyncCheckPeriod,
      // sharedIndexInformer.AddEventHandlerWithResyncPeriod)都设置了 resyncPeriod,则取最小值 minimumResyncPeriod;
    	syncingListeners []*processorListener
    	clock            clock.Clock
    	wg               wait.Group
    }
    
    // k8s.io/client-go/tools/cache/shared_informer.go
    func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    	p.listenersLock.RLock()
    	defer p.listenersLock.RUnlock()
    
    	if sync {
        // 遍历所属组全部 listeners,将数据投递到 processorListener 进行处理
    		for _, listener := range p.syncingListeners {
    			listener.add(obj)
    		}
    	} else {
    		for _, listener := range p.listeners {
    			listener.add(obj)
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    processorListener —— Informer 的差异处

    • 通过下面代码的查看,便可得知handler ResourceEventHandler 是【 Informer 的处理逻辑 (AddFunc、DeleteFunc、UpdateFunc)】
      • 也就是每个 Informer 的差异之处
    // k8s.io/client-go/tools/cache/shared_informer.go
    
    type processorListener struct {
      // nextCh:数据从此通道中读出并调用handler函数处理,非缓冲通道
    	nextCh chan interface{}
      // addCh:FIFO中POP出的数据通过distribute函数放入此通道中,非缓冲通道
    	addCh  chan interface{}
    
      // 此处即为 Informer 的处理逻辑 (AddFunc、DeleteFunc、UpdateFunc)
    	handler ResourceEventHandler
    
      // pendingNotifications:addCh中读出数据放入nextCh中,如果阻塞,则放入此缓冲区域(k8s自定义对象)
    	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
    	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
    	// added until we OOM.
    	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
    	// we should try to do something better.
    	pendingNotifications buffer.RingGrowing
    
    	// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
    	requestedResyncPeriod time.Duration
    	// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
    	// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
    	// informer's overall resync check period.
    	resyncPeriod time.Duration
    	// nextResync is the earliest time the listener should get a full resync
    	nextResync time.Time
    	// resyncLock guards access to resyncPeriod and nextResync
    	resyncLock sync.Mutex
    }
    
    // k8s.io/client-go/tools/cache/controller.go
    
    type ResourceEventHandler interface {
    	OnAdd(obj interface{})
    	OnUpdate(oldObj, newObj interface{})
    	OnDelete(obj interface{})
    }
    
    // ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
    // as few of the notification functions as you want while still implementing
    // ResourceEventHandler.
    type ResourceEventHandlerFuncs struct {
    	AddFunc    func(obj interface{})
    	UpdateFunc func(oldObj, newObj interface{})
    	DeleteFunc func(obj interface{})
    }
    
    // k8s.io/utils/buffer/ring_growing.go
    
    // RingGrowing is a growing ring buffer.
    // Not thread safe.
    type RingGrowing struct {
    	data     []interface{}
    	n        int // Size of Data
    	beg      int // First available element
    	readable int // Number of data items available
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    processorListener 中通道和环形缓存的作用

    • 考虑这样一个问题?【资源变化的事件 Delta】很多时,【processorListener】处理不过来怎么办?

      • 接下来我们就分析【processorListener】的【事件获取】【事件缓存】和【事件处理机制】
    • pop 函数的作用:

      • 【Delta】首先传给【addCh】通道
        • 若【nextCh】通道可以接收,那么【nextCh】直接接收【Delta通知】
        • 若【nextCh】不能接收,就将【Delta通知】放入到【环形缓存pendingNotifications中】,之后【nextCh】空闲,便会消费【环形缓存pendingNotifications中的Delta通知】
    • run 函数的作用:

      • 从【nextCh】通道读取【Delta通知】,之后调用预先注册的回调函数【如p.handler.OnAdd】进行处理
      • 之后就是 【handler函数的自定义筛选逻辑】将需要的事件放入到【workqueue】,等待【自定义的Controller消费】——【Controller】部分就不属于 Informer 逻辑了
    • run 和 pop 以各自的 goroutine 在后台运行

    // k8s.io/client-go/tools/cache/shared_informer.go
    
    type processorListener struct {
      // nextCh:数据从此通道中读出并调用handler函数处理,非缓冲通道
    	nextCh chan interface{}
      // addCh:FIFO中POP出的数据通过distribute函数放入此通道中,非缓冲通道
    	addCh  chan interface{}
      ...
    
      // pendingNotifications:addCh中读出数据放入nextCh中,如果阻塞,则放入此缓冲区域(k8s自定义对象)
    	pendingNotifications buffer.RingGrowing
    
    }
    
    
    // k8s.io/client-go/tools/cache/shared_informer.go
    // Deleta 传输函数
    func (p *processorListener) pop() {
    	defer utilruntime.HandleCrash()
    	defer close(p.nextCh) // Tell .run() to stop
    
    	var nextCh chan<- interface{}
    	var notification interface{}
    	for {
    		select {
    		case nextCh <- notification:
    			// Notification dispatched
    			var ok bool
    			notification, ok = p.pendingNotifications.ReadOne()
    			if !ok { // Nothing to pop
    				nextCh = nil // Disable this select case
    			}
    		case notificationToAdd, ok := <-p.addCh:
    			if !ok {
    				return
    			}
    			if notification == nil { // No notification to pop (and pendingNotifications is empty)
    				// Optimize the case - skip adding to pendingNotifications
    				notification = notificationToAdd
    				nextCh = p.nextCh
    			} else { // There is already a notification waiting to be dispatched
    				p.pendingNotifications.WriteOne(notificationToAdd)
    			}
    		}
    	}
    }
    
    // k8s.io/client-go/tools/cache/shared_informer.go
    // Deleta 处理函数
    func (p *processorListener) run() {
    	// this call blocks until the channel is closed.  When a panic happens during the notification
    	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    	// the next notification will be attempted.  This is usually better than the alternative of never
    	// delivering again.
    	stopCh := make(chan struct{})
    	wait.Until(func() {
    		// this gives us a few quick retries before a long pause and then a few more quick retries
    		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
    			for next := range p.nextCh {
    				switch notification := next.(type) {
              // 调用 handler 函数,也就是预先注册的(AddFunc、UpdateFunc、DeleteFunc 函数),进行处理
    				case updateNotification:
    					p.handler.OnUpdate(notification.oldObj, notification.newObj)
    				case addNotification:
    					p.handler.OnAdd(notification.newObj)
    				case deleteNotification:
    					p.handler.OnDelete(notification.oldObj)
    				default:
    					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
    				}
    			}
    			// the only way to get here is if the p.nextCh is empty and closed
    			return true, nil
    		})
    
    		// the only way to get here is if the p.nextCh is empty and closed
    		if err == nil {
    			close(stopCh)
    		}
    	}, 1*time.Minute, stopCh)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
  • 相关阅读:
    df = pd.read_xxx(“xxx“, dtype=xxx)dtype问题
    redis主从
    lvgl 页面管理器
    Azure 机器学习:使用 Azure 机器学习 CLI、SDK 和 REST API 训练模型
    Spring MVC:请求转发与请求重定向
    js实现数组扁平化的多种方式
    P1208 [USACO1.3] 混合牛奶 Mixing Milk
    【水果派不吃灰】Raspberry Pi树莓派小常识
    异步过渡方案—Generator
    智加科技与东风柳汽达成深度合作 自动驾驶重卡计划2024年初量产交付
  • 原文地址:https://blog.csdn.net/qq_24433609/article/details/126230052