Skip to content

Commit

Permalink
Trigger time based escalations
Browse files Browse the repository at this point in the history
Co-authored-by: Yonas Habteab <[email protected]>
  • Loading branch information
julianbrost and yhabteab committed Nov 14, 2023
1 parent e78e6ef commit 3cd9e31
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 72 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/icinga/icinga-notifications

go 1.20
go 1.21

require (
github.com/creasty/defaults v1.7.0
Expand Down
198 changes: 155 additions & 43 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ type Incident struct {

incidentRowID int64

// timer calls RetriggerEscalations the next time any escalation could be reached on the incident.
//
// For example, if there are escalations configured for incident_age>=1h and incident_age>=2h, if the incident
// is less than an hour old, timer will fire 1h after incident start, if the incident is between 1h and 2h
// old, timer will fire after 2h, and if the incident is already older than 2h, no future escalations can
// be reached solely based on the incident aging, so no more timer is necessary and timer stores nil.
timer *time.Timer

db *icingadb.DB
logger *zap.SugaredLogger
runtimeConfig *config.RuntimeConfig
Expand Down Expand Up @@ -147,7 +155,12 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo
}

// Re-evaluate escalations based on the newly evaluated rules.
if _, err := i.evaluateEscalations(ctx, tx, ev, causedBy); err != nil {
escalations, err := i.evaluateEscalations(ev.Time)
if err != nil {
return err
}

if err := i.triggerEscalations(ctx, tx, ev, causedBy, escalations); err != nil {
return err
}

Expand All @@ -165,6 +178,68 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo
return i.notifyContacts(ctx, ev, notifications)
}

// RetriggerEscalations tries to re-evaluate the escalations and notify contacts.
func (i *Incident) RetriggerEscalations(ev *event.Event) {
i.Lock()
defer i.Unlock()

i.runtimeConfig.RLock()
defer i.runtimeConfig.RUnlock()

if !i.RecoveredAt.IsZero() {
// Incident is recovered in the meantime.
return
}

if !time.Now().After(ev.Time) {
i.logger.DPanicw("Event from the future", zap.Time("event_time", ev.Time), zap.Any("event", ev))
return
}

escalations, err := i.evaluateEscalations(ev.Time)
if err != nil {
i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err))
return
}

if len(escalations) == 0 {
i.logger.Debug("Reevaluated escalations, no new escalations triggered")
return
}

var notifications []*NotificationEntry
ctx := context.Background()
err = utils.RunInTx(ctx, i.db, func(tx *sqlx.Tx) error {
err := ev.Sync(ctx, tx, i.db, i.Object.ID)
if err != nil {
return err
}

if err = i.triggerEscalations(ctx, tx, ev, types.Int{}, escalations); err != nil {
return err
}

channels := make(contactChannels)
for _, escalation := range escalations {
channels.loadEscalationRecipientsChannel(escalation, i, ev.Time)
}

notifications, err = i.addPendingNotifications(ctx, tx, ev, channels, types.Int{})

return err
})
if err != nil {
i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err))
} else {
if err = i.notifyContacts(ctx, ev, notifications); err != nil {
i.logger.Errorw("Failed to notify reevaluated escalation recipients", zap.Error(err))
return
}

i.logger.Info("Successfully reevaluated time-based escalations")
}
}

func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) (types.Int, error) {
var causedByHistoryId types.Int
oldSeverity := i.Severity
Expand Down Expand Up @@ -214,6 +289,10 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,

return types.Int{}, errors.New("can't insert incident closed history to the database")
}

if i.timer != nil {
i.timer.Stop()
}
}

i.Severity = newSeverity
Expand Down Expand Up @@ -312,14 +391,25 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64
return causedBy, nil
}

// evaluateEscalations evaluates this incidents rule escalations if they aren't already.
// Returns whether a new escalation triggered or an error on database failure.
func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int) (bool, error) {
// evaluateEscalations evaluates this incidents rule escalations to be triggered if they aren't already.
// Returns the newly evaluated escalations to be triggered or an error on database failure.
func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, error) {
if i.EscalationState == nil {
i.EscalationState = make(map[int64]*EscalationState)
}

newEscalationMatched := false
// Escalations are reevaluated now, reset any existing timer, if there might be future time-based escalations,
// this function will start a new timer.
if i.timer != nil {
i.logger.Info("Stopping reevaluate timer due to escalation evaluation")
i.timer.Stop()
i.timer = nil
}

filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt), IncidentSeverity: i.Severity}

var escalations []*rule.Escalation
retryAfter := rule.RetryNever

for rID := range i.Rules {
r := i.runtimeConfig.Rules[rID]
Expand All @@ -336,67 +426,89 @@ func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *eve
if escalation.Condition == nil {
matched = true
} else {
cond := &rule.EscalationFilter{
IncidentAge: time.Since(i.StartedAt),
IncidentSeverity: i.Severity,
}

var err error
matched, err = escalation.Condition.Eval(cond)
matched, err = escalation.Condition.Eval(filterContext)
if err != nil {
i.logger.Warnw(
"Failed to evaluate escalation condition", zap.String("rule", r.Name),
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
)

matched = false
} else if !matched {
incidentAgeFilter := filterContext.ExtractRetryAfter(escalation.Condition)
retryAfter = min(retryAfter, incidentAgeFilter)
}
}

if matched {
newEscalationMatched = true
escalations = append(escalations, escalation)
}
}
}
}

