Skip to content

Commit

Permalink
Use ctx everywhere & cache object only when the tx succeeds
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed May 25, 2023
1 parent cb15ebe commit f44c4b6
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 141 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/teambition/rrule-go v1.8.2
go.uber.org/zap v1.23.0
golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)

require (
Expand All @@ -30,7 +31,6 @@ require (
github.com/ssgreg/journald v1.0.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
7 changes: 4 additions & 3 deletions internal/incident/db_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package incident

import (
"context"
"fmt"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
Expand Down Expand Up @@ -33,15 +34,15 @@ func (i *IncidentRow) Upsert() interface{} {
// Sync synchronizes incidents to the database.
// Fetches the last inserted incident id and modifies this incident's id.
// Returns an error on database failure.
func (i *IncidentRow) Sync(tx *sqlx.Tx, db *icingadb.DB, upsert bool) error {
func (i *IncidentRow) Sync(ctx context.Context, tx *sqlx.Tx, db *icingadb.DB, upsert bool) error {
if upsert {
stmt, _ := db.BuildUpsertStmt(i)
_, err := tx.NamedExec(stmt, i)
_, err := tx.NamedExecContext(ctx, stmt, i)
if err != nil {
return fmt.Errorf("failed to upsert incident: %s", err)
}
} else {
incidentId, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(db, i, "id"), i)
incidentId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, i, "id"), i)
if err != nil {
return err
}
Expand Down
141 changes: 81 additions & 60 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package incident

import (
"context"
"errors"
"fmt"
"github.com/icinga/icinga-notifications/internal/config"
Expand Down Expand Up @@ -75,14 +76,14 @@ func (i *Incident) HasManager() bool {
}

// ProcessEvent processes the given event for the current incident.
func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error {
func (i *Incident) ProcessEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event, created bool) error {
i.Lock()
defer i.Unlock()

i.runtimeConfig.RLock()
defer i.runtimeConfig.RUnlock()

err := i.Object.UpdateMetadata(tx, ev.SourceId, ev.Name, utils.ToDBString(ev.URL), ev.ExtraTags)
err := i.Object.UpdateMetadata(ctx, tx, ev.SourceId, ev.Name, utils.ToDBString(ev.URL), ev.ExtraTags)
if err != nil {
i.logger.Errorw("can't update object metadata", zap.String("object", i.Object.DisplayName()), zap.Error(err))

Expand All @@ -91,7 +92,7 @@ func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error

if ev.ID == 0 {
eventRow := event.NewEventRow(ev, i.Object.ID)
eventID, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(i.db, eventRow, "id"), eventRow)
eventID, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(i.db, eventRow, "id"), eventRow)
if err != nil {
i.logger.Errorw(
"failed insert event and fetch its ID", zap.String("object", i.ObjectDisplayName()),
Expand All @@ -105,13 +106,13 @@ func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error
}

if created {
err := i.processIncidentOpenedEvent(tx, ev)
err := i.processIncidentOpenedEvent(ctx, tx, ev)
if err != nil {
return err
}
}

if err := i.AddEvent(tx, &ev); err != nil {
if err := i.AddEvent(ctx, tx, &ev); err != nil {
i.logger.Errorw(
"can't insert incident event to the database", zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()), zap.Error(err),
Expand All @@ -121,28 +122,39 @@ func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error
}

if ev.Type == event.TypeAcknowledgement {
return i.processAcknowledgementEvent(tx, ev)
// The current request has been processed successfully, so update the in-memory objects cache.
i.Object.UpdateCache()

return i.processAcknowledgementEvent(ctx, tx, ev)
}

causedBy, err := i.processSeverityChangedEvent(tx, ev)
causedBy, err := i.processSeverityChangedEvent(ctx, tx, ev)
if err != nil {
return err
}

// Check if any (additional) rules match this object. Filters of rules that already have a state don't have
// to be checked again, these rules already matched and stay effective for the ongoing incident.
causedBy, err = i.evaluateRules(tx, ev.ID, causedBy)
causedBy, err = i.evaluateRules(ctx, tx, ev.ID, causedBy)
if err != nil {
return err
}

// Re-evaluate escalations based on the newly evaluated rules.
i.evaluateEscalations()

return i.notifyContacts(tx, &ev, causedBy)
err = i.notifyContacts(ctx, tx, &ev, causedBy)
if err != nil {
return err
}

// The current request has been processed successfully, so update the in-memory objects cache.
i.Object.UpdateCache()

return nil
}

func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (types.Int, error) {
func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event) (types.Int, error) {
oldIncidentSeverity := i.Severity()
oldSourceSeverity := i.SeverityBySource[ev.SourceId]
if oldSourceSeverity == event.SeverityNone {
Expand All @@ -169,7 +181,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ
OldSeverity: oldSourceSeverity,
Message: utils.ToDBString(ev.Message),
}
causedByHistoryId, err := i.AddHistory(tx, history, true)
causedByHistoryId, err := i.AddHistory(ctx, tx, history, true)
if err != nil {
i.logger.Errorw(
"can't insert source severity changed history to the database", zap.String("object", i.ObjectDisplayName()),
Expand All @@ -179,7 +191,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ
return types.Int{}, errors.New("can't insert source severity changed history to the database")
}

if err = i.AddSourceSeverity(tx, ev.Severity, ev.SourceId); err != nil {
if err = i.AddSourceSeverity(ctx, tx, ev.Severity, ev.SourceId); err != nil {
i.logger.Errorw(
"failed to upsert source severity", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()),
zap.Error(err),
Expand All @@ -199,7 +211,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ
zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()),
)

if err = i.Sync(tx); err != nil {
if err = i.Sync(ctx, tx); err != nil {
i.logger.Errorw(
"failed to update incident severity", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()),
zap.Error(err),
Expand All @@ -216,7 +228,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ
OldSeverity: oldIncidentSeverity,
CausedByIncidentHistoryID: causedByHistoryId,
}
if causedByHistoryId, err = i.AddHistory(tx, history, true); err != nil {
if causedByHistoryId, err = i.AddHistory(ctx, tx, history, true); err != nil {
i.logger.Errorw(
"failed to insert incident severity changed history", zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()), zap.Error(err),
Expand All @@ -233,7 +245,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ
RemoveCurrent(i.Object)

incidentRow := &IncidentRow{ID: i.incidentRowID, RecoveredAt: types.UnixMilli(i.RecoveredAt)}
_, err = tx.NamedExec(`UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow)
_, err = tx.NamedExecContext(ctx, `UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow)
if err != nil {
i.logger.Errorw(
"failed to close incident", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()),
Expand All @@ -249,7 +261,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ
Type: Closed,
}

_, err = i.AddHistory(tx, history, false)
_, err = i.AddHistory(ctx, tx, history, false)
if err != nil {
i.logger.Errorw(
"can't insert incident closed history to the database", zap.String("object", i.ObjectDisplayName()),
Expand All @@ -263,9 +275,9 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ
return causedByHistoryId, nil
}

func (i *Incident) processIncidentOpenedEvent(tx *sqlx.Tx, ev event.Event) error {
func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event) error {
i.StartedAt = ev.Time
if err := i.Sync(tx); err != nil {
if err := i.Sync(ctx, tx); err != nil {
i.logger.Errorw(
"can't insert incident to the database", zap.String("incident", i.String()),
zap.String("object", i.ObjectDisplayName()), zap.Error(err),
Expand All @@ -281,7 +293,7 @@ func (i *Incident) processIncidentOpenedEvent(tx *sqlx.Tx, ev event.Event) error
EventID: utils.ToDBInt(ev.ID),
}

if _, err := i.AddHistory(tx, historyRow, false); err != nil {
if _, err := i.AddHistory(ctx, tx, historyRow, false); err != nil {
i.logger.Errorw(
"can't insert incident opened history event", zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()), zap.Error(err),
Expand All @@ -296,7 +308,7 @@ func (i *Incident) processIncidentOpenedEvent(tx *sqlx.Tx, ev event.Event) error
// evaluateRules evaluates all the configured rules for this *incident.Object and
// generates history entries for each matched rule.
// Returns error on database failure.
func (i *Incident) evaluateRules(tx *sqlx.Tx, eventID int64, causedBy types.Int) (types.Int, error) {
func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64, causedBy types.Int) (types.Int, error) {
if i.Rules == nil {
i.Rules = make(map[int64]struct{})
}
Expand Down Expand Up @@ -333,7 +345,7 @@ func (i *Incident) evaluateRules(tx *sqlx.Tx, eventID int64, causedBy types.Int)
Type: RuleMatched,
CausedByIncidentHistoryID: causedBy,
}
insertedID, err := i.AddRuleMatchedHistory(tx, r, history)
insertedID, err := i.AddRuleMatchedHistory(ctx, tx, r, history)
if err != nil {
i.logger.Errorw(
"failed to add incident rule matched history", zap.String("rule", r.Name), zap.String("object", i.ObjectDisplayName()),
Expand Down Expand Up @@ -403,7 +415,7 @@ func (i *Incident) evaluateEscalations() {
// notifyContacts evaluates the incident.EscalationState and checks if escalations need to be triggered
// as well as builds the incident recipients along with their channel types and sends notifications based on that.
// Returns error on database failure.
func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.Int) error {
func (i *Incident) notifyContacts(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int) error {
managed := i.HasManager()

contactChannels := make(map[*recipient.Contact]map[string]struct{})
Expand Down Expand Up @@ -438,7 +450,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I
CausedByIncidentHistoryID: causedBy,
}

causedByHistoryId, err := i.AddEscalationTriggered(tx, state, history)
causedByHistoryId, err := i.AddEscalationTriggered(ctx, tx, state, history)
if err != nil {
i.logger.Errorw(
"failed to add escalation triggered history", zap.String("incident", i.String()),
Expand All @@ -451,7 +463,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I

causedBy = causedByHistoryId

err = i.AddRecipient(tx, escalation, ev.ID)
err = i.AddRecipient(ctx, tx, escalation, ev.ID)
if err != nil {
i.logger.Errorw(
"failed to add incident recipients", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()),
Expand Down Expand Up @@ -512,44 +524,53 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I
CausedByIncidentHistoryID: causedBy,
}

for chType := range channels {
i.logger.Infow(
fmt.Sprintf("Notify contact %q via %q", contact.FullName, chType), zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()),
)
select {
case <-ctx.Done():
return ctx.Err()
default:
for chType := range channels {
i.logger.Infow(
fmt.Sprintf("Notify contact %q via %q", contact.FullName, chType), zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()),
)

hr.ChannelType = utils.ToDBString(chType)
hr.ChannelType = utils.ToDBString(chType)

_, err := i.AddHistory(tx, hr, false)
if err != nil {
i.logger.Errorln(err)
}
_, err := i.AddHistory(ctx, tx, hr, false)
if err != nil {
i.logger.Errorw(
"failed to insert contact notified incident history", zap.String("type", chType),
zap.String("contact", contact.String()), zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()),
)
}

chConf := i.runtimeConfig.Channels[chType]
if chConf == nil {
i.logger.Errorw(
"could not find config for channel", zap.String("type", chType), zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()),
)
continue
}
chConf := i.runtimeConfig.Channels[chType]
if chConf == nil {
i.logger.Errorw(
"could not find config for channel", zap.String("type", chType), zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()),
)
continue
}

plugin, err := chConf.GetPlugin()
if err != nil {
i.logger.Errorw(
"couldn't initialize channel", zap.String("type", chType), zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()), zap.Error(err),
)
continue
}
plugin, err := chConf.GetPlugin()
if err != nil {
i.logger.Errorw(
"couldn't initialize channel", zap.String("type", chType), zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()), zap.Error(err),
)
continue
}

err = plugin.Send(contact, i, ev, i.configFile.Icingaweb2URL)
if err != nil {
i.logger.Errorw(
"failed to send via channel", zap.String("type", chType), zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()), zap.Error(err),
)
continue
err = plugin.Send(contact, i, ev, i.configFile.Icingaweb2URL)
if err != nil {
i.logger.Errorw(
"failed to send via channel", zap.String("type", chType), zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()), zap.Error(err),
)
continue
}
}
}
}
Expand All @@ -560,7 +581,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I
// processAcknowledgementEvent processes the given ack event.
// Promotes the ack author to incident.RoleManager if it's not already the case and generates a history entry.
// Returns error on database failure.
func (i *Incident) processAcknowledgementEvent(tx *sqlx.Tx, ev event.Event) error {
func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event) error {
contact := i.runtimeConfig.GetContact(ev.Username)
if contact == nil {
i.logger.Warnw(
Expand Down Expand Up @@ -601,7 +622,7 @@ func (i *Incident) processAcknowledgementEvent(tx *sqlx.Tx, ev event.Event) erro
Message: utils.ToDBString(ev.Message),
}

_, err := i.AddHistory(tx, hr, false)
_, err := i.AddHistory(ctx, tx, hr, false)
if err != nil {
i.logger.Errorw(
"failed to add recipient role changed history", zap.String("recipient", contact.String()),
Expand All @@ -614,7 +635,7 @@ func (i *Incident) processAcknowledgementEvent(tx *sqlx.Tx, ev event.Event) erro
cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole}

stmt, _ := i.db.BuildUpsertStmt(cr)
_, err = tx.NamedExec(stmt, cr)
_, err = tx.NamedExecContext(ctx, stmt, cr)
if err != nil {
i.logger.Errorw(
"failed to upsert incident contact", zap.String("contact", contact.String()), zap.String("object", i.ObjectDisplayName()),
Expand Down
Loading

0 comments on commit f44c4b6

Please sign in to comment.