• k8s client-go源码分析 informer源码分析(2)-初始化与启动分析


    k8s client-go源码分析 informer源码分析(2)-初始化与启动分析

    前面一篇文章对k8s informer做了概要分析,本篇文章将对informer的初始化与启动进行分析。

    informer架构

    先来回忆一下informer的架构。

    k8s client-go informer主要包括以下部件:
    (1)Reflector:Reflector从kube-apiserver中list&watch资源对象,然后调用DeltaFIFO的Add/Update/Delete/Replace方法将资源对象及其变化包装成Delta并将其丢到DeltaFIFO中;
    (2)DeltaFIFO:DeltaFIFO中存储着一个map和一个queue,即map[object key]Deltas以及object key的queue,Deltas为Delta的切片类型,Delta装有对象及对象的变化类型(Added/Updated/Deleted/Sync) ,Reflector负责DeltaFIFO的输入,Controller负责处理DeltaFIFO的输出;
    (3)Controller:Controller从DeltaFIFO的queue中pop一个object key出来,并获取其关联的 Deltas出来进行处理,遍历Deltas,根据对象的变化更新Indexer中的本地内存缓存,并通知Processor,相关对象有变化事件发生;
    (4)Processor:Processor根据对象的变化事件类型,调用相应的ResourceEventHandler来处理对象的变化;
    (5)Indexer:Indexer中有informer维护的指定资源对象的相对于etcd数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对apiserver、对etcd的请求压力;
    (6)ResourceEventHandler:用户根据自身处理逻辑需要,注册自定义的的ResourceEventHandler,当对象发生变化时,将触发调用对应类型的ResourceEventHandler来做处理。

    概述

        ...
    	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    	podInformer := factory.Core().V1().Pods()
    	informer := podInformer.Informer()
    	...
    	go factory.Start(stopper)
    	...
    	if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
    		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
    		return
    	}
    	...
    

    上一节有列举了informer的使用代码,注意看到示例代码中的下面这段代码,做了informer初始化与启动,其中包括:
    (1)informers.NewSharedInformerFactory:初始化informer factory;
    (2)podInformer.Informer:初始化pod informer;
    (3)factory.Start:启动informer factory;
    (4)cache.WaitForCacheSync:等待list操作获取到的对象都同步到informer本地缓存Indexer中;

    下面也将根据这四部分进行informer的初始化与启动分析。

    基于k8s v1.17.4版本依赖的client-go

    1.SharedInformerFactory的初始化

    1.1 sharedInformerFactory结构体

    先来看下sharedInformerFactory结构体,看下里面有哪些属性。

    看到几个比较重要的属性:
    (1)client:连接k8s的clientSet;
    (2)informers:是个map,可以装各个对象的informer;
    (3)startedInformers:记录已经启动的informer;

    // staging/src/k8s.io/client-go/informers/factory.go
    type sharedInformerFactory struct {
    	client           kubernetes.Interface
    	namespace        string
    	tweakListOptions internalinterfaces.TweakListOptionsFunc
    	lock             sync.Mutex
    	defaultResync    time.Duration
    	customResync     map[reflect.Type]time.Duration
    
    	informers map[reflect.Type]cache.SharedIndexInformer
    	// startedInformers is used for tracking which informers have been started.
    	// This allows Start() to be called multiple times safely.
    	startedInformers map[reflect.Type]bool
    }
    

    1.2 NewSharedInformerFactory

    NewSharedInformerFactory方法用于初始化informer factory,主要是初始化并返回sharedInformerFactory结构体。

    // staging/src/k8s.io/client-go/informers/factory.go
    func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
    	return NewSharedInformerFactoryWithOptions(client, defaultResync)
    }
    
    func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
    	return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
    }
    
    func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
    	factory := &sharedInformerFactory{
    		client:           client,
    		namespace:        v1.NamespaceAll,
    		defaultResync:    defaultResync,
    		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
    		startedInformers: make(map[reflect.Type]bool),
    		customResync:     make(map[reflect.Type]time.Duration),
    	}
    
    	// Apply all options
    	for _, opt := range options {
    		factory = opt(factory)
    	}
    
    	return factory
    }
    

    2.对象informer的初始化

    上一节有列举了informer的使用代码,注意看到示例代码中的下面这段代码,这里利用了工厂方法设计模式,podInformer.Informer()即初始化了sharedInformerFactory中的pod的informer,具体调用关系可自行看如下代码,比较简单,这里不再展开分析。

        // 初始化informer factory以及pod informer
    	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    	podInformer := factory.Core().V1().Pods()
    	informer := podInformer.Informer()
    

    2.1 podInformer.Informer

    Informer方法中调用了f.factory.InformerFor方法来做pod informer的初始化。

    // k8s.io/client-go/informers/core/v1/pod.go
    func (f *podInformer) Informer() cache.SharedIndexInformer {
    	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
    }
    

    2.2 f.factory.InformerFor

    Informer方法中调用了f.factory.InformerFor方法来做pod informer的初始化,并传入f.defaultInformer作为newFunc,而在f.factory.InformerFor方法中,调用newFunc来初始化informer。

    这里也可以看到,其实informer初始化后会存储进map f.informers[informerType]中,即存储进sharedInformerFactory结构体的informers属性中,方便共享使用。

    // staging/src/k8s.io/client-go/informers/factory.go
    func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    	f.lock.Lock()
    	defer f.lock.Unlock()
    
    	informerType := reflect.TypeOf(obj)
    	informer, exists := f.informers[informerType]
    	if exists {
    		return informer
    	}
    
    	resyncPeriod, exists := f.customResync[informerType]
    	if !exists {
    		resyncPeriod = f.defaultResync
    	}
    
    	informer = newFunc(f.client, resyncPeriod)
    	f.informers[informerType] = informer
    
    	return informer
    }
    

    2.3 newFunc/f.defaultInformer

    defaultInformer方法中,调用了NewFilteredPodInformer方法来初始化pod informer,最终初始化并返回sharedIndexInformer结构体。

    // k8s.io/client-go/informers/core/v1/pod.go
    func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
    }
    
    func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    	return cache.NewSharedIndexInformer(
    		&cache.ListWatch{
    			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
    				if tweakListOptions != nil {
    					tweakListOptions(&options)
    				}
    				return client.CoreV1().Pods(namespace).List(options)
    			},
    			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
    				if tweakListOptions != nil {
    					tweakListOptions(&options)
    				}
    				return client.CoreV1().Pods(namespace).Watch(options)
    			},
    		},
    		&corev1.Pod{},
    		resyncPeriod,
    		indexers,
    	)
    }
    
    func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    	realClock := &clock.RealClock{}
    	sharedIndexInformer := &sharedIndexInformer{
    		processor:                       &sharedProcessor{clock: realClock},
    		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
    		listerWatcher:                   lw,
    		objectType:                      objType,
    		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
    		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
    		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
    		clock: realClock,
    	}
    	return sharedIndexInformer
    }
    

    2.4 sharedIndexInformer结构体

    sharedIndexInformer结构体中重点看到以下几个属性:
    (1)indexer:对应着informer中的部件Indexer,Indexer中有informer维护的指定资源对象的相对于etcd数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对apiserver、对etcd的请求压力;
    (2)controller:对应着informer中的部件Controller,Controller从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer中的本地内存缓存,并通知Processor,相关对象有变化事件发生;
    (3)processor:对应着informer中的部件Processor,Processor根据对象的变化事件类型,调用相应的ResourceEventHandler来处理对象的变化;

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    type sharedIndexInformer struct {
    	indexer    Indexer
    	controller Controller
    
    	processor             *sharedProcessor
    	cacheMutationDetector CacheMutationDetector
    
    	// This block is tracked to handle late initialization of the controller
    	listerWatcher ListerWatcher
    	objectType    runtime.Object
    
    	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    	// shouldResync to check if any of our listeners need a resync.
    	resyncCheckPeriod time.Duration
    	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
    	// value).
    	defaultEventHandlerResyncPeriod time.Duration
    	// clock allows for testability
    	clock clock.Clock
    
    	started, stopped bool
    	startedLock      sync.Mutex
    
    	// blockDeltas gives a way to stop all event distribution so that a late event handler
    	// can safely join the shared informer.
    	blockDeltas sync.Mutex
    }
    
    Indexer接口与cache结构体

    cache结构体为Indexer接口的实现;

    // staging/src/k8s.io/client-go/tools/cache/store.go
    type cache struct {
    	cacheStorage ThreadSafeStore
    	keyFunc KeyFunc
    }
    

    threadSafeMap struct是ThreadSafeStore接口的一个实现,其最重要的一个属性便是items了,items是用map构建的键值对,资源对象都存在items这个map中,key根据资源对象来算出,value为资源对象本身,这里的items即为informer的本地缓存了,而indexers与indices属性则与索引功能有关。

    // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
    type threadSafeMap struct {
    	lock  sync.RWMutex
    	items map[string]interface{}
    
    	// indexers maps a name to an IndexFunc
    	indexers Indexers
    	// indices maps a name to an Index
    	indices Indices
    }
    

    关于Indexer的详细分析会在后续有专门的文章做分析,这里不展开分析;

    controller结构体

    而controller结构体则包含了informer中的主要部件Reflector以及DeltaFIFO;
    (1)Reflector:Reflector从kube-apiserver中list&watch资源对象,然后将对象的变化包装成Delta并将其丢到DeltaFIFO中;
    (2)DeltaFIFO:DeltaFIFO存储着map[object key]Deltas以及object key的queue,Delta装有对象及对象的变化类型 ,Reflector负责DeltaFIFO的输入,Controller负责处理DeltaFIFO的输出;

    // staging/src/k8s.io/client-go/tools/cache/controller.go
    type controller struct {
    	config         Config
    	reflector      *Reflector
    	reflectorMutex sync.RWMutex
    	clock          clock.Clock
    }
    
    type Config struct {
    	// The queue for your objects; either a FIFO or
    	// a DeltaFIFO. Your Process() function should accept
    	// the output of this Queue's Pop() method.
    	Queue
    	...
    }
    

    3.启动sharedInformerFactory

    sharedInformerFactory.Start为informer factory的启动方法,其主要逻辑为循环遍历informers,然后跑goroutine调用informer.Run来启动sharedInformerFactory中存储的各个informer。

    // staging/src/k8s.io/client-go/informers/factory.go
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    	f.lock.Lock()
    	defer f.lock.Unlock()
    
    	for informerType, informer := range f.informers {
    		if !f.startedInformers[informerType] {
    			go informer.Run(stopCh)
    			f.startedInformers[informerType] = true
    		}
    	}
    }
    

    sharedIndexInformer.Run

    sharedIndexInformer.Run用于启动informer,主要逻辑为:
    (1)调用NewDeltaFIFO,初始化DeltaFIFO;
    (2)构建Config结构体,这里留意下Process属性,赋值了s.HandleDeltas,后面会分析到该方法;
    (3)调用New,利用Config结构体来初始化controller;
    (4)调用s.processor.run,启动processor;
    (5)调用s.controller.Run,启动controller;

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()
        
        // 初始化DeltaFIFO
    	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
        
        // 构建Config结构体
    	cfg := &Config{
    		Queue:            fifo,
    		ListerWatcher:    s.listerWatcher,
    		ObjectType:       s.objectType,
    		FullResyncPeriod: s.resyncCheckPeriod,
    		RetryOnError:     false,
    		ShouldResync:     s.processor.shouldResync,
    
    		Process: s.HandleDeltas,
    	}
    
    	func() {
    		s.startedLock.Lock()
    		defer s.startedLock.Unlock()
            // 初始化controller
    		s.controller = New(cfg)
    		s.controller.(*controller).clock = s.clock
    		s.started = true
    	}()
    
    	// Separate stop channel because Processor should be stopped strictly after controller
    	processorStopCh := make(chan struct{})
    	var wg wait.Group
    	defer wg.Wait()              // Wait for Processor to stop
    	defer close(processorStopCh) // Tell Processor to stop
    	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    	// 启动processor
    	wg.StartWithChannel(processorStopCh, s.processor.run)
    
    	defer func() {
    		s.startedLock.Lock()
    		defer s.startedLock.Unlock()
    		s.stopped = true // Don't want any new listeners
    	}()
    	// 启动controller
    	s.controller.Run(stopCh)
    }
    

    3.1 New

    New函数初始化了controller并return。

    // staging/src/k8s.io/client-go/tools/cache/controller.go
    func New(c *Config) Controller {
    	ctlr := &controller{
    		config: *c,
    		clock:  &clock.RealClock{},
    	}
    	return ctlr
    }
    

    3.2 s.processor.run

    s.processor.run启动了processor,其中注意到listener.run与listener.pop两个核心方法即可,暂时没有用到,等下面用到他们的时候再做分析。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    	func() {
    		p.listenersLock.RLock()
    		defer p.listenersLock.RUnlock()
    		for _, listener := range p.listeners {
    			p.wg.Start(listener.run)
    			p.wg.Start(listener.pop)
    		}
    		p.listenersStarted = true
    	}()
    	<-stopCh
    	p.listenersLock.RLock()
    	defer p.listenersLock.RUnlock()
    	for _, listener := range p.listeners {
    		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    	}
    	p.wg.Wait() // Wait for all .pop() and .run() to stop
    }
    

    3.3 controller.Run

    controller.Run为controller的启动方法,这里主要看到几个点:
    (1)调用NewReflector,初始化Reflector;
    (2)调用r.Run,实际上是调用了Reflector的启动方法来启动Reflector;
    (3)调用c.processLoop,开始controller的核心处理;

    // k8s.io/client-go/tools/cache/controller.go
    func (c *controller) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()
    	go func() {
    		<-stopCh
    		c.config.Queue.Close()
    	}()
    	r := NewReflector(
    		c.config.ListerWatcher,
    		c.config.ObjectType,
    		c.config.Queue,
    		c.config.FullResyncPeriod,
    	)
    	r.ShouldResync = c.config.ShouldResync
    	r.clock = c.clock
    
    	c.reflectorMutex.Lock()
    	c.reflector = r
    	c.reflectorMutex.Unlock()
    
    	var wg wait.Group
    	defer wg.Wait()
    
    	wg.StartWithChannel(stopCh, r.Run)
    
    	wait.Until(c.processLoop, time.Second, stopCh)
    }
    
    3.3.1 Reflector结构体

    先来看到Reflector结构体,这里重点看到以下属性:
    (1)expectedType:放到Store中(即DeltaFIFO中)的对象类型;
    (2)store:store会赋值为DeltaFIFO,具体可以看之前的informer初始化与启动分析即可得知,这里不再展开分析;
    (3)listerWatcher:存放list方法和watch方法的ListerWatcher interface实现;

    // k8s.io/client-go/tools/cache/reflector.go
    type Reflector struct {
        ...
        expectedType reflect.Type
        store Store
        listerWatcher ListerWatcher
        ...
    }
    
    3.3.2 r.Run/Reflector.Run

    Reflector.Run方法中启动了Reflector,而Reflector的核心处理逻辑为从kube-apiserver处做list&watch操作,然后将得到的对象封装存储进DeltaFIFO中。

    // staging/src/k8s.io/client-go/tools/cache/reflector.go
    func (r *Reflector) Run(stopCh <-chan struct{}) {
    	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    	wait.Until(func() {
    		if err := r.ListAndWatch(stopCh); err != nil {
    			utilruntime.HandleError(err)
    		}
    	}, r.period, stopCh)
    }
    
    3.3.3 controller.processLoop

    controller的核心处理方法processLoop中,最重要的逻辑是循环调用c.config.Queue.Pop将DeltaFIFO中的队头元素给pop出来,然后调用c.config.Process方法来做处理,当处理出错时,再调用c.config.Queue.AddIfNotPresent将对象重新加入到DeltaFIFO中去。

    // k8s.io/client-go/tools/cache/controller.go
    func (c *controller) processLoop() {
    	for {
    		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    		if err != nil {
    			if err == ErrFIFOClosed {
    				return
    			}
    			if c.config.RetryOnError {
    				// This is the safe way to re-enqueue.
    				c.config.Queue.AddIfNotPresent(obj)
    			}
    		}
    	}
    }
    
    3.3.4 c.config.Process/sharedIndexInformer.HandleDeltas

    根据前面sharedIndexInformer.Run方法的分析中可以得知,c.config.Process其实就是sharedIndexInformer.HandleDeltas。

    HandleDeltas方法中,将从DeltaFIFO中pop出来的对象以及类型,相应的在indexer中做添加、更新、删除操作,并调用s.processor.distribute通知自定义的ResourceEventHandler。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    	s.blockDeltas.Lock()
    	defer s.blockDeltas.Unlock()
    
    	// from oldest to newest
    	for _, d := range obj.(Deltas) {
    		switch d.Type {
    		case Sync, Added, Updated:
    			isSync := d.Type == Sync
    			s.cacheMutationDetector.AddObject(d.Object)
    			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
    				if err := s.indexer.Update(d.Object); err != nil {
    					return err
    				}
    				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
    			} else {
    				if err := s.indexer.Add(d.Object); err != nil {
    					return err
    				}
    				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
    			}
    		case Deleted:
    			if err := s.indexer.Delete(d.Object); err != nil {
    				return err
    			}
    			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    		}
    	}
    	return nil
    }
    

    怎么通知到自定义的ResourceEventHandler呢?继续往下看。

    3.3.5 sharedIndexInformer.processor.distribute

    可以看到distribute方法最终是将构造好的addNotification、updateNotification、deleteNotification对象写入到p.addCh中。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    	p.listenersLock.RLock()
    	defer p.listenersLock.RUnlock()
    
    	if sync {
    		for _, listener := range p.syncingListeners {
    			listener.add(obj)
    		}
    	} else {
    		for _, listener := range p.listeners {
    			listener.add(obj)
    		}
    	}
    }
    
    func (p *processorListener) add(notification interface{}) {
    	p.addCh <- notification
    }
    

    到这里,processor中的listener.pop以及listener.run方法终于派上了用场,继续往下看。

    3.3.6 listener.pop

    分析processorListener的pop方法可以得知,其逻辑实际上就是将p.addCh中的对象给拿出来,然后丢进了p.nextCh中。那么谁来处理p.nextCh呢?继续往下看。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (p *processorListener) pop() {
    	defer utilruntime.HandleCrash()
    	defer close(p.nextCh) // Tell .run() to stop
    
    	var nextCh chan<- interface{}
    	var notification interface{}
    	for {
    		select {
    		case nextCh <- notification:
    			// Notification dispatched
    			var ok bool
    			notification, ok = p.pendingNotifications.ReadOne()
    			if !ok { // Nothing to pop
    				nextCh = nil // Disable this select case
    			}
    		case notificationToAdd, ok := <-p.addCh:
    			if !ok {
    				return
    			}
    			if notification == nil { // No notification to pop (and pendingNotifications is empty)
    				// Optimize the case - skip adding to pendingNotifications
    				notification = notificationToAdd
    				nextCh = p.nextCh
    			} else { // There is already a notification waiting to be dispatched
    				p.pendingNotifications.WriteOne(notificationToAdd)
    			}
    		}
    	}
    }
    
    3.3.7 listener.run

    在processorListener的run方法中,将循环读取p.nextCh,判断对象类型,是updateNotification则调用p.handler.OnUpdate方法,是addNotification则调用p.handler.OnAdd方法,是deleteNotification则调用p.handler.OnDelete方法做处理。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (p *processorListener) run() {
    	// this call blocks until the channel is closed.  When a panic happens during the notification
    	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    	// the next notification will be attempted.  This is usually better than the alternative of never
    	// delivering again.
    	stopCh := make(chan struct{})
    	wait.Until(func() {
    		// this gives us a few quick retries before a long pause and then a few more quick retries
    		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
    			for next := range p.nextCh {
    				switch notification := next.(type) {
    				case updateNotification:
    					p.handler.OnUpdate(notification.oldObj, notification.newObj)
    				case addNotification:
    					p.handler.OnAdd(notification.newObj)
    				case deleteNotification:
    					p.handler.OnDelete(notification.oldObj)
    				default:
    					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
    				}
    			}
    			// the only way to get here is if the p.nextCh is empty and closed
    			return true, nil
    		})
    
    		// the only way to get here is if the p.nextCh is empty and closed
    		if err == nil {
    			close(stopCh)
    		}
    	}, 1*time.Minute, stopCh)
    }
    
    

    而p.handler.OnUpdate、p.handler.OnAdd、p.handler.OnDelete方法实际上就是自定义的的ResourceEventHandlerFuncs了。

    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    onAdd,
        UpdateFunc: onUpdate,
        DeleteFunc: onDelete,
      })
    
    // staging/src/k8s.io/client-go/tools/cache/controller.go
    type ResourceEventHandlerFuncs struct {
    	AddFunc    func(obj interface{})
    	UpdateFunc func(oldObj, newObj interface{})
    	DeleteFunc func(obj interface{})
    }
    
    func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
    	if r.AddFunc != nil {
    		r.AddFunc(obj)
    	}
    }
    
    func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
    	if r.UpdateFunc != nil {
    		r.UpdateFunc(oldObj, newObj)
    	}
    }
    
    func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
    	if r.DeleteFunc != nil {
    		r.DeleteFunc(obj)
    	}
    }
    

    4.cache.WaitForCacheSync(stopper, informer.HasSynced)

    可以看出在cache.WaitForCacheSync方法中,实际上是调用方法入参cacheSyncs ...InformerSynced来判断cache是否同步完成(即调用informer.HasSynced方法),而这里说的cache同步完成,意思是等待informer从kube-apiserver同步资源完成,即informer的list操作获取的对象都存入到informer中的indexer本地缓存中;

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
    	err := wait.PollImmediateUntil(syncedPollPeriod,
    		func() (bool, error) {
    			for _, syncFunc := range cacheSyncs {
    				if !syncFunc() {
    					return false, nil
    				}
    			}
    			return true, nil
    		},
    		stopCh)
    	if err != nil {
    		klog.V(2).Infof("stop requested")
    		return false
    	}
    
    	klog.V(4).Infof("caches populated")
    	return true
    }
    

    4.1 informer.HasSynced

    HasSynced方法实际上是调用了sharedIndexInformer.controller.HasSynced方法;

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (s *sharedIndexInformer) HasSynced() bool {
    	s.startedLock.Lock()
    	defer s.startedLock.Unlock()
    
    	if s.controller == nil {
    		return false
    	}
    	return s.controller.HasSynced()
    }
    
    s.controller.HasSynced

    这里的c.config.Queue.HasSynced()方法,实际上是指DeltaFIFO的HasSynced方法,会在DeltaFIFO的分析中再详细分析,这里只需要知道当informer的list操作获取的对象都存入到informer中的indexer本地缓存中则返回true即可;

    // staging/src/k8s.io/client-go/tools/cache/controller.go
    func (c *controller) HasSynced() bool {
    	return c.config.Queue.HasSynced()
    }
    

    4.2 sharedInformerFactory.WaitForCacheSync

    可以顺带看下sharedInformerFactory.WaitForCacheSync方法,其实际上是遍历factory中的所有informer,调用cache.WaitForCacheSync,然后传入每个informer的HasSynced方法作为入参;

    // staging/src/k8s.io/client-go/informers/factory.go
    func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
    	informers := func() map[reflect.Type]cache.SharedIndexInformer {
    		f.lock.Lock()
    		defer f.lock.Unlock()
    
    		informers := map[reflect.Type]cache.SharedIndexInformer{}
    		for informerType, informer := range f.informers {
    			if f.startedInformers[informerType] {
    				informers[informerType] = informer
    			}
    		}
    		return informers
    	}()
    
    	res := map[reflect.Type]bool{}
    	for informType, informer := range informers {
    		res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
    	}
    	return res
    }
    

    至此,整个informer的初始化与启动的分析就结束了,后面会对informer中的各个核心部件进行详细分析,敬请期待。

    总结

    下面用两张图片总结一下informer的初始化与启动;

    informer初始化

    informer启动

  • 相关阅读:
    Agent-FLAN 技术报告——社区翻译版
    Java —— 抽象类和接口
    SpringMVC--从理解SpringMVC执行流程到SSM框架整合
    Linux完全卸载PyTorch&重装(cuda11.1)
    Linux安装jdk
    jenkins: pass multiple “Extended Choice Parameter“ values using a URL
    【JAVA JDBC】
    设计模式之十一:代理模式
    凝聚五城力量 2022Home尧泰汉海“益起微笑”公益活动圆满举行
    Python 教程之控制流(10)在Python中有效地使用迭代
  • 原文地址:https://www.cnblogs.com/lianngkyle/p/16244872.html