Skip to content

Commit

Permalink
Use ctx everywhere & cache object only when the tx succeeds
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed May 26, 2023
1 parent 886c27b commit 24b6dd0
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 131 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/teambition/rrule-go v1.8.2
go.uber.org/zap v1.23.0
golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)

require (
Expand All @@ -30,7 +31,6 @@ require (
github.com/ssgreg/journald v1.0.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
7 changes: 4 additions & 3 deletions internal/incident/db_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package incident

import (
"context"
"fmt"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
Expand Down Expand Up @@ -33,15 +34,15 @@ func (i *IncidentRow) Upsert() interface{} {
// Sync synchronizes incidents to the database.
// Fetches the last inserted incident id and modifies this incident's id.
// Returns an error on database failure.
func (i *IncidentRow) Sync(tx *sqlx.Tx, db *icingadb.DB, upsert bool) error {
func (i *IncidentRow) Sync(ctx context.Context, tx *sqlx.Tx, db *icingadb.DB, upsert bool) error {
if upsert {
stmt, _ := db.BuildUpsertStmt(i)
_, err := tx.NamedExec(stmt, i)
_, err := tx.NamedExecContext(ctx, stmt, i)
if err != nil {
return fmt.Errorf("failed to upsert incident: %s", err)
}
} else {
incidentId, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(db, i, "id"), i)
incidentId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, i, "id"), i)
if err != nil {
return err
}
Expand Down
131 changes: 74 additions & 57 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package incident

import (
"context"
"errors"
"fmt"
"github.com/icinga/icinga-notifications/internal/config"
Expand Down Expand Up @@ -74,14 +75,14 @@ func (i *Incident) HasManager() bool {
}

// ProcessEvent processes the given event for the current incident.
func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error {
func (i *Incident) ProcessEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event, created bool) error {
i.Lock()
defer i.Unlock()

i.runtimeConfig.RLock()
defer i.runtimeConfig.RUnlock()

err := i.Object.UpdateMetadata(tx, ev.SourceId, ev.Name, utils.ToDBString(ev.URL), ev.ExtraTags)
err := i.Object.UpdateMetadata(ctx, tx, ev.SourceId, ev.Name, utils.ToDBString(ev.URL), ev.ExtraTags)
if err != nil {
i.logger.Errorw("Can't update object metadata", zap.Error(err))

Expand All @@ -90,7 +91,7 @@ func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error

if ev.ID == 0 {
eventRow := event.NewEventRow(ev, i.Object.ID)
eventID, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(i.db, eventRow, "id"), eventRow)
eventID, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(i.db, eventRow, "id"), eventRow)
if err != nil {
i.logger.Errorw("Failed to insert event and fetch its ID", zap.Error(err))

Expand All @@ -101,43 +102,54 @@ func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error
}

if created {
err := i.processIncidentOpenedEvent(tx, ev)
err := i.processIncidentOpenedEvent(ctx, tx, ev)
if err != nil {
return err
}

i.logger = i.logger.With(zap.String("incident", i.String()))
}

if err := i.AddEvent(tx, &ev); err != nil {
if err := i.AddEvent(ctx, tx, &ev); err != nil {
i.logger.Errorw("Can't insert incident event to the database", zap.Error(err))

return errors.New("can't insert incident event to the database")
}

if ev.Type == event.TypeAcknowledgement {
return i.processAcknowledgementEvent(tx, ev)
// The current request has been processed successfully, so update the in-memory objects cache.
i.Object.UpdateCache()

return i.processAcknowledgementEvent(ctx, tx, ev)
}

causedBy, err := i.processSeverityChangedEvent(tx, ev)
causedBy, err := i.processSeverityChangedEvent(ctx, tx, ev)
if err != nil {
return err
}

// Check if any (additional) rules match this object. Filters of rules that already have a state don't have
// to be checked again, these rules already matched and stay effective for the ongoing incident.
causedBy, err = i.evaluateRules(tx, ev.ID, causedBy)
causedBy, err = i.evaluateRules(ctx, tx, ev.ID, causedBy)
if err != nil {
return err
}

// Re-evaluate escalations based on the newly evaluated rules.
i.evaluateEscalations()

return i.notifyContacts(tx, &ev, causedBy)
err = i.notifyContacts(ctx, tx, &ev, causedBy)
if err != nil {
return err
}

// The current request has been processed successfully, so update the in-memory objects cache.
i.Object.UpdateCache()

return nil
}

func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (types.Int, error) {
func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event) (types.Int, error) {
oldIncidentSeverity := i.Severity()
oldSourceSeverity := i.SeverityBySource[ev.SourceId]
if oldSourceSeverity == event.SeverityNone {
Expand All @@ -163,14 +175,14 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ
OldSeverity: oldSourceSeverity,
Message: utils.ToDBString(ev.Message),
}
causedByHistoryId, err := i.AddHistory(tx, history, true)
causedByHistoryId, err := i.AddHistory(ctx, tx, history, true)
if err != nil {
i.logger.Errorw("Can't insert source severity changed history to the database", zap.Error(err))

return types.Int{}, errors.New("can't insert source severity changed history to the database")
}

if err = i.AddSourceSeverity(tx, ev.Severity, ev.SourceId); err != nil {
if err = i.AddSourceSeverity(ctx, tx, ev.Severity, ev.SourceId); err != nil {
i.logger.Errorw("Failed to upsert source severity", zap.Error(err))

return types.Int{}, errors.New("failed to upsert source severity")
Expand All @@ -186,7 +198,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ
"Incident severity changed from %s to %s", oldIncidentSeverity.String(), newIncidentSeverity.String(),
)

if err = i.Sync(tx); err != nil {
if err = i.Sync(ctx, tx); err != nil {
i.logger.Errorw("Failed to update incident severity", zap.Error(err))

return types.Int{}, errors.New("failed to update incident severity")
Expand All @@ -200,7 +212,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ
OldSeverity: oldIncidentSeverity,
CausedByIncidentHistoryID: causedByHistoryId,
}
if causedByHistoryId, err = i.AddHistory(tx, history, true); err != nil {
if causedByHistoryId, err = i.AddHistory(ctx, tx, history, true); err != nil {
i.logger.Errorw("Failed to insert incident severity changed history", zap.Error(err))

return types.Int{}, errors.New("failed to insert incident severity changed history")
Expand All @@ -214,7 +226,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ
RemoveCurrent(i.Object)

incidentRow := &IncidentRow{ID: i.incidentRowID, RecoveredAt: types.UnixMilli(i.RecoveredAt)}
_, err = tx.NamedExec(`UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow)
_, err = tx.NamedExecContext(ctx, `UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow)
if err != nil {
i.logger.Errorw("Failed to close incident", zap.Error(err))

Expand All @@ -227,7 +239,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ
Type: Closed,
}

_, err = i.AddHistory(tx, history, false)
_, err = i.AddHistory(ctx, tx, history, false)
if err != nil {
i.logger.Errorw("Can't insert incident closed history to the database", zap.Error(err))

Expand All @@ -238,9 +250,9 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ
return causedByHistoryId, nil
}

func (i *Incident) processIncidentOpenedEvent(tx *sqlx.Tx, ev event.Event) error {
func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event) error {
i.StartedAt = ev.Time
if err := i.Sync(tx); err != nil {
if err := i.Sync(ctx, tx); err != nil {
i.logger.Errorw("Can't insert incident to the database", zap.Error(err))

return errors.New("can't insert incident to the database")
Expand All @@ -253,7 +265,7 @@ func (i *Incident) processIncidentOpenedEvent(tx *sqlx.Tx, ev event.Event) error
EventID: utils.ToDBInt(ev.ID),
}

if _, err := i.AddHistory(tx, historyRow, false); err != nil {
if _, err := i.AddHistory(ctx, tx, historyRow, false); err != nil {
i.logger.Errorw("Can't insert incident opened history event", zap.Error(err))

return errors.New("can't insert incident opened history event")
Expand All @@ -265,7 +277,7 @@ func (i *Incident) processIncidentOpenedEvent(tx *sqlx.Tx, ev event.Event) error
// evaluateRules evaluates all the configured rules for this *incident.Object and
// generates history entries for each matched rule.
// Returns error on database failure.
func (i *Incident) evaluateRules(tx *sqlx.Tx, eventID int64, causedBy types.Int) (types.Int, error) {
func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64, causedBy types.Int) (types.Int, error) {
if i.Rules == nil {
i.Rules = make(map[int64]struct{})
}
Expand All @@ -290,7 +302,7 @@ func (i *Incident) evaluateRules(tx *sqlx.Tx, eventID int64, causedBy types.Int)
i.Rules[r.ID] = struct{}{}
i.logger.Infof("Rule %q matches", r.Name)

err := i.AddRuleMatched(tx, r)
err := i.AddRuleMatched(ctx, tx, r)
if err != nil {
i.logger.Errorw("Failed to upsert incident rule", zap.String("rule", r.Name), zap.Error(err))

Expand All @@ -304,7 +316,7 @@ func (i *Incident) evaluateRules(tx *sqlx.Tx, eventID int64, causedBy types.Int)
Type: RuleMatched,
CausedByIncidentHistoryID: causedBy,
}
insertedID, err := i.AddHistory(tx, history, true)
insertedID, err := i.AddHistory(ctx, tx, history, true)
if err != nil {
i.logger.Errorw("Failed to insert rule matched incident history", zap.String("rule", r.Name), zap.Error(err))

Expand Down Expand Up @@ -370,7 +382,7 @@ func (i *Incident) evaluateEscalations() {
// notifyContacts evaluates the incident.EscalationState and checks if escalations need to be triggered
// as well as builds the incident recipients along with their channel types and sends notifications based on that.
// Returns error on database failure.
func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.Int) error {
func (i *Incident) notifyContacts(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int) error {
managed := i.HasManager()

contactChannels := make(map[*recipient.Contact]map[string]struct{})
Expand All @@ -393,7 +405,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I
r := i.runtimeConfig.Rules[escalation.RuleID]
i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName())

err := i.AddEscalationTriggered(tx, state)
err := i.AddEscalationTriggered(ctx, tx, state)
if err != nil {
i.logger.Errorw(
"Failed to upsert escalation state", zap.String("rule", r.Name),
Expand All @@ -411,7 +423,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I
Type: EscalationTriggered,
CausedByIncidentHistoryID: causedBy,
}
causedBy, err = i.AddHistory(tx, history, true)
causedBy, err = i.AddHistory(ctx, tx, history, true)
if err != nil {
i.logger.Errorw(
"Failed to insert escalation triggered incident history", zap.String("rule", r.Name),
Expand All @@ -421,7 +433,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I
return errors.New("failed to insert escalation triggered incident history")
}

err = i.AddRecipient(tx, escalation, ev.ID)
err = i.AddRecipient(ctx, tx, escalation, ev.ID)
if err != nil {
return err
}
Expand Down Expand Up @@ -476,40 +488,45 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I
CausedByIncidentHistoryID: causedBy,
}

for chType := range channels {
i.logger.Infof("Notify contact %q via %q", contact.FullName, chType)
select {
case <-ctx.Done():
return ctx.Err()
default:
for chType := range channels {
i.logger.Infof("Notify contact %q via %q", contact.FullName, chType)

hr.ChannelType = utils.ToDBString(chType)
hr.ChannelType = utils.ToDBString(chType)

_, err := i.AddHistory(tx, hr, false)
if err != nil {
i.logger.Errorw(
"Failed to insert contact notified incident history", zap.String("contact", contact.String()),
zap.Error(err),
)
}
_, err := i.AddHistory(ctx, tx, hr, false)
if err != nil {
i.logger.Errorw(
"Failed to insert contact notified incident history", zap.String("contact", contact.String()),
zap.Error(err),
)
}

chConf := i.runtimeConfig.Channels[chType]
if chConf == nil {
i.logger.Errorw("Could not find config for channel", zap.String("type", chType))
continue
}
chConf := i.runtimeConfig.Channels[chType]
if chConf == nil {
i.logger.Errorw("Could not find config for channel", zap.String("type", chType))
continue
}

plugin, err := chConf.GetPlugin()
if err != nil {
i.logger.Errorw("Could not initialize channel", zap.String("type", chType), zap.Error(err))
continue
}
plugin, err := chConf.GetPlugin()
if err != nil {
i.logger.Errorw("Could not initialize channel", zap.String("type", chType), zap.Error(err))
continue
}

err = plugin.Send(contact, i, ev, i.configFile.Icingaweb2URL)
if err != nil {
i.logger.Errorw("Failed to send via channel", zap.String("type", chType), zap.Error(err))
continue
}
err = plugin.Send(contact, i, ev, i.configFile.Icingaweb2URL)
if err != nil {
i.logger.Errorw("Failed to send via channel", zap.String("type", chType), zap.Error(err))
continue
}

i.logger.Infow(
"Successfully sent a message via channel", zap.String("type", chType), zap.String("contact", contact.String()),
)
i.logger.Infow(
"Successfully sent a message via channel", zap.String("type", chType), zap.String("contact", contact.String()),
)
}
}
}

Expand All @@ -519,7 +536,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I
// processAcknowledgementEvent processes the given ack event.
// Promotes the ack author to incident.RoleManager if it's not already the case and generates a history entry.
// Returns error on database failure.
func (i *Incident) processAcknowledgementEvent(tx *sqlx.Tx, ev event.Event) error {
func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event) error {
contact := i.runtimeConfig.GetContact(ev.Username)
if contact == nil {
i.logger.Warnw("Ignoring acknowledgement event from an unknown author", zap.String("author", ev.Username))
Expand Down Expand Up @@ -554,7 +571,7 @@ func (i *Incident) processAcknowledgementEvent(tx *sqlx.Tx, ev event.Event) erro
Message: utils.ToDBString(ev.Message),
}

_, err := i.AddHistory(tx, hr, false)
_, err := i.AddHistory(ctx, tx, hr, false)
if err != nil {
i.logger.Errorw(
"Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err),
Expand All @@ -566,7 +583,7 @@ func (i *Incident) processAcknowledgementEvent(tx *sqlx.Tx, ev event.Event) erro
cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole}

stmt, _ := i.db.BuildUpsertStmt(cr)
_, err = tx.NamedExec(stmt, cr)
_, err = tx.NamedExecContext(ctx, stmt, cr)
if err != nil {
i.logger.Errorw(
"Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err),
Expand Down
Loading

0 comments on commit 24b6dd0

Please sign in to comment.