diff --git a/go.mod b/go.mod index 8d2f9b481..fd2e50686 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/icinga/icinga-notifications -go 1.20 +go 1.21 require ( github.com/creasty/defaults v1.7.0 diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 661e1ff76..ef92c8b74 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -35,6 +35,14 @@ type Incident struct { incidentRowID int64 + // timer calls RetriggerEscalations the next time any escalation could be reached on the incident. + // + // For example, if there are escalations configured for incident_age>=1h and incident_age>=2h, if the incident + // is less than an hour old, timer will fire 1h after incident start, if the incident is between 1h and 2h + // old, timer will fire after 2h, and if the incident is already older than 2h, no future escalations can + // be reached solely based on the incident aging, so no more timer is necessary and timer stores nil. + timer *time.Timer + db *icingadb.DB logger *zap.SugaredLogger runtimeConfig *config.RuntimeConfig @@ -147,7 +155,12 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo } // Re-evaluate escalations based on the newly evaluated rules. - if _, err := i.evaluateEscalations(ctx, tx, ev, causedBy); err != nil { + escalations, err := i.evaluateEscalations(ev.Time) + if err != nil { + return err + } + + if err := i.triggerEscalations(ctx, tx, ev, causedBy, escalations); err != nil { return err } @@ -165,6 +178,68 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo return i.notifyContacts(ctx, ev, notifications) } +// RetriggerEscalations tries to re-evaluate the escalations and notify contacts. +func (i *Incident) RetriggerEscalations(ev *event.Event) { + i.Lock() + defer i.Unlock() + + i.runtimeConfig.RLock() + defer i.runtimeConfig.RUnlock() + + if !i.RecoveredAt.IsZero() { + // Incident is recovered in the meantime. + return + } + + if !time.Now().After(ev.Time) { + i.logger.DPanicw("Event from the future", zap.Time("event_time", ev.Time), zap.Any("event", ev)) + return + } + + escalations, err := i.evaluateEscalations(ev.Time) + if err != nil { + i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) + return + } + + if len(escalations) == 0 { + i.logger.Debug("Reevaluated escalations, no new escalations triggered") + return + } + + var notifications []*NotificationEntry + ctx := context.Background() + err = utils.RunInTx(ctx, i.db, func(tx *sqlx.Tx) error { + err := ev.Sync(ctx, tx, i.db, i.Object.ID) + if err != nil { + return err + } + + if err = i.triggerEscalations(ctx, tx, ev, types.Int{}, escalations); err != nil { + return err + } + + channels := make(contactChannels) + for _, escalation := range escalations { + channels.loadEscalationRecipientsChannel(escalation, i, ev.Time) + } + + notifications, err = i.addPendingNotifications(ctx, tx, ev, channels, types.Int{}) + + return err + }) + if err != nil { + i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) + } else { + if err = i.notifyContacts(ctx, ev, notifications); err != nil { + i.logger.Errorw("Failed to notify reevaluated escalation recipients", zap.Error(err)) + return + } + + i.logger.Info("Successfully reevaluated time-based escalations") + } +} + func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) (types.Int, error) { var causedByHistoryId types.Int oldSeverity := i.Severity @@ -214,6 +289,10 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, return types.Int{}, errors.New("can't insert incident closed history to the database") } + + if i.timer != nil { + i.timer.Stop() + } } i.Severity = newSeverity @@ -312,14 +391,25 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64 return causedBy, nil } -// evaluateEscalations evaluates this incidents rule escalations if they aren't already. -// Returns whether a new escalation triggered or an error on database failure. -func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int) (bool, error) { +// evaluateEscalations evaluates this incidents rule escalations to be triggered if they aren't already. +// Returns the newly evaluated escalations to be triggered or an error on database failure. +func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, error) { if i.EscalationState == nil { i.EscalationState = make(map[int64]*EscalationState) } - newEscalationMatched := false + // Escalations are reevaluated now, reset any existing timer, if there might be future time-based escalations, + // this function will start a new timer. + if i.timer != nil { + i.logger.Info("Stopping reevaluate timer due to escalation evaluation") + i.timer.Stop() + i.timer = nil + } + + filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt), IncidentSeverity: i.Severity} + + var escalations []*rule.Escalation + retryAfter := rule.RetryNever for rID := range i.Rules { r := i.runtimeConfig.Rules[rID] @@ -336,13 +426,8 @@ func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *eve if escalation.Condition == nil { matched = true } else { - cond := &rule.EscalationFilter{ - IncidentAge: time.Since(i.StartedAt), - IncidentSeverity: i.Severity, - } - var err error - matched, err = escalation.Condition.Eval(cond) + matched, err = escalation.Condition.Eval(filterContext) if err != nil { i.logger.Warnw( "Failed to evaluate escalation condition", zap.String("rule", r.Name), @@ -350,53 +435,80 @@ func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *eve ) matched = false + } else if !matched { + incidentAgeFilter := filterContext.ReevaluateAfter(escalation.Condition) + retryAfter = min(retryAfter, incidentAgeFilter) } } if matched { - newEscalationMatched = true + escalations = append(escalations, escalation) + } + } + } + } - state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())} - i.EscalationState[escalation.ID] = state + if retryAfter != rule.RetryNever { + nextEvalAt := eventTime.Add(retryAfter) - i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName()) + i.logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt)) + i.timer = time.AfterFunc(retryAfter, func() { + i.logger.Info("Reevaluating escalations") - if err := i.AddEscalationTriggered(ctx, tx, state); err != nil { - i.logger.Errorw( - "Failed to upsert escalation state", zap.String("rule", r.Name), - zap.String("escalation", escalation.DisplayName()), zap.Error(err), - ) + i.RetriggerEscalations(&event.Event{ + Type: event.TypeInternal, + Time: nextEvalAt, + Message: fmt.Sprintf("Incident reached age %v", retryAfter), + }) + }) + } - return false, errors.New("failed to upsert escalation state") - } + return escalations, nil +} - history := &HistoryRow{ - Time: state.TriggeredAt, - EventID: utils.ToDBInt(ev.ID), - RuleEscalationID: utils.ToDBInt(state.RuleEscalationID), - RuleID: utils.ToDBInt(r.ID), - Type: EscalationTriggered, - CausedByIncidentHistoryID: causedBy, - } +// triggerEscalations triggers the given escalations and generates incident history items for each of them. +// Returns an error on database failure. +func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int, escalations []*rule.Escalation) error { + for _, escalation := range escalations { + r := i.runtimeConfig.Rules[escalation.RuleID] + i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName()) - if _, err := i.AddHistory(ctx, tx, history, false); err != nil { - i.logger.Errorw( - "Failed to insert escalation triggered incident history", zap.String("rule", r.Name), - zap.String("escalation", escalation.DisplayName()), zap.Error(err), - ) + state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())} + i.EscalationState[escalation.ID] = state - return false, errors.New("failed to insert escalation triggered incident history") - } + if err := i.AddEscalationTriggered(ctx, tx, state); err != nil { + i.logger.Errorw( + "Failed to upsert escalation state", zap.String("rule", r.Name), + zap.String("escalation", escalation.DisplayName()), zap.Error(err), + ) - if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil { - return false, err - } - } - } + return errors.New("failed to upsert escalation state") + } + + history := &HistoryRow{ + Time: state.TriggeredAt, + EventID: utils.ToDBInt(ev.ID), + RuleEscalationID: utils.ToDBInt(state.RuleEscalationID), + RuleID: utils.ToDBInt(r.ID), + Type: EscalationTriggered, + CausedByIncidentHistoryID: causedBy, + } + + if _, err := i.AddHistory(ctx, tx, history, false); err != nil { + i.logger.Errorw( + "Failed to insert escalation triggered incident history", zap.String("rule", r.Name), + zap.String("escalation", escalation.DisplayName()), zap.Error(err), + ) + + return errors.New("failed to insert escalation triggered incident history") + } + + if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil { + return err } } - return newEscalationMatched, nil + return nil } // notifyContacts executes all the given pending notifications of the current incident. diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 640ef5a21..56d0b1663 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -45,34 +45,11 @@ func LoadOpenIncidents(ctx context.Context, db *icingadb.DB, logger *logging.Log continue } - evaluateRulesAndEscalations := func(ctx context.Context) error { - ev := &event.Event{Time: time.Now(), Type: event.TypeEscalation} - if !incident.evaluateEscalations() { - return nil - } - - tx, err := db.BeginTxx(ctx, nil) - if err != nil { - return err - } - defer func() { _ = tx.Rollback() }() - - err = incident.notifyContacts(ctx, tx, ev, types.Int{}) - if err != nil { - return err - } - - if err = tx.Commit(); err != nil { - incident.logger.Errorw("Failed to commit database transaction", zap.Error(err)) - return err - } - - return nil - } - - if evaluateRulesAndEscalations(ctx) != nil { - continue - } + incident.RetriggerEscalations(&event.Event{ + Time: time.Now(), + Type: event.TypeInternal, + Message: "Incident reevaluation at daemon startup", + }) } return nil diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 7bf6b53b0..a809bbe3b 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -30,6 +30,26 @@ func BuildInsertStmtWithout(db *icingadb.DB, into interface{}, withoutColumn str ) } +// RunInTx allows running a function in a database transaction without requiring manual transaction handling. +// +// A new transaction is started on db which is then passed to fn. After fn returns, the transaction is +// committed unless an error was returned. If fn returns an error, that error is returned, otherwise an +// error is returned if a database operation fails. +func RunInTx(ctx context.Context, db *icingadb.DB, fn func(tx *sqlx.Tx) error) error { + tx, err := db.BeginTxx(ctx, nil) + if err != nil { + return err + } + defer func() { _ = tx.Rollback() }() + + err = fn(tx) + if err != nil { + return err + } + + return tx.Commit() +} + // InsertAndFetchId executes the given query and fetches the last inserted ID. func InsertAndFetchId(ctx context.Context, tx *sqlx.Tx, stmt string, args any) (int64, error) { var lastInsertId int64