alertmanager集群莫名发送resolve消息的问题探究
术语
- 告警消息:指一条告警
- 告警恢复消息:指一条告警恢复
- 告警信息:指告警相关的内容,包括告警消息和告警恢复消息
问题描述
最近遇到了一个alertmanager HA集群莫名发送告警恢复消息的问题。简单来说就是线上配置了一个一直会产生告警的规则,但却会收到alertmanager发来的告警恢复消息,与预期不符。
所使用的告警架构如下,vmalert产生的告警会通过LB发送到某个后端alertmanager实例。原本以为,接收到该告警的alertmanager会将告警信息同步到其他实例,当vmalert产生下一个相同的告警后,则alertmanager实例中的第二个告警会刷新第一个告警,后续通过告警同步将最新的告警发送到各个alertmanager实例,从而达到抑制告警和抑制告警恢复的效果(。
但在实际中发现,alertmanager对一直产生的告警发出了告警恢复消息。
问题解决
问题解决办法很简单:让告警直接发送到alertmanager HA集群的每个实例即可。
在Question regarding Loadbalanced Alertmanager Clusters和Alerting issues with Alertmanage这两篇文档中描述了使用LB导致alertmanager HA集群发生告警混乱的问题。此外在官方文档也有如下提示:
It's important not to load balance traffic between Prometheus and its Alertmanagers, but instead, point Prometheus to a list of all Alertmanagers.
但根因是什么,网上找了很久没有找到原因。上述文档描述也摸棱两可。
问题根因
首先上一张alertmanager官方架构图:
注意到上图有三类provider:
- Alert Provider:负责处理通过API传入的告警,vmalert(Prometheus)产生的告警就是在这里接收处理的
- Silence Provider:负责处理静默规则,本次不涉及告警静默,因此不作讨论。
- Notify Provider:负责实例间发送告警信息。
根据如上分析,可以得出,一个alertmanager实例有两种途径获得告警信息,一种是由外部服务(如vmalert、Prometheus等)通过API传入的,另一种是通过alertmanager 实例间的Notification Logs消息获得的。
注:需要说明的是,alertmanager判定一个告警是不是恢复状态,主要是通过该告警的
EndsAt
字段,如果EndsAt
时间点早于当前时间,说明该告警已经失效,需要发送告警恢复,判断代码如下:
// Resolved returns true iff the activity interval ended in the past. func (a *Alert) Resolved() bool { return a.ResolvedAt(time.Now()) } // ResolvedAt returns true off the activity interval ended before // the given timestamp. func (a *Alert) ResolvedAt(ts time.Time) bool { if a.EndsAt.IsZero() { return false } return !a.EndsAt.After(ts) }
API Provider的处理
alertmanager提供了两套API:v1和v2。但两个API内部处理还是一样的逻辑,以v1 API为例,
入口函数为insertAlerts
,该函数主要负责告警的有效性校验,处理告警的StartAt
和EndAt
,最后通过Put
方法将告警保存起来。
本案例场景中,vmalert会给所有告警加上
EndAt
,值为:当前时间 + 4倍的groupInterval(默认1min) = 4min。
func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*types.Alert) {
now := time.Now()
api.mtx.RLock()
resolveTimeout := time.Duration(api.config.Global.ResolveTimeout)
api.mtx.RUnlock()
for _, alert := range alerts {
alert.UpdatedAt = now
// Ensure StartsAt is set.
if alert.StartsAt.IsZero() {
if alert.EndsAt.IsZero() {
alert.StartsAt = now
} else {
alert.StartsAt = alert.EndsAt
}
}
// If no end time is defined, set a timeout after which an alert
// is marked resolved if it is not updated.
if alert.EndsAt.IsZero() {
alert.Timeout = true
alert.EndsAt = now.Add(resolveTimeout)
}
if alert.EndsAt.After(time.Now()) {
api.m.Firing().Inc()
} else {
api.m.Resolved().Inc()
}
}
// Make a best effort to insert all alerts that are valid.
var (
validAlerts = make([]*types.Alert, 0, len(alerts))
validationErrs = &types.MultiError{}
)
for _, a := range alerts {
removeEmptyLabels(a.Labels)
if err := a.Validate(); err != nil {
validationErrs.Add(err)
api.m.Invalid().Inc()
continue
}
validAlerts = append(validAlerts, a)
}
if err := api.alerts.Put(validAlerts...); err != nil {
api.respondError(w, apiError{
typ: errorInternal,
err: err,
}, nil)
return
}
if validationErrs.Len() > 0 {
api.respondError(w, apiError{
typ: errorBadData,
err: validationErrs,
}, nil)
return
}
api.respond(w, nil)
}
Put
函数中会对相同指纹的告警进行Merge
,这一步会刷新保存的对应告警的StartAt
和EndAt
,通过这种方式可以保证告警的StartAt
和EndAt
可以随最新接收到的告警消息而更新。
func (a *Alerts) Put(alerts ...*types.Alert) error {
for _, alert := range alerts {
fp := alert.Fingerprint()
existing := false
// Check that there's an alert existing within the store before
// trying to merge.
if old, err := a.alerts.Get(fp); err == nil {
existing = true
// Merge alerts if there is an overlap in activity range.
// 更新告警的StartAt和EndAt字段
if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
(alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {
alert = old.Merge(alert)
}
}
if err := a.callback.PreStore(alert, existing); err != nil {
level.Error(a.logger).Log("msg", "pre-store callback returned error on set alert", "err", err)
continue
}
if err := a.alerts.Set(alert); err != nil {
level.Error(a.logger).Log("msg", "error on set alert", "err", err)
continue
}
a.callback.PostStore(alert, existing)
// 将告警分发给订阅者
a.mtx.Lock()
for _, l := range a.listeners {
select {
case l.alerts <- alert:
case <-l.done:
}
}
a.mtx.Unlock()
}
return nil
}
根据上述分析可以得出,当通过API获取到相同(指纹)的告警时,会更新本实例对应的告警信息(StartAt
和EndAt
),因此如果通过API不停向一个alertmanager实例发送告警,则该实例并不会产生告警恢复消息。
下一步就是要确定,通过API接收到的告警信息是如何发送给其他实例的,以及发送的是哪些信息。
从官方架构图上可以看出,API接收到的告警会进入Dispatcher
,然后进入Notification Pipeline
,最后通过Notification Provider
将告警信息发送给其他实例。
Dispatcher的处理
在上面Put
函数的最后,会将Merge
后的告警发送给a.listeners
,每个listener
对应一个告警订阅者,Dispatcher
算是一个告警订阅者。
要获取从API 收到的告警,首先要进行订阅。订阅函数如下,其实就是在listeners
新增了一个channel,该channel中会预先填充已有的告警,当通过API接收到新告警后,会使用Put()
方法将新的告警分发给各个订阅者。
// Subscribe returns an iterator over active alerts that have not been
// resolved and successfully notified about.
// They are not guaranteed to be in chronological order.
func (a *Alerts) Subscribe() provider.AlertIterator {
a.mtx.Lock()
defer a.mtx.Unlock()
var (
done = make(chan struct{})
alerts = a.alerts.List()
ch = make(chan *types.Alert, max(len(alerts), alertChannelLength))
)
for _, a := range alerts {
ch <- a
}
a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}
a.next++
return provider.NewAlertIterator(ch, done, nil)
}
alertmanager的main()
函数中会初始化并启动一个Dispatcher
:
// Run starts dispatching alerts incoming via the updates channel.
func (d *Dispatcher) Run() {
d.done = make(chan struct{})
d.mtx.Lock()
d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsNum = 0
d.metrics.aggrGroups.Set(0)
d.ctx, d.cancel = context.WithCancel(context.Background())
d.mtx.Unlock()
d.run(d.alerts.Subscribe())
close(d.done)
}
Dispatcher
启动之后会订阅告警消息:
// Run starts dispatching alerts incoming via the updates channel.
func (d *Dispatcher) Run() {
...
// 订阅告警消息
d.run(d.alerts.Subscribe())
close(d.done)
}
下面是Dispatcher
的主函数,负责接收订阅的channel中传过来的告警,并根据路由(route)处理告警消息(processAlert
)。
func (d *Dispatcher) run(it provider.AlertIterator) {
cleanup := time.NewTicker(30 * time.Second)
defer cleanup.Stop()
defer it.Close()
for {
select {
// 处理订阅的告警消息
case alert, ok := <-it.Next():
if !ok {
// Iterator exhausted for some reason.
if err := it.Err(); err != nil {
level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
}
return
}
level.Debug(d.logger).Log("msg", "Received alert", "alert", alert)
// Log errors but keep trying.
if err := it.Err(); err != nil {
level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
continue
}
now := time.Now()
for _, r := range d.route.Match(alert.Labels) {
d.processAlert(alert, r)
}
d.metrics.processingDuration.Observe(time.Since(now).Seconds())
case <-cleanup.C:
d.mtx.Lock()
for _, groups := range d.aggrGroupsPerRoute {
for _, ag := range groups {
if ag.empty() {
ag.stop()
delete(groups, ag.fingerprint())
d.aggrGroupsNum--
d.metrics.aggrGroups.Dec()
}
}
}
d.mtx.Unlock()
case <-d.ctx.Done():
return
}
}
}
processAlert
主要是做聚合分组的,ag.run
函数会填充相关的告警信息,并根据GroupWait
和GroupInterval
发送本实例非恢复的告警。
从alertmanager的架构图中可以看到,在Dispatcher
聚合分组告警之后,会将告警送到Notification Pipeline
进行处理,Notification Pipeline
的处理对应ag.run
的入参回调函数。该回调函数中会调用stage.Exec
来处理Notification Pipeline
的各个阶段。
// processAlert determines in which aggregation group the alert falls
// and inserts it.
func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
groupLabels := getGroupLabels(alert, route)
fp := groupLabels.Fingerprint()
d.mtx.Lock()
defer d.mtx.Unlock()
routeGroups, ok := d.aggrGroupsPerRoute[route]
if !ok {
routeGroups = map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsPerRoute[route] = routeGroups
}
ag, ok := routeGroups[fp]
if ok {
ag.insert(alert)
return
}
// If the group does not exist, create it. But check the limit first.
if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
d.metrics.aggrGroupLimitReached.Inc()
level.Error(d.logger).Log("msg", "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
return
}
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
routeGroups[fp] = ag
d.aggrGroupsNum++
d.metrics.aggrGroups.Inc()
// Insert the 1st alert in the group before starting the group's run()
// function, to make sure that when the run() will be executed the 1st
// alert is already there.
ag.insert(alert)
// 处理pipeline并发送告警
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
lvl := level.Error(d.logger)
if ctx.Err() == context.Canceled {
// It is expected for the context to be canceled on
// configuration reload or shutdown. In this case, the
// message should only be logged at the debug level.
lvl = level.Debug(d.logger)
}
lvl.Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
}
return err == nil
})
}
Pipeline的处理
在告警发送之前需要经过一系列的处理,这些处理称为Pipeline,由不同的Stage构成。
alertmanager的main()
函数会初始化一个PipelineBuilder,PipelineBuilder
实现了Stage
接口。
// A Stage processes alerts under the constraints of the given context.
type Stage interface {
Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
}
Stage
翻译过来就是"阶段",对应Pipeline中的各个阶段,如GossipSettle
、Wait
、Dedup
等。
下面两个函数定义了如何初始化PipelineBuilder
的各个Stage
。
func (pb *PipelineBuilder) New(
receivers map[string][]Integration,
wait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silencer *silence.Silencer,
times map[string][]timeinterval.TimeInterval,
notificationLog NotificationLog,
peer Peer,
) RoutingStage {
rs := make(RoutingStage, len(receivers))
ms := NewGossipSettleStage(peer)
is := NewMuteStage(inhibitor)
tas := NewTimeActiveStage(times)
tms := NewTimeMuteStage(times)
ss := NewMuteStage(silencer)
for name := range receivers {
st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
rs[name] = MultiStage{ms, is, tas, tms, ss, st}
}
return rs
}
// createReceiverStage creates a pipeline of stages for a receiver.
func createReceiverStage(
name string,
integrations []Integration,
wait func() time.Duration,
notificationLog NotificationLog,
metrics *Metrics,
) Stage {
var fs FanoutStage
for i := range integrations {
recv := &nflogpb.Receiver{
GroupName: name,
Integration: integrations[i].Name(),
Idx: uint32(integrations[i].Index()),
}
var s MultiStage
s = append(s, NewWaitStage(wait))
s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
s = append(s, NewRetryStage(integrations[i], name, metrics))
s = append(s, NewSetNotifiesStage(notificationLog, recv))
fs = append(fs, s)
}
return fs
}
根据alertmanager的架构图,其中最重要的Stage为::WaitStage
、DedupStage
、RetryStage
、SetNotifiesStage
,即createReceiverStage
函数中创建的几个Stage。
之前有讲过,processAlert
函数会调用各个Stage的Exec()
方法来处理告警,处理的告警内容为本示例中非恢复状态的告警。
WaitStage
顾名思义,WaitStage
表示向其他实例发送Notification Log的时间间隔,只是单纯的时间等待。
// Exec implements the Stage interface.
func (ws *WaitStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
select {
case <-time.After(ws.wait()):
case <-ctx.Done():
return ctx, nil, ctx.Err()
}
return ctx, alerts, nil
}
各个实例发送Notification Log的时长并不一样,它与p.Position()
的返回值有关,timeout
默认是15s。
// clusterWait returns a function that inspects the current peer state and returns
// a duration of one base timeout for each peer with a higher ID than ourselves.
func clusterWait(p *cluster.Peer, timeout time.Duration) func() time.Duration {
return func() time.Duration {
return time.Duration(p.Position()) * timeout
}
}
DedupStage和RetryStage
DedupStage
目的就是根据告警的哈希值来判断本实例的告警是否已经被发送,如果已经被发送,则本实例不再继续发送。哈希算法如下,主要是对告警的标签进行哈希,后面再详细讲解该阶段。
func hashAlert(a *types.Alert) uint64 {
const sep = '\xff'
hb := hashBuffers.Get().(*hashBuffer)
defer hashBuffers.Put(hb)
b := hb.buf[:0]
names := make(model.LabelNames, 0, len(a.Labels))
for ln := range a.Labels {
names = append(names, ln)
}
sort.Sort(names)
for _, ln := range names {
b = append(b, string(ln)...)
b = append(b, sep)
b = append(b, string(a.Labels[ln])...)
b = append(b, sep)
}
hash := xxhash.Sum64(b)
return hash
}
RetryStage
的目的是将告警信息发送给各个用户配置的告警通道,如webhook、Email、wechat、slack等,如果设置了send_resolved: true
,则还会发送告警恢复消息,并支持在连接异常的情况下使用指数退避的方式下进行重传。
SetNotifiesStage
该阶段就是使用Notification Log
向其他节点发送告警通知的过程。这也是我们比较疑惑的阶段,既然同步了告警消息,为什么仍然会产生告警恢复?
下面是SetNotifiesStage
的处理函数:
func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, errors.New("group key missing")
}
firing, ok := FiringAlerts(ctx)
if !ok {
return ctx, nil, errors.New("firing alerts missing")
}
resolved, ok := ResolvedAlerts(ctx)
if !ok {
return ctx, nil, errors.New("resolved alerts missing")
}
// 通知其他实例
return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
}
首先通过FiringAlerts
获取告警消息,通过ResolvedAlerts
获取告警恢复消息,然后通过n.nflog.Log
将这些消息发送给其他实例。可以看到FiringAlerts
和ResolvedAlerts
获取到的是[]uint64
类型的数据,这些数据实际内容是什么?
func FiringAlerts(ctx context.Context) ([]uint64, bool) {
v, ok := ctx.Value(keyFiringAlerts).([]uint64)
return v, ok
}
func ResolvedAlerts(ctx context.Context) ([]uint64, bool) {
v, ok := ctx.Value(keyResolvedAlerts).([]uint64)
return v, ok
}
答案是,SetNotifiesStage
中用到的FiringAlerts
和ResolvedAlerts
是在DedupStage
阶段生成的,因此SetNotifiesStage
阶段发送给其他实例的信息实际是告警的哈希值!
DedupStage
处理如下:
func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
...
firingSet := map[uint64]struct{}{}
resolvedSet := map[uint64]struct{}{}
firing := []uint64{}
resolved := []uint64{}
var hash uint64
for _, a := range alerts {
hash = n.hash(a)
if a.Resolved() {
resolved = append(resolved, hash)
resolvedSet[hash] = struct{}{}
} else {
firing = append(firing, hash)
firingSet[hash] = struct{}{}
}
}
//生成SetNotifiesStage使用的 FiringAlerts
ctx = WithFiringAlerts(ctx, firing)
//生成SetNotifiesStage使用的 ResolvedAlerts
ctx = WithResolvedAlerts(ctx, resolved)
entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))
if err != nil && err != nflog.ErrNotFound {
return ctx, nil, err
}
var entry *nflogpb.Entry
switch len(entries) {
case 0:
case 1:
entry = entries[0]
default:
return ctx, nil, errors.Errorf("unexpected entry result size %d", len(entries))
}
if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
return ctx, alerts, nil
}
return ctx, nil, nil
}
在DedupStage
阶段中会使用n.nflog.Query
来接收其他实例SetNotifiesStage
发送的信息,其返回的entries
类型如下,从注释中可以看到FiringAlerts
和ResolvedAlerts
就是两个告警消息哈希数组:
type Entry struct {
// The key identifying the dispatching group.
GroupKey []byte `protobuf:"bytes,1,opt,name=group_key,json=groupKey,proto3" json:"group_key,omitempty"`
// The receiver that was notified.
Receiver *Receiver `protobuf:"bytes,2,opt,name=receiver,proto3" json:"receiver,omitempty"`
// Hash over the state of the group at notification time.
// Deprecated in favor of FiringAlerts field, but kept for compatibility.
GroupHash []byte `protobuf:"bytes,3,opt,name=group_hash,json=groupHash,proto3" json:"group_hash,omitempty"`
// Whether the notification was about a resolved alert.
// Deprecated in favor of ResolvedAlerts field, but kept for compatibility.
Resolved bool `protobuf:"varint,4,opt,name=resolved,proto3" json:"resolved,omitempty"`
// Timestamp of the succeeding notification.
Timestamp time.Time `protobuf:"bytes,5,opt,name=timestamp,proto3,stdtime" json:"timestamp"`
// FiringAlerts list of hashes of firing alerts at the last notification time.
FiringAlerts []uint64 `protobuf:"varint,6,rep,packed,name=firing_alerts,json=firingAlerts,proto3" json:"firing_alerts,omitempty"`
// ResolvedAlerts list of hashes of resolved alerts at the last notification time.
ResolvedAlerts []uint64 `protobuf:"varint,7,rep,packed,name=resolved_alerts,json=resolvedAlerts,proto3" json:"resolved_alerts,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
DedupStage
阶段会使用和SetNotifiesStage
阶段相同的哈希算法来计算本实例的告警的哈希值,然后与接收到的其他实例发送的告警哈希值进行对比,如果needsUpdate
返回true
,则会继续发送告警,如果返回false,则可以认为这部分告警已经被其他实例发送,本实例不再发送。
needsUpdate
的函数如下,入参entry
为接收到的其他实例发送的告警哈希值,firing
和resolved
为本实例所拥有的告警哈希值,可以看到,如果要让本地不发送告警恢复,则满足如下条件之一即可:
- 本实例的
firing
哈希是entry.FiringAlerts
的子集,即本实例的所有告警都已经被发送过 - 不启用发送告警恢复功能或本实例的
resolved
哈希是entry.ResolvedAlerts
的子集(即本实例的所有告警恢复都已经被发送过)
也就是说,如果本实例的告警哈希与接收到的告警哈希存在交叉或完全不相同的情况时,则不会对告警消息和告警恢复消息产生抑制效果。
同时从上面也得出:alertmanager HA实例之间并不会同步具体的告警消息,它们只传递了告警的哈希值,且仅仅用于抑制告警和告警恢复。
func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
// If we haven't notified about the alert group before, notify right away
// unless we only have resolved alerts.
if entry == nil {
return len(firing) > 0
}
if !entry.IsFiringSubset(firing) {
return true
}
// Notify about all alerts being resolved.
// This is done irrespective of the send_resolved flag to make sure that
// the firing alerts are cleared from the notification log.
if len(firing) == 0 {
// If the current alert group and last notification contain no firing
// alert, it means that some alerts have been fired and resolved during the
// last interval. In this case, there is no need to notify the receiver
// since it doesn't know about them.
return len(entry.FiringAlerts) > 0
}
if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) {
return true
}
// Nothing changed, only notify if the repeat interval has passed.
return entry.Timestamp.Before(n.now().Add(-repeat))
}
总结
至此,问题的根因也就清楚了。假设如下场景,alertmanager-1
此时有2条firing的告警alert-1和alert-2,alertmanager-2
有2条firing的告警alert-1和alert-3,由于使用了LB,导致发送到alertmanager-2
的alert-1告警数目远少于alertmanager-1
,且alertmanager-2
的alert-1由于EndAt
时间过老,即将产生告警恢复。而这种情况下alertmanager-2
的firing告警哈希并不是alertmanager-1
发送过来的告警哈希的子集,因此并不会产生抑制效果,之后便会导致alertmanager-2
产生alert-1的告警恢复。
因此官方要求上游的告警必须能够发送到所有的alertmanager实例上。