• 【k8s源码篇之Informer篇3】理解Informer中的Reflector组件


    参考

    架构

    Informer 和 Controller

    • 便于理解的架构图
    • 这里 Indexer「索引」 和 Local Store 「缓存」 是分开表示的
      • 在源码级别,基本上是一起实现的,一个结构体内涵盖

    在这里插入图片描述

    Informer 简要架构

    • 源码级简要理解

    (三)Kubernetes 源码剖析之学习Informer机制_golang_03

    Informer 详细架构

    • 源码级详细理解

    img

    Reflector

    img

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    
    type sharedIndexInformer struct {
       // 索引和 缓存 store
       indexer    Indexer
       // Informer 内部的 controller,不是我们自定义的 Controller
       controller Controller
       // 处理函数,将是重点
       processor *sharedProcessor
       // 检测 cache 是否有变化,一把用作调试,默认是关闭的
       cacheMutationDetector MutationDetector
       // 构造 Reflector 需要
       listerWatcher ListerWatcher
       // 目标类型,给 Reflector 判断资源类型
       objectType runtime.Object
       // Reflector 进行重新同步周期
       resyncCheckPeriod time.Duration
       // 如果使用者没有添加 Resync 时间,则使用这个默认的重新同步周期
       defaultEventHandlerResyncPeriod time.Duration
       clock                           clock.Clock
       // 两个 bool 表达了三个状态:controller 启动前、已启动、已停止
       started, stopped bool
       startedLock      sync.Mutex
       // 当 Pop 正在消费队列,此时新增的 listener 需要加锁,防止消费混乱
       blockDeltas sync.Mutex
       // Watch 返回 err 的回调函数
       watchErrorHandler WatchErrorHandler
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    内部总管家 controller

    • Controller 作为核心中枢,集成了组件 Reflector、DeltaFIFO

    • DeltaFIFO 的消费 HandleDeltas 成为连接下游消费者的桥梁

      1. 用于更新索引Indexer和缓存 Loacl Store
      2. 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
    • Controller 由 controller 结构体进行具体实现 —— 这里的 controller 指的是 Informer 内部的控制器:

      • 在 K8s 中约定俗成:大写定义的 interface 接口,由对应小写定义的结构体进行实现。

    结构体定义如下:

    // k8s.io/client-go/tools/cache/controller.go
    
    // 接口定义又哪些行为
    // Controller is a generic controller framework.
    type Controller interface {
    	Run(stopCh <-chan struct{})
    	HasSynced() bool
    	LastSyncResourceVersion() string
    }
    
    // controller 的具体实现
    // Controller is a generic controller framework.
    type controller struct {
    	config         Config			// 包含着 ListAndWatch 函数,DeltaFIFO
    	reflector      *Reflector // Reflector , 用于 ListAndWatch
    	reflectorMutex sync.RWMutex
    	clock          clock.Clock
    }
    
    
    // Run 的时候,会创建 Reflector
    // Run begins processing items, and will continue until a value is sent down stopCh.
    // It's an error to call Run more than once.
    // Run blocks; call via go.
    func (c *controller) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()
    	go func() {
    		<-stopCh
    		c.config.Queue.Close()
    	}()
      // Reflector 的构建依赖于 Config
    	r := NewReflector(
    		c.config.ListerWatcher, // ListAndWatch 函数
    		c.config.ObjectType,
    		c.config.Queue, // Delta FIFO
    		c.config.FullResyncPeriod,
    	)
    	r.ShouldResync = c.config.ShouldResync
    	r.clock = c.clock
    
    	c.reflectorMutex.Lock()
    	c.reflector = r // Reflector
    	c.reflectorMutex.Unlock()
    
    	var wg wait.Group
    	defer wg.Wait()
    
    	wg.StartWithChannel(stopCh, r.Run)
      // processLoop 就是  HandleDeltas 函数
      // 1. 用于更新索引Indexer和缓存 Loacl Store 
      // 2. 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
    	wait.Until(c.processLoop, time.Second, stopCh)
    }
    
    // 启动 processLoop 不断从 DeltaFIFO Pop 进行消费
    // c.config.Process 就是  HandleDeltas 函数,在 config 初始化可以看到
    func (c *controller) processLoop() {
    	for {
    		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    		if err != nil {
    			if err == ErrFIFOClosed {
    				return
    			}
    			if c.config.RetryOnError {
    				// This is the safe way to re-enqueue.
    				c.config.Queue.AddIfNotPresent(obj)
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    Controller 中以 goroutine 协程方式启动 Run 方法,会启动 Reflector 的 ListAndWatch(),用于从 apiserver 拉取全量和监听增量资源,存储到 DeltaFIFO。接着,启动 processLoop 不断从 DeltaFIFO Pop 进行消费。在 sharedIndexInformer 中 Pop 出来进行处理的函数是 HandleDeltas,一方面维护 Indexer 的 Add/Update/Delete,另一方面调用下游 sharedProcessor 进行 handler 处理。

    连接下游的 HandleDeltas

    DeltaFIFO 的消费 HandleDeltas 成为连接下游消费者的桥梁

    1. 用于更新索引Indexer和缓存 Loacl Store
    2. 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
    // k8s.io/client-go/tools/cache/shared_informer.go
    
    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    	s.blockDeltas.Lock()
    	defer s.blockDeltas.Unlock()
    
    	// from oldest to newest
    	for _, d := range obj.(Deltas) {
    		switch d.Type {
        // 资源的同步、添加、更新实践
    		case Sync, Added, Updated:
    			isSync := d.Type == Sync
    			s.cacheMutationDetector.AddObject(d.Object)
    			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
            // 重点!!!更新缓存和索引
    				if err := s.indexer.Update(d.Object); err != nil {
    					return err
    				}
            // 将事件分发,进行处理
    				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
    			} else {
            // 重点!!!更新缓存和索引
    				if err := s.indexer.Add(d.Object); err != nil {
    					return err
    				}
    				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
    			}
        // 资源的删除时间
    		case Deleted:
          //  重点!!!更新缓存和索引
    			if err := s.indexer.Delete(d.Object); err != nil {
    				return err
    			}
          //  重点!!! 通知事件的到来,给订阅的 Informer 发送消息通知
          // 相应 Informer 的 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理
          // 然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
    			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    		}
    	}
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    controller 的配置管理 Config

    • 此配置项基本上涵盖了 controller 创建所必须得
      1. Queue —— DeltaFIFO
      2. ListerWatcher —— Reflector 的 ListAndWatch
      3. Process —— 连接下游的 HandleDeltas(事件处理handler,同步缓存和索引)
    // staging/src/k8s.io/client-go/tools/cache/controller.go
    type Config struct {
       // 实际由 DeltaFIFO 实现
       Queue
       // 构造 Reflector 需要
       ListerWatcher
       // Pop 出来的 obj 处理函数   连接下游的 HandleDeltas 函数
       Process ProcessFunc
       // 目标对象类型
       ObjectType runtime.Object
       // 全量重新同步周期
       FullResyncPeriod time.Duration
       // 是否进行重新同步的判断函数
       ShouldResync ShouldResyncFunc
       // 如果为 true,Process() 函数返回 err,则再次入队 re-queue
       RetryOnError bool
       // Watch 返回 err 的回调函数
       WatchErrorHandler WatchErrorHandler
       // Watch 分页大小
       WatchListPageSize int64
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    controller 管理的 Reflector

    Reflector 的主要职责是从 apiserver 拉取并持续监听(ListAndWatch) 相关资源类型的增删改(Add/Update/Delete)事件,存储在由 DeltaFIFO 实现的本地缓存(local Store) 中。

    首先看一下 Reflector 结构体定义:

    // staging/src/k8s.io/client-go/tools/cache/reflector.go
    type Reflector struct {
       // 通过 file:line 唯一标识的 name
       name string
     
       // 下面三个为了确认类型
       expectedTypeName string
       expectedType     reflect.Type
       expectedGVK      *schema.GroupVersionKind
     
       // 存储 interface: 具体由 DeltaFIFO 实现存储
       store Store
       // 用来从 apiserver 拉取全量和增量资源
       listerWatcher ListerWatcher
     
       // 下面两个用来做失败重试
       backoffManager         wait.BackoffManager
       initConnBackoffManager wait.BackoffManager
      
       // informer 使用者重新同步的周期
       resyncPeriod time.Duration
       // 判断是否满足可以重新同步的条件
       ShouldResync func() bool
       
       clock clock.Clock
       
       // 是否要进行分页 List
       paginatedResult bool
       
       // 最后同步的资源版本号,以此为依据,watch 只会监听大于此值的资源
       lastSyncResourceVersion string
       // 最后同步的资源版本号是否可用
       isLastSyncResourceVersionUnavailable bool
       // 加把锁控制版本号
       lastSyncResourceVersionMutex sync.RWMutex
       
       // 每页大小
       WatchListPageSize int64
       // watch 失败回调 handler
       watchErrorHandler WatchErrorHandler
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    从结构体定义可以看到,通过指定目标资源类型进行 ListAndWatch,并可进行分页相关设置。第一次拉取全量资源(目标资源类型) 后通过 syncWith 函数全量替换(Replace) 到 DeltaFIFO queue/items 中,之后通过持续监听 Watch(目标资源类型) 增量事件,并去重更新到 DeltaFIFO queue/items 中,等待被消费。

    watch 目标类型通过 Go reflect 反射实现如下:

    // staging/src/k8s.io/client-go/tools/cache/reflector.go
    // watchHandler watches w and keeps *resourceVersion up to date.
    func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
     
       ...
       if r.expectedType != nil {
          if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
             utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
             continue
          }
       }
       if r.expectedGVK != nil {
          if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
             utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
             continue
          }
       }
       ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 通过反射确认目标资源类型,所以命名为 Reflector 还是比较贴切的;
    • List/Watch 的目标资源类型在NewSharedIndexInformer.ListerWatcher 进行了确定,但 Watch 还会在 watchHandler 中再次比较一下目标类型;

    controller 管理的 DeltaFIFO

    // staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
    type DeltaFIFO struct {
       // 读写锁、条件变量
       lock sync.RWMutex
       cond sync.Cond
       // kv 存储:objKey1->Deltas[obj1-Added, obj1-Updated...]
       items map[string]Deltas
       // 只存储所有 objKeys
       queue []string
       // 是否已经填充:通过 Replace() 接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
       populated bool
       // 通过 Replace() 接口将第一批对象放入队列的数量
       initialPopulationCount int
       // keyFunc 用来从某个 obj 中获取其对应的 objKey
       keyFunc KeyFunc
       // 已知对象,其实就是 Indexer
       knownObjects KeyListerGetter 
       // 队列是否已经关闭
       closed bool
       // 以 Replaced 类型发送(为了兼容老版本的 Sync)
       emitDeltaTypeReplaced bool
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    DeltaType 可分为以下类型:

    // staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
    type DeltaType string
     
     
    const (
       Added   DeltaType = "Added"
       Updated DeltaType = "Updated"
       Deleted DeltaType = "Deleted"
       Replaced DeltaType = "Replaced" // 第一次或重新同步
       Sync DeltaType = "Sync" // 老版本重新同步叫 Sync
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    通过上面的 Reflector 分析可以知道,DeltaFIFO 的职责是通过队列加锁处理(queueActionLocked)、去重(dedupDeltas)、存储在由 DeltaFIFO 实现的本地缓存(local Store) 中,包括 queue(仅存 objKeys) 和 items(存 objKeys 和对应的 Deltas 增量变化),并通过 Pop 不断消费,通过 Process(item) 处理相关逻辑。

    Reflector 的 ListAndWatch

    // 接口定义
    // ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
    type ListerWatcher interface {
    	Lister  // 该接口 定义了 List 方法
    	Watcher // 该接口 定义了 Watch 方法
    }
    
    // Lister is any object that knows how to perform an initial list.
    type Lister interface {
    	// List should return a list type object; the Items field will be extracted, and the
    	// ResourceVersion field will be used to start the watch in the right place.
    	List(options metav1.ListOptions) (runtime.Object, error)
    }
    
    // Watcher is any object that knows how to start a watch on a resource.
    type Watcher interface {
    	// Watch should begin a watch at the specified version.
    	Watch(options metav1.ListOptions) (watch.Interface, error)
    }
    
    // 接口的实现
    // 接口的作用体 —— ListWatch struct
    // ListFunc knows how to list resources
    type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
    
    // WatchFunc knows how to watch resources
    type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
    
    // ListWatch knows how to list and watch a set of apiserver resources.  It satisfies the ListerWatcher interface.
    // It is a convenience function for users of NewReflector, etc.
    // ListFunc and WatchFunc must not be nil
    type ListWatch struct {
    	ListFunc  ListFunc
    	WatchFunc WatchFunc
    	// DisableChunking requests no chunking for this list watcher.
    	DisableChunking bool
    }
    
    // 作用体 ListWatch struct 的构建
    // NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
    func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
    	optionsModifier := func(options *metav1.ListOptions) {
    		options.FieldSelector = fieldSelector.String()
    	}
    	return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
    }
    
    // NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
    // Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
    // to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
    func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
    	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
    		optionsModifier(&options)
    		return c.Get().
    			Namespace(namespace).
    			Resource(resource).
    			VersionedParams(&options, metav1.ParameterCodec).
    			Do().
    			Get()
    	}
    	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
    		options.Watch = true
    		optionsModifier(&options)
    		return c.Get().
    			Namespace(namespace).
    			Resource(resource).
    			VersionedParams(&options, metav1.ParameterCodec).
    			Watch()
    	}
    	return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
    }
    
    // 方法的实现
    // List a set of apiserver resources
    func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
    	if !lw.DisableChunking {
    		return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
    	}
    	return lw.ListFunc(options)
    }
    
    // Watch a set of apiserver resources
    func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
    	return lw.WatchFunc(options)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    sharedIndexInformer 的 Run 函数

    1. 初始化 Config 包含(DeltaFIFO队列、ListAndWatch函数、HandleDeltas函数)
    2. 利用 Config 创建 controller
    3. controller 执行 Run 函数时,会利用 Config 创建 Reflector
    // k8s.io/client-go/tools/cache/shared_informer.go
    
    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()
    	// DeltaFIFO
    	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
    
      // 注意此处的 Config
    	cfg := &Config{
    		Queue:            fifo,							// DeltaFIFO
    		ListerWatcher:    s.listerWatcher,  // ListAndWatch 函数  
    		ObjectType:       s.objectType,
    		FullResyncPeriod: s.resyncCheckPeriod,
    		RetryOnError:     false,
    		ShouldResync:     s.processor.shouldResync,
        // 重点!!!
        // 处理 Deltas 的函数,也就是 handler(调用注册的 AddFunc、UpdateFunc、DeleteFunc)
        // 同时负责同步 索引Indexer 和 缓存 Local Store
    		Process: s.HandleDeltas,  
    	}
    
      // 使用 Config 创建 共享Informer 内部的 controller
    	func() {
    		s.startedLock.Lock()
    		defer s.startedLock.Unlock()
    		// 创建 共享Informer 内部的 controller
    		s.controller = New(cfg)
    		s.controller.(*controller).clock = s.clock
    		s.started = true
    	}()
    
    	// Separate stop channel because Processor should be stopped strictly after controller
    	processorStopCh := make(chan struct{})
    	var wg wait.Group
    	defer wg.Wait()              // Wait for Processor to stop
    	defer close(processorStopCh) // Tell Processor to stop
    	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    	wg.StartWithChannel(processorStopCh, s.processor.run)
    
    	defer func() {
    		s.startedLock.Lock()
    		defer s.startedLock.Unlock()
    		s.stopped = true // Don't want any new listeners
    	}()
      // Run 的时候会创建 Reflector
    	s.controller.Run(stopCh)
    }
    
    // k8s.io/client-go/tools/cache/controller.go
    
    // 接口定义又哪些行为
    // Controller is a generic controller framework.
    type Controller interface {
    	Run(stopCh <-chan struct{})
    	HasSynced() bool
    	LastSyncResourceVersion() string
    }
    
    // controller 的具体实现
    // Controller is a generic controller framework.
    type controller struct {
    	config         Config			// 包含着 ListAndWatch 函数,DeltaFIFO
    	reflector      *Reflector // Reflector , 用于 ListAndWatch
    	reflectorMutex sync.RWMutex
    	clock          clock.Clock
    }
    
    // Run 的时候会创建 Reflector
    func (c *controller) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()
    	go func() {
    		<-stopCh
    		c.config.Queue.Close()
    	}()
      // Reflector 的构建依赖于 Config
    	r := NewReflector(
    		c.config.ListerWatcher, // ListAndWatch 函数
    		c.config.ObjectType,
    		c.config.Queue, // Delta FIFO
    		c.config.FullResyncPeriod,
    	)
    	r.ShouldResync = c.config.ShouldResync
    	r.clock = c.clock
    
    	c.reflectorMutex.Lock()
    	c.reflector = r // Reflector
    	c.reflectorMutex.Unlock()
    
    	var wg wait.Group
    	defer wg.Wait()
    
    	wg.StartWithChannel(stopCh, r.Run)
      // processLoop 就是  HandleDeltas 函数
      // 1. 用于更新索引Indexer和缓存 Loacl Store 
      // 2. 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
    	wait.Until(c.processLoop, time.Second, stopCh)
    }
    
    // 启动 processLoop 不断从 DeltaFIFO Pop 进行消费
    // c.config.Process 就是  HandleDeltas 函数,在 config 初始化可以看到
    func (c *controller) processLoop() {
    	for {
    		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    		if err != nil {
    			if err == ErrFIFOClosed {
    				return
    			}
    			if c.config.RetryOnError {
    				// This is the safe way to re-enqueue.
    				c.config.Queue.AddIfNotPresent(obj)
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
  • 相关阅读:
    AI智能网关在工业物联网领域有哪些应用优势
    动捕设备推动舞蹈表演动作捕捉动画制作突破边界
    Linux下备份文件到其他服务器
    (四)Redis 缓存应用、淘汰机制
    阿里P8熬了一个月肝出这份32W字Java面试手册,在Github标星31K+
    un-app部署h5项目到普通云服务器--域名解析--OOS对象存储
    CSS3媒体查询与页面自适应
    如何在3dMax中使用Python按类型选择对象?
    Shiro 01(shiro框架入门)
    nestjs使用rabbitMQ
  • 原文地址:https://blog.csdn.net/qq_24433609/article/details/126230120