References:
The event generation mechanism in Kubernetes (kubernetes里的event事件生成机制) - xiaoqing
Background:
A while back a colleague asked me what kubectl get event actually does, why it only returns events from a limited time window, and how long that window is. These are my notes on the answer.
In Kubernetes, the kubelet maintains the entire pod lifecycle; pod creation, crashes, and similar state changes all produce events.
Consumption
The kubelet produces events and sends them to the apiserver, which stores them in etcd, but only keeps them for the duration configured by --event-ttl.
When we run
kubectl get event
we pull back the events from within that event-ttl window.
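To make this concrete, here is a minimal client-go sketch of what kubectl get event boils down to: listing the Event objects the apiserver still holds. The kubeconfig path and namespace are my own assumptions; this is not kubectl's actual implementation:

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Load ~/.kube/config (assumption: default kubeconfig location).
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    // Roughly what `kubectl get event -n default` does: only events still
    // within the apiserver's --event-ttl window exist to be listed.
    events, err := clientset.CoreV1().Events("default").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    for _, e := range events.Items {
        fmt.Printf("%s\t%s\t%s\n", e.LastTimestamp.Time, e.Reason, e.Message)
    }
}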
Overview

The kubelet pushes events to the apiserver via client-go, and the apiserver writes them to etcd.
makeEventRecorder creates the eventBroadcaster, the event broadcaster.
Once the broadcaster exists, it starts two event watchers: one produces logs and one sends events to the apiserver. StartStructuredLogging is the log producer; StartRecordingToSink is the one that delivers events to the apiserver.
// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
    if kubeDeps.Recorder != nil {
        return
    }
    eventBroadcaster := record.NewBroadcaster()
    kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
    eventBroadcaster.StartStructuredLogging(3)
    if kubeDeps.EventClient != nil {
        klog.V(4).InfoS("Sending events to api server")
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
    } else {
        klog.InfoS("No api server defined - no events will be sent to API server")
    }
}
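Once kubeDeps.Recorder is set up, any kubelet component can emit events through it. A hedged usage sketch (the helper name, pod object, and message are illustrative, not taken from the kubelet source):

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/tools/record"
)

// reportImagePulled is a hypothetical caller: the recorder stamps the event
// with the Source configured in makeEventRecorder and queues it into the
// broadcaster without blocking the caller.
func reportImagePulled(recorder record.EventRecorder, pod runtime.Object, image string) {
    recorder.Eventf(pod, v1.EventTypeNormal, "Pulled",
        "Container image %q already present on machine", image)
}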
Implementation of StartRecordingToSink (note: this snippet is from the newer events/v1 broadcaster in client-go/tools/events; the record package variant used by makeEventRecorder above takes an EventSink argument instead of a stop channel):
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
    go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
    go wait.Until(e.finishSeries, finishTime, stopCh)
    e.startRecordingEvents(stopCh)
}
Implementation of StartStructuredLogging:
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() {
    return e.StartEventWatcher(
        func(obj runtime.Object) {
            event, ok := obj.(*eventsv1.Event)
            if !ok {
                klog.Errorf("unexpected type, expected eventsv1.Event")
                return
            }
            klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
        })
}
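Judging from the snippet above, the returned func() stops the watcher again. A small sketch of how that might be used (the helper name is mine, and I'm assuming the events/v1 broadcaster interface matches the method shown):

import "k8s.io/client-go/tools/events"

// withEventLogging mirrors events into the structured log while work runs,
// then stops the log watcher via the function StartStructuredLogging returned.
func withEventLogging(broadcaster events.EventBroadcaster, work func()) {
    stopLogging := broadcaster.StartStructuredLogging(4) // klog verbosity 4
    defer stopLogging()
    work()
}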
Implementation of StartEventWatcher:
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
    watcher, err := e.Watch()
    if err != nil {
        klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
    }
    go func() {
        defer utilruntime.HandleCrash()
        for watchEvent := range watcher.ResultChan() {
            event, ok := watchEvent.Object.(*v1.Event)
            if !ok {
                // This is all local, so there's no reason this should
                // ever happen.
                continue
            }
            eventHandler(event)
        }
    }()
    return watcher
}
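StartEventWatcher is also the hook for tapping the stream ourselves, for example to forward events to an external system. A sketch against the record broadcaster shown above (function name and output format are my own):

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/record"
)

// teeEvents registers one more watcher on the same fan-out: every event the
// broadcaster receives is also printed (or forwarded anywhere else).
func teeEvents(broadcaster record.EventBroadcaster) {
    broadcaster.StartEventWatcher(func(e *v1.Event) {
        fmt.Printf("event: %s/%s reason=%s message=%s\n", e.Namespace, e.Name, e.Reason, e.Message)
    })
}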
Starting the pipeline:
// Start events processing pipeline.
c.EventBroadcaster.StartStructuredLogging(0)
c.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.Client.CoreV1().Events("")})
Client creation: the event client (kubeDeps.EventClient) is wired up in the same makeEventRecorder function already shown above, so the code is not repeated here.
The EventBroadcaster dispatches each event it receives to both the log and the apiserver; events enter its queue through the recorder's generateEvent:
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
    ref, err := ref.GetReference(recorder.scheme, object)
    if err != nil {
        klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
        return
    }

    if !util.ValidateEventType(eventtype) {
        klog.Errorf("Unsupported event type: '%v'", eventtype)
        return
    }

    event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
    event.Source = recorder.source

    // NOTE: events should be a non-blocking operation, but we also need to not
    // put this in a goroutine, otherwise we'll race to write to a closed channel
    // when we go to shut down this broadcaster. Just drop events if we get overloaded,
    // and log an error if that happens (we've configured the broadcaster to drop
    // outgoing events anyway).
    sent, err := recorder.ActionOrDrop(watch.Added, event)
    if err != nil {
        klog.Errorf("unable to record event: %v (will not retry!)", err)
        return
    }
    if !sent {
        klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
    }
}
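The non-blocking behavior that the NOTE describes comes down to a buffered channel plus select/default: if the queue is full, the event is dropped instead of stalling the caller. A toy sketch of the same pattern (not the actual broadcaster code):

// actionOrDrop mimics the broadcaster's drop-on-overload semantics: sending
// succeeds only if the buffered queue has room; otherwise report sent=false.
func actionOrDrop(queue chan<- string, msg string) (sent bool) {
    select {
    case queue <- msg:
        return true
    default: // queue full: drop rather than block the event producer
        return false
    }
}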
The actual delivery to the sink happens in recordEvent:
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
    // ...
    // Update can fail because the event may have been removed and it no longer exists.
    if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
        // Making sure that ResourceVersion is empty on creation
        event.ResourceVersion = ""
        newEvent, err = sink.Create(event)
    }
    // ...
}
sink.Create passes the event further down the stack; the final sending code is CreateWithEventNamespace:
func (e *events) CreateWithEventNamespace(event *v1.Event) (*v1.Event, error) {
    if e.ns != "" && event.Namespace != e.ns {
        return nil, fmt.Errorf("can't create an event with namespace '%v' in namespace '%v'", event.Namespace, e.ns)
    }
    result := &v1.Event{}
    err := e.client.Post().
        NamespaceIfScoped(event.Namespace, len(event.Namespace) > 0).
        Resource("events").
        Body(event).
        Do(context.TODO()).
        Into(result)
    return result, err
}
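In other words, the end of the pipeline is an ordinary POST to /api/v1/namespaces/<ns>/events. The same call through the typed client looks roughly like this (helper name is mine; error handling trimmed):

import (
    "context"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// postEvent creates the Event through the typed client, which issues the
// same POST that CreateWithEventNamespace builds by hand above.
func postEvent(clientset kubernetes.Interface, event *v1.Event) (*v1.Event, error) {
    return clientset.CoreV1().Events(event.Namespace).Create(context.TODO(), event, metav1.CreateOptions{})
}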
How does the kubelet aggregate and rate-limit the events it produces? If Kubernetes generates a flood of events, could they knock over the apiserver?
The kubelet applies both aggregation and rate limiting, backed by an LRU cache: by default, events that differ only in their message and recur more than 10 times within a 10-minute window are merged into a single aggregate event.
// if we see the same event that varies only by message
// more than 10 times in a 10 minute period, aggregate the event
defaultAggregateMaxEvents = 10
defaultAggregateIntervalInSeconds = 600
The LRU algorithm itself is simple and won't be covered here; the interesting part is how the cache keys are generated. getSpamKey builds the key used by the spam filter's LRU cache (the aggregator uses its own key functions, seen below):
// getSpamKey builds unique event key based on source, involvedObject
func getSpamKey(event *v1.Event) string {
    return strings.Join([]string{
        event.Source.Component,
        event.Source.Host,
        event.InvolvedObject.Kind,
        event.InvolvedObject.Namespace,
        event.InvolvedObject.Name,
        string(event.InvolvedObject.UID),
        event.InvolvedObject.APIVersion,
    },
        "")
}
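For a concrete feel of the key, here is a sketch with made-up values (assuming it sits in the same package as the getSpamKey shown above): two events from the same component about the same object collapse onto one cache entry and therefore share one rate limiter.

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
)

func spamKeyExample() {
    event := &v1.Event{
        Source: v1.EventSource{Component: "kubelet", Host: "node-1"},
        InvolvedObject: v1.ObjectReference{
            Kind:       "Pod",
            Namespace:  "default",
            Name:       "nginx-abc",
            UID:        "1234-uid",
            APIVersion: "v1",
        },
    }
    // Prints "kubeletnode-1Poddefaultnginx-abc1234-uidv1": the fields are
    // simply concatenated with no separator.
    fmt.Println(getSpamKey(event))
}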
The key aggregation code, EventAggregate:
// EventAggregate checks if a similar event has been seen according to the
// aggregation configuration (max events, max interval, etc) and returns:
//
// - The (potentially modified) event that should be created
// - The cache key for the event, for correlation purposes. This will be set to
//   the full key for normal events, and to the result of
//   EventAggregatorMessageFunc for aggregate events.
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
    now := metav1.NewTime(e.clock.Now())
    var record aggregateRecord
    // eventKey is the full cache key for this event
    eventKey := getEventKey(newEvent)
    // aggregateKey is for the aggregate event, if one is needed.
    aggregateKey, localKey := e.keyFunc(newEvent)

    // Do we have a record of similar events in our cache?
    e.Lock()
    defer e.Unlock()
    value, found := e.cache.Get(aggregateKey)
    if found {
        record = value.(aggregateRecord)
    }

    // Is the previous record too old? If so, make a fresh one. Note: if we didn't
    // find a similar record, its lastTimestamp will be the zero value, so we
    // create a new one in that case.
    maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
    interval := now.Time.Sub(record.lastTimestamp.Time)
    if interval > maxInterval {
        record = aggregateRecord{localKeys: sets.NewString()}
    }

    // Write the new event into the aggregation record and put it on the cache
    record.localKeys.Insert(localKey)
    record.lastTimestamp = now
    e.cache.Add(aggregateKey, record)

    // If we are not yet over the threshold for unique events, don't correlate them
    if uint(record.localKeys.Len()) < e.maxEvents {
        return newEvent, eventKey
    }

    // do not grow our local key set any larger than max
    record.localKeys.PopAny()

    // create a new aggregate event, and return the aggregateKey as the cache key
    // (so that it can be overwritten.)
    eventCopy := &v1.Event{
        ObjectMeta: metav1.ObjectMeta{
            Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
            Namespace: newEvent.Namespace,
        },
        Count:          1,
        FirstTimestamp: now,
        InvolvedObject: newEvent.InvolvedObject,
        LastTimestamp:  now,
        Message:        e.messageFunc(newEvent),
        Type:           newEvent.Type,
        Reason:         newEvent.Reason,
        Source:         newEvent.Source,
    }
    return eventCopy, aggregateKey
}
That covers the LRU-based aggregation: similar events pass through unchanged until maxEvents distinct messages accumulate within the interval, after which callers receive the aggregate event instead (keyed by aggregateKey so it keeps overwriting itself). Now let's look at how events are rate-limited in the kubelet.
The rate limiting happens in EventCorrelate:
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
    if newEvent == nil {
        return nil, fmt.Errorf("event is nil")
    }
    aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
    observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
    if c.filterFunc(observedEvent) {
        return &EventCorrelateResult{Skip: true}, nil
    }
    return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
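EventCorrelator and EventCorrelate are exported from client-go's record package, so the whole aggregate-then-filter behavior can be exercised directly. A hedged sketch (the signature of NewEventCorrelator has varied slightly across client-go versions):

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/record"
    "k8s.io/utils/clock"
)

// correlateExample pushes one event through the default correlator. Feed it
// many similar events and you will see aggregation kick in first, then
// result.Skip turn true once the spam filter runs out of tokens.
func correlateExample(event *v1.Event) {
    correlator := record.NewEventCorrelator(clock.RealClock{})
    result, err := correlator.EventCorrelate(event)
    if err != nil {
        panic(err)
    }
    if result.Skip {
        fmt.Println("event was rate-limited and will not be sent")
        return
    }
    fmt.Printf("message after correlation: %q\n", result.Event.Message)
}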
The package Kubernetes uses for this is:
"golang.org/x/time/rate"
The token-bucket call chain:
// Filter controls that a given source+object are not exceeding the allowed rate.
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
    var record spamRecord

    // controls our cached information about this event
    eventKey := f.spamKeyFunc(event)

    // do we have a record of similar events in our cache?
    f.Lock()
    defer f.Unlock()
    value, found := f.cache.Get(eventKey)
    if found {
        record = value.(spamRecord)
    }

    // verify we have a rate limiter for this record
    if record.rateLimiter == nil {
        record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
    }

    // ensure we have available rate
    filter := !record.rateLimiter.TryAccept()

    // update the cache
    f.cache.Add(eventKey, record)

    return filter
}
// NewTokenBucketPassiveRateLimiterWithClock is similar to NewTokenBucketRateLimiterWithClock
// except that it returns a PassiveRateLimiter which does not have Accept() and Wait() methods
// and uses a PassiveClock.
func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter {
    limiter := rate.NewLimiter(rate.Limit(qps), burst)
    return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps)
}
The core call:
func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
    return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
}
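Stripped of the Kubernetes wrappers, this is plain golang.org/x/time/rate. A standalone sketch; the qps/burst values mirror what I understand client-go's spam filter to default to (a burst of 25 with roughly one token refilled every 5 minutes), so treat the exact numbers as an assumption:

package main

import (
    "fmt"

    "golang.org/x/time/rate"
)

func main() {
    // Assumed client-go defaults: refill 1/300 token per second, burst 25.
    limiter := rate.NewLimiter(rate.Limit(1.0/300.0), 25)

    allowed := 0
    for i := 0; i < 30; i++ {
        if limiter.Allow() { // equivalent to TryAccept: AllowN(now, 1)
            allowed++
        }
    }
    // The first 25 pass on the initial burst; the remaining 5 are filtered,
    // since the bucket refills far too slowly to matter inside this loop.
    fmt.Println("allowed:", allowed)
}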
If you're not familiar with token buckets, see:
GitHub - juju/ratelimit: Efficient token-bucket-based rate limiter package.
Finally, the apiserver side. Implementation:
kubernetes/apiserver.go at master · kubernetes/kubernetes · GitHub
// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
    kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
    if err != nil {
        return nil, err
    }

    return kubeAPIServer, nil
}
kubernetes/storage_core.go at master · kubernetes/kubernetes · GitHub
Events are stored only for the --event-ttl duration (1 hour by default), which answers the opening question: kubectl get event can only ever return the most recent event-ttl window of events.
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
    // ...
    eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    // ...

    return restStorage, apiGroupInfo, nil
}
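eventstore.NewREST threads that TTL into the generic registry store; as far as I can tell it sets the store's TTLFunc, so etcd expires each event via a lease. A simplified sketch of that wiring (not the literal source; the helper name is mine):

import (
    "k8s.io/apimachinery/pkg/runtime"
    genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
)

// newEventStore attaches a fixed TTL to every stored Event, which is why
// kubectl get event can only ever see the most recent --event-ttl window.
func newEventStore(ttl uint64) *genericregistry.Store {
    return &genericregistry.Store{
        TTLFunc: func(obj runtime.Object, existing uint64, update bool) (uint64, error) {
            return ttl, nil // e.g. 3600 seconds for the default --event-ttl=1h
        },
    }
}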