state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())}
i.EscalationState[escalation.ID] = state
if retryAfter != rule.RetryNever {
nextEvalAt := i.StartedAt.Add(retryAfter)

i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName())
i.logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt))
i.timer = time.AfterFunc(time.Until(nextEvalAt), func() {
i.logger.Info("Reevaluating escalations")

if err := i.AddEscalationTriggered(ctx, tx, state); err != nil {
i.logger.Errorw(
"Failed to upsert escalation state", zap.String("rule", r.Name),
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
)
i.RetriggerEscalations(&event.Event{
Type: event.TypeInternal,
Time: nextEvalAt,
Message: fmt.Sprintf("Incident reached age %v", retryAfter),
})
})
}

return false, errors.New("failed to upsert escalation state")
}
return escalations, nil
}

history := &HistoryRow{
Time: state.TriggeredAt,
EventID: utils.ToDBInt(ev.ID),
RuleEscalationID: utils.ToDBInt(state.RuleEscalationID),
RuleID: utils.ToDBInt(r.ID),
Type: EscalationTriggered,
CausedByIncidentHistoryID: causedBy,
}
// triggerEscalations triggers the given escalations and generates incident history items for each of them.
// Returns an error on database failure.
func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int, escalations []*rule.Escalation) error {
for _, escalation := range escalations {
r := i.runtimeConfig.Rules[escalation.RuleID]
i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName())

if _, err := i.AddHistory(ctx, tx, history, false); err != nil {
i.logger.Errorw(
"Failed to insert escalation triggered incident history", zap.String("rule", r.Name),
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
)
state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())}
i.EscalationState[escalation.ID] = state

return false, errors.New("failed to insert escalation triggered incident history")
}
if err := i.AddEscalationTriggered(ctx, tx, state); err != nil {
i.logger.Errorw(
"Failed to upsert escalation state", zap.String("rule", r.Name),
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
)

if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil {
return false, err
}
}
}
return errors.New("failed to upsert escalation state")
}

history := &HistoryRow{
Time: state.TriggeredAt,
EventID: utils.ToDBInt(ev.ID),
RuleEscalationID: utils.ToDBInt(state.RuleEscalationID),
RuleID: utils.ToDBInt(r.ID),
Type: EscalationTriggered,
CausedByIncidentHistoryID: causedBy,
}

if _, err := i.AddHistory(ctx, tx, history, false); err != nil {
i.logger.Errorw(
"Failed to insert escalation triggered incident history", zap.String("rule", r.Name),
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
)

return errors.New("failed to insert escalation triggered incident history")
}

if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil {
return err
}
}

return newEscalationMatched, nil
return nil
}

// notifyContacts executes all the given pending notifications of the current incident.
Expand Down
33 changes: 5 additions & 28 deletions internal/incident/incidents.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,11 @@ func LoadOpenIncidents(ctx context.Context, db *icingadb.DB, logger *logging.Log
continue
}

evaluateRulesAndEscalations := func(ctx context.Context) error {
ev := &event.Event{Time: time.Now(), Type: event.TypeEscalation}
if !incident.evaluateEscalations() {
return nil
}

tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return err
}
defer func() { _ = tx.Rollback() }()

err = incident.notifyContacts(ctx, tx, ev, types.Int{})
if err != nil {
return err
}

if err = tx.Commit(); err != nil {
incident.logger.Errorw("Failed to commit database transaction", zap.Error(err))
return err
}

return nil
}

if evaluateRulesAndEscalations(ctx) != nil {
continue
}
incident.RetriggerEscalations(&event.Event{
Time: time.Now(),
Type: event.TypeInternal,
Message: "Incident reevaluation at daemon startup",
})
}

return nil
Expand Down
20 changes: 20 additions & 0 deletions internal/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,26 @@ func BuildInsertStmtWithout(db *icingadb.DB, into interface{}, withoutColumn str
)
}

// RunInTx allows running a function in a database transaction without requiring manual transaction handling.
//
// A new transaction is started on db which is then passed to fn. After fn returns, the transaction is
// committed unless an error was returned. If fn returns an error, that error is returned, otherwise an
// error is returned if a database operation fails.
func RunInTx(ctx context.Context, db *icingadb.DB, fn func(tx *sqlx.Tx) error) error {
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return err
}
defer func() { _ = tx.Rollback() }()

err = fn(tx)
if err != nil {
return err
}

return tx.Commit()
}

// InsertAndFetchId executes the given query and fetches the last inserted ID.
func InsertAndFetchId(ctx context.Context, tx *sqlx.Tx, stmt string, args any) (int64, error) {
var lastInsertId int64
Expand Down

0 comments on commit 3cd9e31

Please sign in to comment.