• 浅析kubernetes中client-go Informer


    之前了解了client-go中的架构设计,也就是 tools/cache 下面的一些概念,那么下面将对informer进行分析

    Controller

    在client-go informer架构中存在一个 controller ,这个不是 Kubernetes 中的Controller组件;而是在 tools/cache 中的一个概念,controller 位于 informer 之下,Reflector 之上。code

    Config

    从严格意义上来讲,controller 是作为一个 sharedInformer 使用,通过接受一个 Config ,而 Reflector 则作为 controller 的 slot。Config 则包含了这个 controller 里所有的设置。

    type Config struct {
    	Queue // DeltaFIFO
    	ListerWatcher // 用于list watch的
    	Process ProcessFunc // 定义如何从DeltaFIFO中弹出数据后处理的操作
    	ObjectType runtime.Object // Controller处理的对象数据,实际上就是kubernetes中的资源
    	FullResyncPeriod time.Duration // 全量同步的周期
    	ShouldResync ShouldResyncFunc // Reflector通过该标记来确定是否应该重新同步
    	RetryOnError bool
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    controller

    然后 controller 又为 reflertor 的上层

    type controller struct {
    	config         Config
    	reflector      *Reflector 
    	reflectorMutex sync.RWMutex
    	clock          clock.Clock
    }
    
    type Controller interface {
    	// controller 主要做两件事,
        // 1. 构建并运行 Reflector,将listerwacther中的泵压到queue(Delta fifo)中
        // 2. Queue用Pop()弹出数据,具体的操作是Process
        // 直到 stopCh 不阻塞,这两个协程将退出
    	Run(stopCh <-chan struct{})
    	HasSynced() bool // 这个实际上是从store中继承的,标记这个controller已经
    	LastSyncResourceVersion() string
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    controller 中的方法,仅有一个 Run()New();这意味着,controller 只是一个抽象的概念,作为 Reflector, Delta FIFO 整合的工作流

    controller 则是 SharedInformer 了。

    Queue

    这里的 queue 可以理解为是一个具有 Pop() 功能的 Indexer ;而 Pop() 的功能则是 controller 中的一部分;也就是说 queue 是一个扩展的 StoreStore 是不具备弹出功能的。

    type Queue interface {
    	Store
    	// Pop会阻塞等待,直到有内容弹出,删除对应的值并处理计数器
    	Pop(PopProcessFunc) (interface{}, error)
    
    	// AddIfNotPresent puts the given accumulator into the Queue (in
    	// association with the accumulator's key) if and only if that key
    	// is not already associated with a non-empty accumulator.
    	AddIfNotPresent(interface{}) error
    
    	// HasSynced returns true if the first batch of keys have all been
    	// popped.  The first batch of keys are those of the first Replace
    	// operation if that happened before any Add, Update, or Delete;
    	// otherwise the first batch is empty.
    	HasSynced() bool
    	Close() // 关闭queue
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    而弹出的操作是通过 controller 中的 processLoop() 进行的,最终走到Delta FIFO中进行处理。

    通过忙等待去读取要弹出的数据,然后在弹出前 通过PopProcessFunc 进行处理

    func (c *controller) processLoop() {
    	for {
    		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    		if err != nil {
    			if err == ErrFIFOClosed {
    				return
    			}
    			if c.config.RetryOnError {
    				// This is the safe way to re-enqueue.
    				c.config.Queue.AddIfNotPresent(obj)
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    DeltaFIFO.Pop()

    func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    	f.lock.Lock()
    	defer f.lock.Unlock()
    	for {
    		for len(f.queue) == 0 {
    			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
    			// When Close() is called, the f.closed is set and the condition is broadcasted.
    			// Which causes this loop to continue and return from the Pop().
    			if f.IsClosed() {
    				return nil, ErrFIFOClosed
    			}
    
    			f.cond.Wait()
    		}
    		id := f.queue[0]
    		f.queue = f.queue[1:]
    		if f.initialPopulationCount > 0 {
    			f.initialPopulationCount--
    		}
    		item, ok := f.items[id]
    		if !ok {
    			// Item may have been deleted subsequently.
    			continue
    		}
    		delete(f.items, id)
    		err := process(item) // 进行处理
    		if e, ok := err.(ErrRequeue); ok {
    			f.addIfNotPresent(id, item) // 如果失败,再重新加入到队列中
    			err = e.Err 
    		}
    		// Don't need to copyDeltas here, because we're transferring
    		// ownership to the caller.
    		return item, err
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    Informer

    通过对 Reflector, Store, Queue, ListerWatcherProcessFunc, 等的概念,发现由 controller 所包装的起的功能并不能完成通过对API的动作监听,并通过动作来处理本地缓存的一个能力;这个情况下诞生了 informer 严格意义上来讲是 sharedInformer

    func newInformer(
    	lw ListerWatcher,
    	objType runtime.Object,
    	resyncPeriod time.Duration,
    	h ResourceEventHandler,
    	clientState Store,
    ) Controller {
    	// This will hold incoming changes. Note how we pass clientState in as a
    	// KeyLister, that way resync operations will result in the correct set
    	// of update/delete deltas.
    	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
    		KnownObjects:          clientState,
    		EmitDeltaTypeReplaced: true,
    	})
    
    	cfg := &Config{
    		Queue:            fifo,
    		ListerWatcher:    lw,
    		ObjectType:       objType,
    		FullResyncPeriod: resyncPeriod,
    		RetryOnError:     false,
    
    		Process: func(obj interface{}) error {
    			// from oldest to newest
    			for _, d := range obj.(Deltas) {
    				switch d.Type {
    				case Sync, Replaced, Added, Updated:
    					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
    						if err := clientState.Update(d.Object); err != nil {
    							return err
    						}
    						h.OnUpdate(old, d.Object)
    					} else {
    						if err := clientState.Add(d.Object); err != nil {
    							return err
    						}
    						h.OnAdd(d.Object)
    					}
    				case Deleted:
    					if err := clientState.Delete(d.Object); err != nil {
    						return err
    					}
    					h.OnDelete(d.Object)
    				}
    			}
    			return nil
    		},
    	}
    	return New(cfg)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    newInformer是位于 tools/cache/controller.go 下,可以看出,这里面并没有informer的概念,这里通过注释可以看到,newInformer实际上是一个提供了存储和事件通知的informer。他关联的 queue 则是 Delta FIFO,并包含了 ProcessFunc, Store 等 controller的概念。最终对外的方法为 NewInformer()

    func NewInformer(
    	lw ListerWatcher,
    	objType runtime.Object,
    	resyncPeriod time.Duration,
    	h ResourceEventHandler,
    ) (Store, Controller) {
    	// This will hold the client state, as we know it.
    	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
    
    	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
    }
    
    type ResourceEventHandler interface {
    	OnAdd(obj interface{})
    	OnUpdate(oldObj, newObj interface{})
    	OnDelete(obj interface{})
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    可以看到 NewInformer() 就是一个带有 Store功能的controller,通过这些可以假定出,Informer 就是controller ,将queue中相关操作分发给不同事件处理的功能

    SharedIndexInformer

    shareInformer 为客户端提供了与apiserver一致的数据对象本地缓存,并支持多事件处理程序的informer,而 shareIndexInformer 则是对shareInformer 的扩展

    type SharedInformer interface {
    	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
    	// period.  Events to a single handler are delivered sequentially, but there is no coordination
    	// between different handlers.
    	AddEventHandler(handler ResourceEventHandler)
    	// AddEventHandlerWithResyncPeriod adds an event handler to the
    	// shared informer with the requested resync period; zero means
    	// this handler does not care about resyncs.  The resync operation
    	// consists of delivering to the handler an update notification
    	// for every object in the informer's local cache; it does not add
    	// any interactions with the authoritative storage.  Some
    	// informers do no resyncs at all, not even for handlers added
    	// with a non-zero resyncPeriod.  For an informer that does
    	// resyncs, and for each handler that requests resyncs, that
    	// informer develops a nominal resync period that is no shorter
    	// than the requested period but may be longer.  The actual time
    	// between any two resyncs may be longer than the nominal period
    	// because the implementation takes time to do work and there may
    	// be competing load and scheduling noise.
    	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
    	// GetStore returns the informer's local cache as a Store.
    	GetStore() Store
    	// GetController is deprecated, it does nothing useful
    	GetController() Controller
    	// Run starts and runs the shared informer, returning after it stops.
    	// The informer will be stopped when stopCh is closed.
    	Run(stopCh <-chan struct{})
    	// HasSynced returns true if the shared informer's store has been
    	// informed by at least one full LIST of the authoritative state
    	// of the informer's object collection.  This is unrelated to "resync".
    	HasSynced() bool
    	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
    	// store. The value returned is not synchronized with access to the underlying store and is not
    	// thread-safe.
    	LastSyncResourceVersion() string
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    SharedIndexInformer 是对SharedInformer的实现,可以从结构中看出,SharedIndexInformer 大致具有如下功能:

    • 索引本地缓存
    • controller,通过list watch拉取API并推入 Deltal FIFO
    • 事件的处理
    type sharedIndexInformer struct {
    	indexer    Indexer // 具有索引的本地缓存
    	controller Controller // controller
    
    	processor             *sharedProcessor // 事件处理函数集合
    	cacheMutationDetector MutationDetector
    
    	listerWatcher ListerWatcher
    	objectType runtime.Object
    	resyncCheckPeriod time.Duration
    	defaultEventHandlerResyncPeriod time.Duration
    	clock clock.Clock
    	started, stopped bool
    	startedLock      sync.Mutex
    	blockDeltas sync.Mutex
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    而在 tools/cache/share_informer.go 可以看到 shareIndexInformer 的运行过程

    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()
    
    	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
    		KnownObjects:          s.indexer,
    		EmitDeltaTypeReplaced: true,
    	})
    
    	cfg := &Config{
    		Queue:            fifo,
    		ListerWatcher:    s.listerWatcher,
    		ObjectType:       s.objectType,
    		FullResyncPeriod: s.resyncCheckPeriod,
    		RetryOnError:     false,
    		ShouldResync:     s.processor.shouldResync,
    
    		Process: s.HandleDeltas, // process 弹出时操作的流程
    	}
    
    	func() {
    		s.startedLock.Lock()
    		defer s.startedLock.Unlock()
    
    		s.controller = New(cfg)
    		s.controller.(*controller).clock = s.clock
    		s.started = true
    	}()
    
    	// Separate stop channel because Processor should be stopped strictly after controller
    	processorStopCh := make(chan struct{})
    	var wg wait.Group
    	defer wg.Wait()              // Wait for Processor to stop
    	defer close(processorStopCh) // Tell Processor to stop
    	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    	wg.StartWithChannel(processorStopCh, s.processor.run) // 启动事件处理函数
    
    	defer func() {
    		s.startedLock.Lock()
    		defer s.startedLock.Unlock()
    		s.stopped = true // Don't want any new listeners
    	}()
        s.controller.Run(stopCh) // 启动controller,controller会启动Reflector和fifo的Pop()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    而在操作Delta FIFO中可以看到,做具体操作时,会将动作分发至对应的事件处理函数中,这个是informer初始化时对事件操作的函数

    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    	s.blockDeltas.Lock()
    	defer s.blockDeltas.Unlock()
    
    
    	for _, d := range obj.(Deltas) {
    		switch d.Type {
    		case Sync, Replaced, Added, Updated:
    			s.cacheMutationDetector.AddObject(d.Object)
    			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
    				if err := s.indexer.Update(d.Object); err != nil {
    					return err
    				}
    
    				isSync := false
    				switch {
    				case d.Type == Sync:
    					isSync = true
    				case d.Type == Replaced:
    					if accessor, err := meta.Accessor(d.Object); err == nil {
    						if oldAccessor, err := meta.Accessor(old); err == nil {
    							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
    						}
    					}
    				}
                    // 事件的分发
    				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
    			} else {
    				if err := s.indexer.Add(d.Object); err != nil {
    					return err
    				}
                    // 事件的分发
    				s.processor.distribute(addNotification{newObj: d.Object}, false)
    			}
    		case Deleted:
    			if err := s.indexer.Delete(d.Object); err != nil {
    				return err
    			}
    			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    		}
    	}
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    事件处理函数 processor

    启动informer时也会启动注册进来的事件处理函数;processor 就是这个事件处理函数。

    run() 函数会启动两个 listener,j监听事件处理业务函数 listener.run 和 事件的处理

    wg.StartWithChannel(processorStopCh, s.processor.run)
    
    func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    	func() {
    		p.listenersLock.RLock()
    		defer p.listenersLock.RUnlock()
    		for _, listener := range p.listeners {
    			p.wg.Start(listener.run) 
    			p.wg.Start(listener.pop)
    		}
    		p.listenersStarted = true
    	}()
    	<-stopCh
    	p.listenersLock.RLock()
    	defer p.listenersLock.RUnlock()
    	for _, listener := range p.listeners {
    		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    	}
    	p.wg.Wait() // Wait for all .pop() and .run() to stop
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    可以看出,就是拿到的事件,根据注册的到informer的事件函数进行处理

    func (p *processorListener) run() {
    	stopCh := make(chan struct{})
    	wait.Until(func() {
    		for next := range p.nextCh { // 消费事件
    			switch notification := next.(type) {
    			case updateNotification:
    				p.handler.OnUpdate(notification.oldObj, notification.newObj)
    			case addNotification:
    				p.handler.OnAdd(notification.newObj)
    			case deleteNotification:
    				p.handler.OnDelete(notification.oldObj)
    			default:
    				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
    			}
    		}
    		// the only way to get here is if the p.nextCh is empty and closed
    		close(stopCh)
    	}, 1*time.Second, stopCh)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    informer中的事件的设计

    了解了informer如何处理事件,就需要学习下,informer的事件系统设计 prossorListener

    事件的添加

    当在handleDelta时,会分发具体的事件

    // 事件的分发
    s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
    
    • 1
    • 2

    此时,事件泵 Pop() 会根据接收到的事件进行处理

    // run() 时会启动一个事件泵
    p.wg.Start(listener.pop)
    
    func (p *processorListener) pop() {
    	defer utilruntime.HandleCrash()
    	defer close(p.nextCh) 
    
    	var nextCh chan<- interface{}
    	var notification interface{}
    	for {
    		select {
            case nextCh <- notification: // 这里实际上是一个阻塞的等待
                // 单向channel 可能不会走到这步骤
    			var ok bool
                // deltahandle 中 distribute 会将事件添加到addCh待处理事件中
                // 处理完事件会再次拿到一个事件
    			notification, ok = p.pendingNotifications.ReadOne()
    			if !ok { // Nothing to pop
    				nextCh = nil // Disable this select case
    			}
            // 处理 分发过来的事件 addCh
    		case notificationToAdd, ok := <-p.addCh: // distribute分发的事件
    			if !ok {
    				return
    			}
                // 这里代表第一次,没有任何事件时,或者上面步骤完成读取
    			if notification == nil { // 就会走这里
    				notification = notificationToAdd 
    				nextCh = p.nextCh 
    			} else { 
                    // notification否则代表没有处理完,将数据再次添加到待处理中
    				p.pendingNotifications.WriteOne(notificationToAdd)
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    该消息事件的流程图为

    在这里插入图片描述

    通过一个简单实例来学习client-go中的消息通知机制

    package main
    
    import (
    	"fmt"
    	"time"
    
    	"k8s.io/utils/buffer"
    )
    
    var nextCh1 = make(chan interface{})
    var addCh = make(chan interface{})
    var stopper = make(chan struct{})
    var notification interface{}
    var pendding = *buffer.NewRingGrowing(2)
    
    func main() {
    	// pop
    	go func() {
    		var nextCh chan<- interface{}
    		var notification interface{}
    		//var n int
    		for {
    			fmt.Println("busy wait")
    			fmt.Println("entry select", notification)
    			select {
    			// 初始时,一个未初始化的channel,nil,形成一个阻塞(单channel下是死锁)
    			case nextCh <- notification:
    				fmt.Println("entry nextCh", notification)
    				var ok bool
    				// 读不到数据代表已处理完,置空锁
    				notification, ok = pendding.ReadOne()
    				if !ok {
    					fmt.Println("unactive nextch")
    					nextCh = nil
    				}
    			// 事件的分发,监听,初始时也是一个阻塞
    			case notificationToAdd, ok := <-addCh:
    				fmt.Println(notificationToAdd, notification)
    				if !ok {
    					return
    				}
    				// 线程安全
    				// 当消息为空时,没有被处理
    				// 锁为空,就分发数据
    				if notification == nil {
    					fmt.Println("frist notification nil")
    					notification = notificationToAdd
    					nextCh = nextCh1 // 这步骤等于初始化了局部的nextCh,会触发上面的流程
    				} else {
    					// 在第三次时,会走到这里,数据进入环
    					fmt.Println("into ring", notificationToAdd)
    					pendding.WriteOne(notificationToAdd)
    				}
    			}
    		}
    	}()
    	// producer
    	go func() {
    		i := 0
    		for {
    			i++
    			if i%5 == 0 {
    				addCh <- fmt.Sprintf("thread 2 inner -- %d", i)
    				time.Sleep(time.Millisecond * 9000)
    			} else {
    				addCh <- fmt.Sprintf("thread 2 outer -- %d", i)
    				time.Sleep(time.Millisecond * 500)
    			}
    		}
    	}()
    	// subsriber
    	go func() {
    		for {
    			for next := range nextCh1 {
    				time.Sleep(time.Millisecond * 300)
    				fmt.Println("consumer", next)
    			}
    		}
    	}()
    	<-stopper
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    总结,这里的机制类似于线程安全,进入临界区的一些算法,临界区就是 nextChnotification 就是保证了至少有一个进程可以进入临界区(要么分发事件,要么生产事件);nextChnextCh1 一个是局部管道一个是全局的,管道未初始化代表了死锁(阻塞);当有消息要处理时,会将局部管道 nextCh 赋值给 全局 nextCh1 此时相当于解除了分发的步骤(对管道赋值,触发分发操作);ringbuffer 实际上是提供了一个对 notification 加锁的操作,在没有处理的消息时,需要保障 notification 为空,同时也关闭了流程 nextCh 的写入。这里主要是考虑对golang中channel的用法

  • 相关阅读:
    MFC中关于CMutex类的学习
    【会议资源】2022年第三届自动化科学与工程国际会议(JCASE 2022)
    圣杯布局/双飞翼布局/flex/grid等,实现CSS三栏自适应布局的几种方法
    自学黑客(网络安全),一般人我劝你还是算了吧
    grafana 密码忘记怎么重置
    澳大利亚博士后招聘|皇家墨尔本理工学院材料科学
    设计模式——抽象工厂模式
    超硬核!华为智慧屏上的家庭相册竟可以自动精准分类?
    语音处理——Pyannote使用学习
    【论文阅读】Twin Neural Network Regression
  • 原文地址:https://blog.csdn.net/sinat_24092079/article/details/124975286