• 【博客483】prometheus-----告警处理源码剖析


    prometheus-----告警处理源码剖析

    一条告警在prometheus中的三种状态切换

    在这里插入图片描述

    prometheus常见参数

    # 数据采集间隔
    scrape_interval: 15s 
    
    # 评估告警周期
    evaluation_interval: 15s 
    
    # 数据采集超时时间默认10s
    scrape_timeout: 10s
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    prometheus对恢复的告警会在内存保存15分钟,期间持续发送给alertmanager(hard code)

    // 对于恢复的告警,会在内存保存15分钟后才会删除,这期间会一直重复的发送给alertmanger
    // resolvedRetention is the duration for which a resolved alert instance
    // is kept in memory state and consequently repeatedly sent to the AlertManager.
    const resolvedRetention = 15 * time.Minute
    
    • 1
    • 2
    • 3
    • 4

    正篇:告警评估与处理

    1、prometheus启动一个协程定期去评估所有告警规则是否触发,并对触发的告警规则发送告警

    规则组启动流程(Group.run):进入 Group.run 方法后先进行初始化等待,以使规则的运算时间在同一时刻,周期为 g.interval;然后定义规则运算调度方法:iter,调度周期为 g.interval;在 iter 方法中调用 g.Eval 方法执行下一层次的规则运算调度。

    规则运算的调度周期:

    规则运算的调度周期 g.interval,由 prometheus.yml 配置文件中 global 中的:
    [ evaluation_interval: | default = 1m ]指定。
    prometheus会有一个协程,每隔g.interval执行一次来判断是否有告警产生:

    func (g *Group) run(ctx context.Context) {
    	defer close(g.terminated)
    
        // 告警评估函数
    	iter := func() {
    		g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()
    
    		start := time.Now()
    		g.Eval(ctx, evalTimestamp)
    		timeSinceStart := time.Since(start)
    
    		g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
    		g.setEvaluationTime(timeSinceStart)
    		g.setLastEvaluation(start)
    	}
    
    	// The assumption here is that since the ticker was started after having
    	// waited for `evalTimestamp` to pass, the ticks will trigger soon
    	// after each `evalTimestamp + N * g.interval` occurrence.
                    // 告警评估定时器,周期为g.interval
    	tick := time.NewTicker(g.interval)
    	defer tick.Stop()
                    ...
                    ...
    	for {
    		select {
    		case <-g.done:
    			return
    		default:
    			select {
    			case <-g.done:
    				return
    			case <-tick.C:
    				missed := (time.Since(evalTimestamp) / g.interval) - 1
    				if missed > 0 {
    					g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
    					g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
    				}
    				evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
    
    				useRuleGroupPostProcessFunc(g, evalTimestamp.Add(-(missed+1)*g.interval))
                    // 执行告警评估
    				iter()
    			}
    		}
    	}
    }
    
    // 告警评估函数
    // Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
    func (g *Group) Eval(ctx context.Context, ts time.Time) {
    	var samplesTotal float64
        // 对每条规则进行评估
    	for i, rule := range g.rules {
    		select {
    		case <-g.done:
    			return
    		default:
    		}
    
    		func(i int, rule Rule) {
    			ctx, sp := otel.Tracer("").Start(ctx, "rule")
    			sp.SetAttributes(attribute.String("name", rule.Name()))
    			defer func(t time.Time) {
    				sp.End()
    
    				since := time.Since(t)
    				g.metrics.EvalDuration.Observe(since.Seconds())
    				rule.SetEvaluationDuration(since)
    				rule.SetEvaluationTimestamp(t)
    			}(time.Now())
    
    			g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
               // 这里是执行对每条规则评估函数
    			vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
    			if err != nil {
    				rule.SetHealth(HealthBad)
    				rule.SetLastError(err)
    				sp.SetStatus(codes.Error, err.Error())
    				g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
    
    				// Canceled queries are intentional termination of queries. This normally
    				// happens on shutdown and thus we skip logging of any errors here.
    				var eqc promql.ErrQueryCanceled
    				if !errors.As(err, &eqc) {
    					level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Evaluating rule failed", "rule", rule, "err", err)
    				}
    				return
    			}
    			rule.SetHealth(HealthGood)
    			rule.SetLastError(nil)
    			samplesTotal += float64(len(vector))
    
    			if ar, ok := rule.(*AlertingRule); ok {
                    // 这里是告警规则发送函数,发送评估后需要发送的告警
    				ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
    			}
                ...
                ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    2、对每条告警规则的评估方法

    // Eval evaluates the rule expression and then creates pending alerts and fires
    // or removes previously pending alerts accordingly.
    // 处理告警
    func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) {
    	res, err := query(ctx, r.vector.String(), ts)
    	if err != nil {
    		return nil, err
    	}
    
    	// Create pending alerts for any new vector elements in the alert expression
    	// or update the expression value for existing elements.
    	resultFPs := map[uint64]struct{}{}
    
    	var vec promql.Vector
    	alerts := make(map[uint64]*Alert, len(res))
    	for _, smpl := range res {
    		// Provide the alert information to the template.
    		l := make(map[string]string, len(smpl.Metric))
    		for _, lbl := range smpl.Metric {
    			l[lbl.Name] = lbl.Value
    		}
    
    		tmplData := template.AlertTemplateData(l, r.externalLabels, r.externalURL, smpl.V)
    		// Inject some convenience variables that are easier to remember for users
    		// who are not used to Go's templating system.
    		defs := []string{
    			"{{$labels := .Labels}}",
    			"{{$externalLabels := .ExternalLabels}}",
    			"{{$externalURL := .ExternalURL}}",
    			"{{$value := .Value}}",
    		}
    
    		expand := func(text string) string {
    			tmpl := template.NewTemplateExpander(
    				ctx,
    				strings.Join(append(defs, text), ""),
    				"__alert_"+r.Name(),
    				tmplData,
    				model.Time(timestamp.FromTime(ts)),
    				template.QueryFunc(query),
    				externalURL,
    				nil,
    			)
    			result, err := tmpl.Expand()
    			if err != nil {
    				result = fmt.Sprintf("", err)
    				level.Warn(r.logger).Log("msg", "Expanding alert template failed", "err", err, "data", tmplData)
    			}
    			return result
    		}
    
    		lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricName)
    
    		for _, l := range r.labels {
    			lb.Set(l.Name, expand(l.Value))
    		}
    		lb.Set(labels.AlertName, r.Name())
    
    		annotations := make(labels.Labels, 0, len(r.annotations))
    		for _, a := range r.annotations {
    			annotations = append(annotations, labels.Label{Name: a.Name, Value: expand(a.Value)})
    		}
    
    		lbs := lb.Labels(nil)
    		h := lbs.Hash()
    		resultFPs[h] = struct{}{}
    
    		if _, ok := alerts[h]; ok {
    			return nil, fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
    		}
    
    		alerts[h] = &Alert{
    			Labels:      lbs,
    			Annotations: annotations,
    			ActiveAt:    ts,
    			State:       StatePending,
    			Value:       smpl.V,
    		}
    	}
    
    	r.activeMtx.Lock()
    	defer r.activeMtx.Unlock()
                   
                    // 对于发生的告警,检测是否在当前的活跃告警集合中存在,且状态不等于非活跃,则更新告警的value和annotation
    	for h, a := range alerts {
    		// Check whether we already have alerting state for the identifying label set.
    		// Update the last value and annotations if so, create a new alert entry otherwise.
    		if alert, ok := r.active[h]; ok && alert.State != StateInactive {
    			alert.Value = a.Value
    			alert.Annotations = a.Annotations
    			continue
    		}
    
    		r.active[h] = a
    	}
    
    	var numActivePending int
    	// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
              
    	for fp, a := range r.active {
    		if _, ok := resultFPs[fp]; !ok {
                                         
    			// If the alert was previously firing, keep it around for a given
    			// retention time so it is reported as resolved to the AlertManager.
                                                    // 对于当前每条活跃的告警,判断是否在本次的告警中,如果不在的话,则证明此条活跃的告警的指标已经恢复了
                                                    // 对于已经恢复的告警指标,如果之前是pending或者之前的ResolvedAt非空,且在resolvedRetention(15m)之前的,则删除此告警
                                                    // 因为已经没必要发送通知了
    			if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
    				delete(r.active, fp)
    			}
                                                    // 否则更新告警的状态为恢复,且恢复的时间为当前时间
    			if a.State != StateInactive {
    				a.State = StateInactive
    				a.ResolvedAt = ts
    			}
    			continue
    		}
    		numActivePending++
                                    // 对于当前每条活跃的告警,也在本次的告警中,且之前的状态是pending,且持续的时间大于holdDuration(也就是告警规则里for的时间)
                                    // 更新状态为触发,触发时间为当前
    		if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
    			a.State = StateFiring
    			a.FiredAt = ts
    		}
    
    		if r.restored.Load() {
    			vec = append(vec, r.sample(a, ts))
    			vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix())))
    		}
    	}
    
    	if limit > 0 && numActivePending > limit {
    		r.active = map[uint64]*Alert{}
    		return nil, fmt.Errorf("exceeded limit of %d with %d alerts", limit, numActivePending)
    	}
    
    	return vec, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138

    3、评估后,对触发的告警判断是否需要发送

    // 发送告警
    func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay, interval time.Duration, notifyFunc NotifyFunc) {
    	alerts := []*Alert{}
    	r.ForEachActiveAlert(func(alert *Alert) {
            // 判断是否需要发送
    		if alert.needsSending(ts, resendDelay) {
    			alert.LastSentAt = ts
    			// Allow for two Eval or Alertmanager send failures.
    			delta := resendDelay
    			if interval > resendDelay {
    				delta = interval
    			}
                // ValidUntil 字段是一个预估的告警有效时间,超过这个时间点告警会被认为已经解除
                // ValidUntil = ts + max([check_interval], [resend_delay]) * 4
                // resendDelay,是程序启动参数 --rules.alert.resend-delay 规定的,默认 1m
                // interval,是我们配置的采集间隔
    			alert.ValidUntil = ts.Add(4 * delta)
    			anew := *alert
    			alerts = append(alerts, &anew)
    		}
    	})
    	notifyFunc(ctx, r.vector.String(), alerts...)
    }
    
    // 判断一条规则是否需要发送
    func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
        // 状态是pending则不需要
    	if a.State == StatePending {
    		return false
    	}
        // 恢复的时间是大于上一次发送告警的时间,证明恢复是在告警后发生的,那么已经恢复了,需要发送恢复
    	// if an alert has been resolved since the last send, resend it
    	if a.ResolvedAt.After(a.LastSentAt) {
    		return true
    	}
        // 距离上一次发送的时间是否大于1分钟,否则不发送,避免频繁发送
    	return a.LastSentAt.Add(resendDelay).Before(ts)
    }
    
    // sendAlerts implements the rules.NotifyFunc for a Notifier.
    func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
    	return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
    		var res []*notifier.Alert
    
    		for _, alert := range alerts {
    			a := ¬ifier.Alert{
    				StartsAt:     alert.FiredAt,
    				Labels:       alert.Labels,
    				Annotations:  alert.Annotations,
    				GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
    			}
                // 如果告警的ResolvedAt不空,则EndsAt = .ResolvedAt,否则等于ValidUntil
                // 也就是说如果告警的ResolvedAt不空,证明是采集到了恢复的情况,EndAt代表实际的恢复实际
                // 如果告警的ResolvedAt为空,则还没有恢复,那么设置上一个ValidUntil,就是告警的有效时间,
                // 就是说如果持续了ValidUntil之后,没有收到新的firing,则当作恢复来处理
    			if !alert.ResolvedAt.IsZero() {
    				a.EndsAt = alert.ResolvedAt
    			} else {
    				a.EndsAt = alert.ValidUntil
    			}
    			res = append(res, a)
    		}
    
    		if len(alerts) > 0 {
    			s.Send(res...)
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    总结:

    1、Prometheus配置的’scrape_interval’定义的时间间隔,定期采集目标主机上监控数据,每次采集的超时时间为scrape_timeout

    2、 Prometheus同时根据配置的"evaluation_interval"的时间间隔,定期(默认1min)的对Alert Rule进行评估,当到达评估周期的时候,发现触发告警规则,激活Alert

          具体步骤如下:
          * 对于已经发送的告警,之前没有在活跃告警集合里面的,加入活跃告警集合
          * 对于已经发送的告警,之前在活跃告警集合里面的,更新value和annotation
          * 对于当前每条活跃的告警,判断是否在本次的告警中,
            如果不在的话,则证明此条活跃的告警的指标已经恢复了
          * 对于已经恢复的告警指标,如果之前是pending或者之前的ResolvedAt非空,且
            在resolvedRetention(15m)之前的,则删除此告警;
            否则更新告警的状态为恢复,且恢复的时间为当前时间
          * 对于当前每条活跃的告警,同时也在本次的告警中,且之前的状态是pending,
            且持续的时间大于holdDuration(也就是告警规则里for的时间);
            更新状态为触发,触发时间为当前
         * 对告警进行判断是否需要发送  
               对于pending的不需要发送
               距离上一次发送的时间是否大于1分钟,否则不发送,避免频繁发送
               恢复时间是大于上次发送告警的时间,证明恢复是在告警后发生的,那么已经恢复了,需发送恢复
          * 设置告警的ValidUntil,如果这条告警过了ValidUntil的话,还没收到新的firing,则代表恢复:
             ValidUntil = ts + max([check_interval], [resend_delay]) * 4
          * 发送前设置告警EndAt
             如果告警的ResolvedAt不空,则EndsAt = ResolvedAt,否则等于ValidUntil
             也就是说如果告警的ResolvedAt不空,证明是采集到了恢复的情况,EndAt代表实际的恢复实际
              如果告警的ResolvedAt为空,则还没有恢复,设置其为一个ValidUntil,就是告警的有效时间,
              就是说如果持续了ValidUntil之后,没有收到新的firing,则当作恢复来处理
          * 发送告警
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    定时评估,带来的告警的发送延迟:

    evaluation_interva默认为1分钟,也就是说你假设for为30s,那你最快也要判断pending+firing两个周期,默认需要2分钟后才能激活告警,并发给alertmanager
    假如不配置for或for设为0,则alert被激活后会立即变为firing状态,同时发送相关报警信息给alertmanager。
    只有在评估周期期间,警报才会从当前状态转移到另一个状态。

    3、当下一个alert rule的评估周期到来的时候,发现告警为真,然后判断警报Active的时间是否已经超出rule里的‘for’ 持续时间,如果未超出,则进入下一个评估周期;
    如果时间超出,则alert的状态变为“FIRING”;同时调用Alertmanager接口,发送相关报警数据。

    4、startsAt和endsAt

    这两个字段,这两个字段分别表示告警的起始时间和终止时间,不过两个字段都是可选的。当AlertManager收到告警实例之后,会分以下几类情况对这两个字段进行处理:

    1、两者都存在:不做处理
    2、两者都未指定:startsAt指定为当前时间,endsAt为当前时间加上告警持续时间,默认为5分钟
    3、只指定startsAt:endsAt指定为当前时间加上默认的告警持续时间
    4、只指定endsAt:将startsAt设置为endsAt
    
    即:如果 endsAt 没有提供,则自动等于 startsAt + resolve_timeout(默认 5m)
    
    AlertManager一般以当前时间和告警实例的endsAt字段进行比较用以判断告警的状态:
    * 若当前时间位于endsAt之前,则表示告警仍然处于触发状态(firing)
    * 若当前时间位于endsAt之后,则表示告警已经消除(resolved)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    告警时间线的合并压缩:

    另外,当Prometheus Server中配置的告警规则被持续满足时,默认对于一条告警,每隔一分钟发一次。
    显然,这些实例除了startsAt和endsAt字段以外都完全相同
    (其实Prometheus Server会将所有实例的startsAt设置为告警第一次被触发的时间)。
    最终,这些实例都会进行压缩去重,多条最终labels相同的告警最终被压缩聚合为一条告警。
    当我们进行查询时,只会得到一条起始时间为最开始第一条告警通知的开始时间,
    结束时间为最后一条告警通知的恢复时间
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    为什么一条持续触发的告警不会触发恢复,而采集不到数据时会触发恢复

    如果告警一直 Firing,那么 Prometheus 会在 resend_delay 的间隔重复发送,
    而 startsAt 保持不变, endsAt 跟着 ValidUntil 变。
    这也就是为啥一直firing的规则不会被认为恢复,而不发firting则会认为恢复。
    因为一直firing的告警消息中, endsAt 跟着 ValidUntil 变,一直在后延。
    而如果没收到,就会导致alertmanger那边在过了告警的endAt时间后,
    没收到恢复或者新firing,则认为恢复
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    注意:Alertmanager 里必须有 Inactive 消息所对应的告警,否则是会被忽略的。换句话说如果一个告警在 Alertmanager 里已经解除了,再发同样的 Inactive 消息,Alertmanager 是不会发给 webhook 的。

    Prometheus 需要 持续 地将 Firing 告警发送给 Alertmanager,遇到以下一种情况,
    Alertmanager 会认为告警已经解决,发送一个 resolved:

    * Prometheus 发送了 Inactive 的消息给 Alertmanager,即 endsAt=当前时间
    * Prometheus 在上一次消息的 endsAt 之前,一直没有发送任何消息给 Alertmanager
    
    • 1
    • 2

    alertmanager的resolve_timeout对prometheus无效

    alertmanager的resolve_timeout是指多久没收到新firing则认为一条老的告警已经恢复,这个选项对prometheus无效,因为prometheus的消息一直都会带着endAt

  • 相关阅读:
    资深Java面试题及答案(汇总)
    线程的创建方式
    字节跳动开源隐私合规检测工具appshark
    Java毕设项目——书画拍卖网站(java+SSM+Maven+Mysql+Jsp)
    蛋白纯化-实验设计
    存储过程浅入深出
    Linux多线程【线程控制】
    人体神经系统结构图高清,人体神经系统全貌图片
    4、设计模式之单例设计模式
    《视觉SLAM十四讲》公式推导(一)
  • 原文地址:https://blog.csdn.net/qq_43684922/article/details/126803304