• The internals of Kubernetes events


    References:

    The event generation mechanism in Kubernetes - xiaoqing

    Kubernetes · GitHub

    I. Overview

    Background:

    A colleague once asked me what actually happens behind kubectl get event, why it only returns events from a limited window of time, and how long that window is. These notes are my answer.

    In Kubernetes, the kubelet maintains the lifecycle of every pod; whenever a pod is created or crashes, events are produced.

    Production and consumption

    The kubelet sends these events to the apiserver, and the apiserver stores them in etcd, but it only keeps data for the --event-ttl window (1 hour by default).

    So when we run

    kubectl get event

    we only pull events that are still within that event-ttl window.
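    To see the consumption side from code rather than kubectl, here is a minimal sketch that lists events with client-go; the kubeconfig path and namespace are assumptions made only for this illustration.

    package main

    import (
        "context"
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
        // Assumed kubeconfig location; in-cluster config would work as well.
        config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
        if err != nil {
            panic(err)
        }
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
            panic(err)
        }
        // Equivalent to `kubectl get event -n default`: only events that have
        // not yet expired from etcd (--event-ttl) come back.
        events, err := clientset.CoreV1().Events("default").List(context.TODO(), metav1.ListOptions{})
        if err != nil {
            panic(err)
        }
        for _, e := range events.Items {
            fmt.Printf("%s\t%s\t%s\n", e.Type, e.Reason, e.Message)
        }
    }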

    II. How the kubelet produces events

    Overview

    The kubelet pushes events to the apiserver through client-go; the apiserver then persists them to etcd.

    makeEventRecorder:

    makeEventRecorder creates the eventBroadcaster, the event broadcaster.

    Once the broadcaster exists, it starts two event watchers: one produces log output and the other delivers events to the apiserver. StartStructuredLogging is the log-producing side, and StartRecordingToSink is the side that sends events to the apiserver.

    func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
        if kubeDeps.Recorder != nil {
            return
        }
        eventBroadcaster := record.NewBroadcaster()
        kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
        eventBroadcaster.StartStructuredLogging(3)
        if kubeDeps.EventClient != nil {
            klog.V(4).InfoS("Sending events to api server")
            eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
        } else {
            klog.InfoS("No api server defined - no events will be sent to API server")
        }
    }
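    For context, this is roughly how a component emits events through the recorder created above; the function name, pod object, reason and message are made-up values for illustration, not kubelet code.

    package main

    import (
        v1 "k8s.io/api/core/v1"
        "k8s.io/client-go/tools/record"
    )

    // reportPodStarted shows how a caller typically uses the EventRecorder held
    // in kubeDeps.Recorder.
    func reportPodStarted(recorder record.EventRecorder, pod *v1.Pod) {
        // Eventf builds a v1.Event that references the pod and pushes it onto
        // the broadcaster's channel; the watchers started above fan it out to
        // the structured log and to the apiserver sink.
        recorder.Eventf(pod, v1.EventTypeNormal, "Started", "container in pod %s started", pod.Name)
    }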

    The concrete implementation of StartRecordingToSink (note: this snippet is from the newer events.k8s.io/v1 broadcaster in client-go's tools/events package; the legacy tools/record broadcaster used by makeEventRecorder takes the sink as an argument, as seen above):

    // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
    func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
        go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
        go wait.Until(e.finishSeries, finishTime, stopCh)
        e.startRecordingEvents(stopCh)
    }

    The concrete implementation of StartStructuredLogging (again the events.k8s.io/v1 variant):

    // StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
    // The return value can be ignored or used to stop recording, if desired.
    func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() {
        return e.StartEventWatcher(
            func(obj runtime.Object) {
                event, ok := obj.(*eventsv1.Event)
                if !ok {
                    klog.Errorf("unexpected type, expected eventsv1.Event")
                    return
                }
                klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
            })
    }

    The implementation of StartEventWatcher in the legacy tools/record broadcaster, which the helpers used by makeEventRecorder build on:

    // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
    // The return value can be ignored or used to stop recording, if desired.
    func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
        watcher, err := e.Watch()
        if err != nil {
            klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
        }
        go func() {
            defer utilruntime.HandleCrash()
            for watchEvent := range watcher.ResultChan() {
                event, ok := watchEvent.Object.(*v1.Event)
                if !ok {
                    // This is all local, so there's no reason this should
                    // ever happen.
                    continue
                }
                eventHandler(event)
            }
        }()
        return watcher
    }
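    Any caller can attach its own handler the same way. A minimal sketch (the handler body is purely illustrative) that watches every event passing through a broadcaster:

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        "k8s.io/client-go/tools/record"
    )

    func main() {
        broadcaster := record.NewBroadcaster()
        // Attach a custom handler; StartStructuredLogging and
        // StartRecordingToSink are just specialized versions of this call.
        watcher := broadcaster.StartEventWatcher(func(e *v1.Event) {
            fmt.Printf("saw event: %s %s %s\n", e.Type, e.Reason, e.Message)
        })
        defer watcher.Stop()
        // Events recorded through broadcaster.NewRecorder(...) would now also
        // be delivered to the handler above.
    }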

    EventBroadcaster:

    Starting the pipeline:

    // Start events processing pipeline.
    c.EventBroadcaster.StartStructuredLogging(0)
    c.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.Client.CoreV1().Events("")})

    Creating the client: the recorder and the event client are wired up in makeEventRecorder, which was already shown above.


    Events enter the broadcaster through the recorder: generateEvent builds the Event object and pushes it onto the EventBroadcaster's channel, from which the watchers started above fan it out to both the log and the apiserver:

    func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
        ref, err := ref.GetReference(recorder.scheme, object)
        if err != nil {
            klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
            return
        }
        if !util.ValidateEventType(eventtype) {
            klog.Errorf("Unsupported event type: '%v'", eventtype)
            return
        }
        event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
        event.Source = recorder.source
        // NOTE: events should be a non-blocking operation, but we also need to not
        // put this in a goroutine, otherwise we'll race to write to a closed channel
        // when we go to shut down this broadcaster. Just drop events if we get overloaded,
        // and log an error if that happens (we've configured the broadcaster to drop
        // outgoing events anyway).
        sent, err := recorder.ActionOrDrop(watch.Added, event)
        if err != nil {
            klog.Errorf("unable to record event: %v (will not retry!)", err)
            return
        }
        if !sent {
            klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
        }
    }
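    The comment above is the key design point: recording an event must never block the caller. A simplified, standalone illustration of that drop-instead-of-block pattern (not the actual client-go implementation) looks like this:

    package main

    import "fmt"

    // actionOrDrop mimics the broadcaster's non-blocking send: if the incoming
    // channel is full, the event is dropped instead of blocking the caller.
    func actionOrDrop(ch chan<- string, event string) bool {
        select {
        case ch <- event:
            return true // queued for the watchers
        default:
            return false // queue full, drop the event
        }
    }

    func main() {
        incoming := make(chan string, 2) // tiny buffer to force drops
        for i := 0; i < 4; i++ {
            if !actionOrDrop(incoming, fmt.Sprintf("event-%d", i)) {
                fmt.Printf("dropped event-%d: too many queued events\n", i)
            }
        }
    }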

    The actual delivery to the sink happens in recordEvent:

    func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
        ...
        // Update can fail because the event may have been removed and it no longer exists.
        if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
            // Making sure that ResourceVersion is empty on creation
            event.ResourceVersion = ""
            newEvent, err = sink.Create(event)
        }
        ...
    }

    sink.Create hands the event further down the chain; the final call that actually sends it to the apiserver is CreateWithEventNamespace:

    func (e *events) CreateWithEventNamespace(event *v1.Event) (*v1.Event, error) {
        if e.ns != "" && event.Namespace != e.ns {
            return nil, fmt.Errorf("can't create an event with namespace '%v' in namespace '%v'", event.Namespace, e.ns)
        }
        result := &v1.Event{}
        err := e.client.Post().
            NamespaceIfScoped(event.Namespace, len(event.Namespace) > 0).
            Resource("events").
            Body(event).
            Do(context.TODO()).
            Into(result)
        return result, err
    }
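    In other words, this is simply a POST of the Event object to the events resource. A rough equivalent using the typed clientset (the kubeconfig path, namespace and event fields are illustrative) would be:

    package main

    import (
        "context"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
        config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") // assumed path
        if err != nil {
            panic(err)
        }
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
            panic(err)
        }
        event := &v1.Event{
            ObjectMeta:     metav1.ObjectMeta{GenerateName: "demo-", Namespace: "default"},
            InvolvedObject: v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "demo-pod"},
            Type:           v1.EventTypeNormal,
            Reason:         "Demo",
            Message:        "event created by hand for illustration",
        }
        // POSTs to /api/v1/namespaces/default/events, just like CreateWithEventNamespace.
        if _, err := clientset.CoreV1().Events("default").Create(context.TODO(), event, metav1.CreateOptions{}); err != nil {
            panic(err)
        }
    }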

    How does the kubelet aggregate and rate-limit the events it produces? If the cluster generates a very large number of events, could they overwhelm the apiserver?

    The kubelet aggregates and rate-limits events on the client side using LRU caches; by default, similar events are only aggregated within a 10-minute window.

    // if we see the same event that varies only by message
    // more than 10 times in a 10 minute period, aggregate the event
    defaultAggregateMaxEvents         = 10
    defaultAggregateIntervalInSeconds = 600

    LRU-based aggregation

    The LRU algorithm itself is simple and is not covered here; what matters is how the cache keys are generated. getSpamKey below builds the key used by the rate-limiting spam filter's LRU cache, while the aggregator derives its own aggregateKey via keyFunc, as seen in EventAggregate further down.

    // getSpamKey builds unique event key based on source, involvedObject
    func getSpamKey(event *v1.Event) string {
        return strings.Join([]string{
            event.Source.Component,
            event.Source.Host,
            event.InvolvedObject.Kind,
            event.InvolvedObject.Namespace,
            event.InvolvedObject.Name,
            string(event.InvolvedObject.UID),
            event.InvolvedObject.APIVersion,
        },
            "")
    }
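    To make the key concrete, here is a small, self-contained illustration that builds the same concatenation for a hypothetical kubelet event (getSpamKey itself is unexported, so the join is reproduced inline):

    package main

    import (
        "fmt"
        "strings"

        v1 "k8s.io/api/core/v1"
    )

    func main() {
        // A hypothetical event as the kubelet might emit it.
        event := &v1.Event{
            Source: v1.EventSource{Component: "kubelet", Host: "node-1"},
            InvolvedObject: v1.ObjectReference{
                Kind:       "Pod",
                Namespace:  "default",
                Name:       "nginx-abc",
                UID:        "1234-uid",
                APIVersion: "v1",
            },
        }
        // Same field concatenation as getSpamKey: one key per source + involved
        // object, so all events about the same object from the same component
        // share a rate limiter.
        key := strings.Join([]string{
            event.Source.Component,
            event.Source.Host,
            event.InvolvedObject.Kind,
            event.InvolvedObject.Namespace,
            event.InvolvedObject.Name,
            string(event.InvolvedObject.UID),
            event.InvolvedObject.APIVersion,
        }, "")
        fmt.Println(key) // kubeletnode-1Poddefaultnginx-abc1234-uidv1
    }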

    Key code for the LRU-based aggregation:

    // EventAggregate checks if a similar event has been seen according to the
    // aggregation configuration (max events, max interval, etc) and returns:
    //
    // - The (potentially modified) event that should be created
    // - The cache key for the event, for correlation purposes. This will be set to
    //   the full key for normal events, and to the result of
    //   EventAggregatorMessageFunc for aggregate events.
    func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
        now := metav1.NewTime(e.clock.Now())
        var record aggregateRecord
        // eventKey is the full cache key for this event
        eventKey := getEventKey(newEvent)
        // aggregateKey is for the aggregate event, if one is needed.
        aggregateKey, localKey := e.keyFunc(newEvent)
        // Do we have a record of similar events in our cache?
        e.Lock()
        defer e.Unlock()
        value, found := e.cache.Get(aggregateKey)
        if found {
            record = value.(aggregateRecord)
        }
        // Is the previous record too old? If so, make a fresh one. Note: if we didn't
        // find a similar record, its lastTimestamp will be the zero value, so we
        // create a new one in that case.
        maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
        interval := now.Time.Sub(record.lastTimestamp.Time)
        if interval > maxInterval {
            record = aggregateRecord{localKeys: sets.NewString()}
        }
        // Write the new event into the aggregation record and put it on the cache
        record.localKeys.Insert(localKey)
        record.lastTimestamp = now
        e.cache.Add(aggregateKey, record)
        // If we are not yet over the threshold for unique events, don't correlate them
        if uint(record.localKeys.Len()) < e.maxEvents {
            return newEvent, eventKey
        }
        // do not grow our local key set any larger than max
        record.localKeys.PopAny()
        // create a new aggregate event, and return the aggregateKey as the cache key
        // (so that it can be overwritten.)
        eventCopy := &v1.Event{
            ObjectMeta: metav1.ObjectMeta{
                Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
                Namespace: newEvent.Namespace,
            },
            Count:          1,
            FirstTimestamp: now,
            InvolvedObject: newEvent.InvolvedObject,
            LastTimestamp:  now,
            Message:        e.messageFunc(newEvent),
            Type:           newEvent.Type,
            Reason:         newEvent.Reason,
            Source:         newEvent.Source,
        }
        return eventCopy, aggregateKey
    }

    That covers the LRU-based aggregation; now let's look at how events are rate-limited in the kubelet.

    Event rate limiting

    The rate limiting is wired in here:

    // EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
    func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
        if newEvent == nil {
            return nil, fmt.Errorf("event is nil")
        }
        aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
        observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
        if c.filterFunc(observedEvent) {
            return &EventCorrelateResult{Skip: true}, nil
        }
        return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
    }

    The package the Kubernetes project uses under the hood is:

    "golang.org/x/time/rate"

    The token-bucket call chain:

    // Filter controls that a given source+object are not exceeding the allowed rate.
    func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
        var record spamRecord

        // controls our cached information about this event
        eventKey := f.spamKeyFunc(event)

        // do we have a record of similar events in our cache?
        f.Lock()
        defer f.Unlock()
        value, found := f.cache.Get(eventKey)
        if found {
            record = value.(spamRecord)
        }

        // verify we have a rate limiter for this record
        if record.rateLimiter == nil {
            record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
        }

        // ensure we have available rate
        filter := !record.rateLimiter.TryAccept()

        // update the cache
        f.cache.Add(eventKey, record)

        return filter
    }

    // NewTokenBucketPassiveRateLimiterWithClock is similar to NewTokenBucketRateLimiterWithClock
    // except that it returns a PassiveRateLimiter which does not have Accept() and Wait() methods
    // and uses a PassiveClock.
    func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter {
        limiter := rate.NewLimiter(rate.Limit(qps), burst)
        return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps)
    }

    The core call:

    func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
        return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
    }

    If you are not familiar with the token-bucket algorithm, see:

    GitHub - juju/ratelimit: Efficient token-bucket-based rate limiter package.
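    For a quick feel of the token-bucket behavior, here is a minimal standalone sketch using golang.org/x/time/rate directly, the same package that flowcontrol wraps; the qps and burst values are chosen only for demonstration.

    package main

    import (
        "fmt"
        "time"

        "golang.org/x/time/rate"
    )

    func main() {
        // One such limiter exists per spam key in the event spam filter;
        // these numbers are illustrative, not the kubelet's configuration.
        limiter := rate.NewLimiter(rate.Limit(1.0/300), 25) // ~1 token per 5 min, burst of 25
        allowed, dropped := 0, 0
        for i := 0; i < 30; i++ {
            // AllowN at the current time with cost 1, exactly what TryAccept does.
            if limiter.AllowN(time.Now(), 1) {
                allowed++
            } else {
                dropped++
            }
        }
        fmt.Printf("allowed=%d dropped=%d\n", allowed, dropped) // the burst admits the first 25, the rest are dropped
    }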

    III. The apiserver stores events in etcd

    Implementation:

    kubernetes/apiserver.go at master · kubernetes/kubernetes · GitHub

    // CreateKubeAPIServer creates and wires a workable kube-apiserver
    func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
        kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
        if err != nil {
            return nil, err
        }
        return kubeAPIServer, nil
    }

    kubernetes/storage_core.go at master · kubernetes/kubernetes · GitHub

    Events are only kept for the --event-ttl window (1 hour by default); the TTL is passed into the event storage here:

    func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
        ...
        eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
        limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
        ...
        return restStorage, apiGroupInfo, nil
    }

  • Original article: https://blog.csdn.net/qq_32783703/article/details/127379653