studentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueStudent, // 下面有这个函数解释,就是先将对象同步到缓存中,再放入 workqueue 队列
UpdateFunc: func(old, new interface{}) {
oldStudent := old.(*bolingcavalryv1.Student)
newStudent := new.(*bolingcavalryv1.Student)
if oldStudent.ResourceVersion == newStudent.ResourceVersion {
//版本一致,就表示没有实际更新的操作,立即返回
return
}
controller.enqueueStudent(new)
},
DeleteFunc: controller.enqueueStudentForDelete,
})
s.processor.distribute
函数// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
// 资源的同步、添加、更新四件
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// 更新缓存和索引
if err := s.indexer.Update(d.Object); err != nil {
return err
}
// 将事件分发,进行处理
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// 下面有详细解析
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
// 资源的删除时间
case Deleted:
// 更新缓存和索引
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
// 将事件分发,进行处理
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type SharedIndexInformer interface {
SharedInformer
// AddIndexers add indexers to the informer before it starts.
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
type sharedIndexInformer struct {
// 索引和 缓存 store
indexer Indexer
controller Controller
// 处理函数,将是重点
processor *sharedProcessor
// 检测 cache 是否有变化,一把用作调试,默认是关闭的
cacheMutationDetector MutationDetector
// 构造 Reflector 需要
listerWatcher ListerWatcher
// 目标类型,给 Reflector 判断资源类型
objectType runtime.Object
// Reflector 进行重新同步周期
resyncCheckPeriod time.Duration
// 如果使用者没有添加 Resync 时间,则使用这个默认的重新同步周期
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
// 两个 bool 表达了三个状态:controller 启动前、已启动、已停止
started, stopped bool
startedLock sync.Mutex
// 当 Pop 正在消费队列,此时新增的 listener 需要加锁,防止消费混乱
blockDeltas sync.Mutex
// Watch 返回 err 的回调函数
watchErrorHandler WatchErrorHandler
}
首先考虑为什么要有 distribute
分发函数,而是为什么sharedProcessor
处理器叫”共享“?
是因为ShareInformer
,不知道你是否还记得每创建一个Informer
都是基于ShareInformerFactory
// 这个是 Students 的控制器运行的 main 函数
// 其中涉及共享 Informer 工厂函数
func main() {
// 省略若干行代码 ...
// 关注 Student 资源的 「共享 Informer 制造工厂studentInformerFactory」
studentInformerFactory := informers.NewSharedInformerFactory(studentClient, time.Second*30)
// 得到controller
// 利用 「共享 Informer 制造工厂studentInformerFactory」生产出或获取已有的 「Student 某版本的 共享Informer 实例」
// 共享 Informer,只是共享了 Reflector(ListAndWatch 对Apiserver 监控),以及缓存 Indexer 和索引 Local Store
// 目的是:减少同类型多个 Informer
controller := NewController(kubeClient, studentClient,
studentInformerFactory.Bolingcavalry().V1().Students())
//启动informer
go studentInformerFactory.Start(stopCh)
}
// NewController returns a new student controller
func NewController(
// 省略若干行代码 ...
controller := &Controller{
kubeclientset: kubeclientset,
studentclientset: studentclientset,
studentsLister: studentInformer.Lister(),
studentsSynced: studentInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Students"),
recorder: recorder,
}
// 基于共享 Informer ,添加自己的 Informer 处理逻辑
// 这部分可以理解为 每个 Controller 独立的 Informer 部分
studentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueStudent,
UpdateFunc: func(old, new interface{}) {
oldStudent := old.(*bolingcavalryv1.Student)
newStudent := new.(*bolingcavalryv1.Student)
if oldStudent.ResourceVersion == newStudent.ResourceVersion {
//版本一致,就表示没有实际更新的操作,立即返回
return
}
controller.enqueueStudent(new)
},
DeleteFunc: controller.enqueueStudentForDelete,
})
return controller
}
其实他们共享着 —— 【Refactor】【Indexer】【Local Store】 —— 为了减少 Apiserver 的访问压力,及节约存储(因为都是关注着一种资源)
但是一个资源可能会有「多个 Informer 实例」监听着,而且「每个 Informer 的处理逻辑都不是一致的(AddFunc、DeleteFunc、UpdateFunc)」,因此如何通知这些 Informer 处理呢?
// k8s.io/client-go/tools/cache/shared_informer.go
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
// 因为各 listeners 设置的 resyncPeriod 可能不一致
// 所以将没有设置(resyncPeriod = 0) 的归为 listeners 组,将设置了 resyncPeriod 的归到 syncingListeners 组;
listeners []*processorListener
// 如果某个 listener 在多个地方(sharedIndexInformer.resyncCheckPeriod,
// sharedIndexInformer.AddEventHandlerWithResyncPeriod)都设置了 resyncPeriod,则取最小值 minimumResyncPeriod;
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
// k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
// 遍历所属组全部 listeners,将数据投递到 processorListener 进行处理
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
handler ResourceEventHandler
是【 Informer 的处理逻辑 (AddFunc、DeleteFunc、UpdateFunc)】
// k8s.io/client-go/tools/cache/shared_informer.go
type processorListener struct {
// nextCh:数据从此通道中读出并调用handler函数处理,非缓冲通道
nextCh chan interface{}
// addCh:FIFO中POP出的数据通过distribute函数放入此通道中,非缓冲通道
addCh chan interface{}
// 此处即为 Informer 的处理逻辑 (AddFunc、DeleteFunc、UpdateFunc)
handler ResourceEventHandler
// pendingNotifications:addCh中读出数据放入nextCh中,如果阻塞,则放入此缓冲区域(k8s自定义对象)
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better.
pendingNotifications buffer.RingGrowing
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time.Duration
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
// informer's overall resync check period.
resyncPeriod time.Duration
// nextResync is the earliest time the listener should get a full resync
nextResync time.Time
// resyncLock guards access to resyncPeriod and nextResync
resyncLock sync.Mutex
}
// k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
// k8s.io/utils/buffer/ring_growing.go
// RingGrowing is a growing ring buffer.
// Not thread safe.
type RingGrowing struct {
data []interface{}
n int // Size of Data
beg int // First available element
readable int // Number of data items available
}
考虑这样一个问题?【资源变化的事件 Delta】很多时,【processorListener】处理不过来怎么办?
pop 函数的作用:
run 函数的作用:
run 和 pop 以各自的 goroutine 在后台运行
// k8s.io/client-go/tools/cache/shared_informer.go
type processorListener struct {
// nextCh:数据从此通道中读出并调用handler函数处理,非缓冲通道
nextCh chan interface{}
// addCh:FIFO中POP出的数据通过distribute函数放入此通道中,非缓冲通道
addCh chan interface{}
...
// pendingNotifications:addCh中读出数据放入nextCh中,如果阻塞,则放入此缓冲区域(k8s自定义对象)
pendingNotifications buffer.RingGrowing
}
// k8s.io/client-go/tools/cache/shared_informer.go
// Deleta 传输函数
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
// k8s.io/client-go/tools/cache/shared_informer.go
// Deleta 处理函数
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
// 调用 handler 函数,也就是预先注册的(AddFunc、UpdateFunc、DeleteFunc 函数),进行处理
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}