diff --git a/go.mod b/go.mod index 053eb0562..2b22cc1d0 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/teambition/rrule-go v1.8.2 go.uber.org/zap v1.23.0 golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) require ( @@ -30,7 +31,6 @@ require ( github.com/ssgreg/journald v1.0.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.1.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/internal/incident/db_types.go b/internal/incident/db_types.go index 4ef2bdaea..8749a1ef0 100644 --- a/internal/incident/db_types.go +++ b/internal/incident/db_types.go @@ -1,6 +1,7 @@ package incident import ( + "context" "fmt" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" @@ -33,15 +34,15 @@ func (i *IncidentRow) Upsert() interface{} { // Sync synchronizes incidents to the database. // Fetches the last inserted incident id and modifies this incident's id. // Returns an error on database failure. -func (i *IncidentRow) Sync(tx *sqlx.Tx, db *icingadb.DB, upsert bool) error { +func (i *IncidentRow) Sync(ctx context.Context, tx *sqlx.Tx, db *icingadb.DB, upsert bool) error { if upsert { stmt, _ := db.BuildUpsertStmt(i) - _, err := tx.NamedExec(stmt, i) + _, err := tx.NamedExecContext(ctx, stmt, i) if err != nil { return fmt.Errorf("failed to upsert incident: %s", err) } } else { - incidentId, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(db, i, "id"), i) + incidentId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, i, "id"), i) if err != nil { return err } diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 3339ae8b4..ec47f1a72 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -1,6 +1,7 @@ package incident import ( + "context" "errors" "fmt" "github.com/icinga/icinga-notifications/internal/config" @@ -74,14 +75,14 @@ func (i *Incident) HasManager() bool { } // ProcessEvent processes the given event for the current incident. -func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error { +func (i *Incident) ProcessEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event, created bool) error { i.Lock() defer i.Unlock() i.runtimeConfig.RLock() defer i.runtimeConfig.RUnlock() - err := i.Object.UpdateMetadata(tx, ev.SourceId, ev.Name, utils.ToDBString(ev.URL), ev.ExtraTags) + err := i.Object.UpdateMetadata(ctx, tx, ev.SourceId, ev.Name, utils.ToDBString(ev.URL), ev.ExtraTags) if err != nil { i.logger.Errorw("can't update object metadata", zap.String("object", i.Object.DisplayName()), zap.Error(err)) @@ -90,7 +91,7 @@ func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error if ev.ID == 0 { eventRow := event.NewEventRow(ev, i.Object.ID) - eventID, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(i.db, eventRow, "id"), eventRow) + eventID, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(i.db, eventRow, "id"), eventRow) if err != nil { i.logger.Errorw( "failed insert event and fetch its ID", zap.String("object", i.ObjectDisplayName()), @@ -104,7 +105,7 @@ func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error } if created { - err := i.processIncidentOpenedEvent(tx, ev) + err := i.processIncidentOpenedEvent(ctx, tx, ev) if err != nil { return err } @@ -112,24 +113,27 @@ func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error i.logger = i.logger.With(zap.String("incident", i.String())) } - if err := i.AddEvent(tx, &ev); err != nil { + if err := i.AddEvent(ctx, tx, &ev); err != nil { i.logger.Errorw("Can't insert incident event to the database", zap.Error(err)) return errors.New("can't insert incident event to the database") } if ev.Type == event.TypeAcknowledgement { - return i.processAcknowledgementEvent(tx, ev) + // The current request has been processed successfully, so update the in-memory objects cache. + i.Object.UpdateCache() + + return i.processAcknowledgementEvent(ctx, tx, ev) } - causedBy, err := i.processSeverityChangedEvent(tx, ev) + causedBy, err := i.processSeverityChangedEvent(ctx, tx, ev) if err != nil { return err } // Check if any (additional) rules match this object. Filters of rules that already have a state don't have // to be checked again, these rules already matched and stay effective for the ongoing incident. - causedBy, err = i.evaluateRules(tx, ev.ID, causedBy) + causedBy, err = i.evaluateRules(ctx, tx, ev.ID, causedBy) if err != nil { return err } @@ -137,10 +141,18 @@ func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error // Re-evaluate escalations based on the newly evaluated rules. i.evaluateEscalations() - return i.notifyContacts(tx, &ev, causedBy) + err = i.notifyContacts(ctx, tx, &ev, causedBy) + if err != nil { + return err + } + + // The current request has been processed successfully, so update the in-memory objects cache. + i.Object.UpdateCache() + + return nil } -func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (types.Int, error) { +func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event) (types.Int, error) { oldIncidentSeverity := i.Severity() oldSourceSeverity := i.SeverityBySource[ev.SourceId] if oldSourceSeverity == event.SeverityNone { @@ -166,14 +178,14 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ OldSeverity: oldSourceSeverity, Message: utils.ToDBString(ev.Message), } - causedByHistoryId, err := i.AddHistory(tx, history, true) + causedByHistoryId, err := i.AddHistory(ctx, tx, history, true) if err != nil { i.logger.Errorw("Can't insert source severity changed history to the database", zap.Error(err)) return types.Int{}, errors.New("can't insert source severity changed history to the database") } - if err = i.AddSourceSeverity(tx, ev.Severity, ev.SourceId); err != nil { + if err = i.AddSourceSeverity(ctx, tx, ev.Severity, ev.SourceId); err != nil { i.logger.Errorw("Failed to upsert source severity", zap.Error(err)) return types.Int{}, errors.New("failed to upsert source severity") @@ -189,7 +201,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ "Incident severity changed from %s to %s", oldIncidentSeverity.String(), newIncidentSeverity.String(), ) - if err = i.Sync(tx); err != nil { + if err = i.Sync(ctx, tx); err != nil { i.logger.Errorw("Failed to update incident severity", zap.Error(err)) return types.Int{}, errors.New("failed to update incident severity") @@ -203,7 +215,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ OldSeverity: oldIncidentSeverity, CausedByIncidentHistoryID: causedByHistoryId, } - if causedByHistoryId, err = i.AddHistory(tx, history, true); err != nil { + if causedByHistoryId, err = i.AddHistory(ctx, tx, history, true); err != nil { i.logger.Errorw("Failed to insert incident severity changed history", zap.Error(err)) return types.Int{}, errors.New("failed to insert incident severity changed history") @@ -217,7 +229,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ RemoveCurrent(i.Object) incidentRow := &IncidentRow{ID: i.incidentRowID, RecoveredAt: types.UnixMilli(i.RecoveredAt)} - _, err = tx.NamedExec(`UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow) + _, err = tx.NamedExecContext(ctx, `UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow) if err != nil { i.logger.Errorw("Failed to close incident", zap.Error(err)) @@ -230,7 +242,7 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ Type: Closed, } - _, err = i.AddHistory(tx, history, false) + _, err = i.AddHistory(ctx, tx, history, false) if err != nil { i.logger.Errorw("Can't insert incident closed history to the database", zap.Error(err)) @@ -241,9 +253,9 @@ func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (typ return causedByHistoryId, nil } -func (i *Incident) processIncidentOpenedEvent(tx *sqlx.Tx, ev event.Event) error { +func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event) error { i.StartedAt = ev.Time - if err := i.Sync(tx); err != nil { + if err := i.Sync(ctx, tx); err != nil { i.logger.Errorw("Can't insert incident to the database", zap.Error(err)) return errors.New("can't insert incident to the database") @@ -256,7 +268,7 @@ func (i *Incident) processIncidentOpenedEvent(tx *sqlx.Tx, ev event.Event) error EventID: utils.ToDBInt(ev.ID), } - if _, err := i.AddHistory(tx, historyRow, false); err != nil { + if _, err := i.AddHistory(ctx, tx, historyRow, false); err != nil { i.logger.Errorw("Can't insert incident opened history event", zap.Error(err)) return errors.New("can't insert incident opened history event") @@ -268,7 +280,7 @@ func (i *Incident) processIncidentOpenedEvent(tx *sqlx.Tx, ev event.Event) error // evaluateRules evaluates all the configured rules for this *incident.Object and // generates history entries for each matched rule. // Returns error on database failure. -func (i *Incident) evaluateRules(tx *sqlx.Tx, eventID int64, causedBy types.Int) (types.Int, error) { +func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64, causedBy types.Int) (types.Int, error) { if i.Rules == nil { i.Rules = make(map[int64]struct{}) } @@ -293,7 +305,7 @@ func (i *Incident) evaluateRules(tx *sqlx.Tx, eventID int64, causedBy types.Int) i.Rules[r.ID] = struct{}{} i.logger.Infof("Rule %q matches", r.Name) - err := i.AddRuleMatched(r) + err := i.AddRuleMatched(ctx, tx, r) if err != nil { i.logger.Errorw("Failed to upsert incident rule", zap.String("rule", r.Name), zap.Error(err)) @@ -307,7 +319,7 @@ func (i *Incident) evaluateRules(tx *sqlx.Tx, eventID int64, causedBy types.Int) Type: RuleMatched, CausedByIncidentHistoryID: causedBy, } - insertedID, err := i.AddHistory(tx, history, true) + insertedID, err := i.AddHistory(ctx, tx, history, true) if err != nil { i.logger.Errorw("Failed to insert rule matched incident history", zap.String("rule", r.Name), zap.Error(err)) @@ -373,7 +385,7 @@ func (i *Incident) evaluateEscalations() { // notifyContacts evaluates the incident.EscalationState and checks if escalations need to be triggered // as well as builds the incident recipients along with their channel types and sends notifications based on that. // Returns error on database failure. -func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.Int) error { +func (i *Incident) notifyContacts(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int) error { managed := i.HasManager() contactChannels := make(map[*recipient.Contact]map[string]struct{}) @@ -396,7 +408,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I r := i.runtimeConfig.Rules[escalation.RuleID] i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName()) - err := i.AddEscalationTriggered(state) + err := i.AddEscalationTriggered(ctx, tx, state) if err != nil { i.logger.Errorw( "Failed to upsert escalation state", zap.String("rule", r.Name), @@ -414,7 +426,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I Type: EscalationTriggered, CausedByIncidentHistoryID: causedBy, } - causedBy, err = i.AddHistory(tx, history, true) + causedBy, err = i.AddHistory(ctx, tx, history, true) if err != nil { i.logger.Errorw( "Failed to insert escalation triggered incident history", zap.String("rule", r.Name), @@ -424,7 +436,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I return errors.New("failed to insert escalation triggered incident history") } - err = i.AddRecipient(tx, escalation, ev.ID) + err = i.AddRecipient(ctx, tx, escalation, ev.ID) if err != nil { return err } @@ -479,40 +491,45 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I CausedByIncidentHistoryID: causedBy, } - for chType := range channels { - i.logger.Infof("Notify contact %q via %q", contact.FullName, chType) + select { + case <-ctx.Done(): + return ctx.Err() + default: + for chType := range channels { + i.logger.Infof("Notify contact %q via %q", contact.FullName, chType) - hr.ChannelType = utils.ToDBString(chType) + hr.ChannelType = utils.ToDBString(chType) - _, err := i.AddHistory(tx, hr, false) - if err != nil { - i.logger.Errorw( - "Failed to insert contact notified incident history", zap.String("contact", contact.String()), - zap.Error(err), - ) - } + _, err := i.AddHistory(ctx, tx, hr, false) + if err != nil { + i.logger.Errorw( + "Failed to insert contact notified incident history", zap.String("contact", contact.String()), + zap.Error(err), + ) + } - chConf := i.runtimeConfig.Channels[chType] - if chConf == nil { - i.logger.Errorw("Could not find config for channel", zap.String("type", chType)) - continue - } + chConf := i.runtimeConfig.Channels[chType] + if chConf == nil { + i.logger.Errorw("Could not find config for channel", zap.String("type", chType)) + continue + } - plugin, err := chConf.GetPlugin() - if err != nil { - i.logger.Errorw("Could not initialize channel", zap.String("type", chType), zap.Error(err)) - continue - } + plugin, err := chConf.GetPlugin() + if err != nil { + i.logger.Errorw("Could not initialize channel", zap.String("type", chType), zap.Error(err)) + continue + } - err = plugin.Send(contact, i, ev, i.configFile.Icingaweb2URL) - if err != nil { - i.logger.Errorw("Failed to send via channel", zap.String("type", chType), zap.Error(err)) - continue - } + err = plugin.Send(contact, i, ev, i.configFile.Icingaweb2URL) + if err != nil { + i.logger.Errorw("Failed to send via channel", zap.String("type", chType), zap.Error(err)) + continue + } - i.logger.Infow( - "Successfully sent a message via channel", zap.String("type", chType), zap.String("contact", contact.String()), - ) + i.logger.Infow( + "Successfully sent a message via channel", zap.String("type", chType), zap.String("contact", contact.String()), + ) + } } } @@ -522,7 +539,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.I // processAcknowledgementEvent processes the given ack event. // Promotes the ack author to incident.RoleManager if it's not already the case and generates a history entry. // Returns error on database failure. -func (i *Incident) processAcknowledgementEvent(tx *sqlx.Tx, ev event.Event) error { +func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event) error { contact := i.runtimeConfig.GetContact(ev.Username) if contact == nil { i.logger.Warnw("Ignoring acknowledgement event from an unknown author", zap.String("author", ev.Username)) @@ -557,7 +574,7 @@ func (i *Incident) processAcknowledgementEvent(tx *sqlx.Tx, ev event.Event) erro Message: utils.ToDBString(ev.Message), } - _, err := i.AddHistory(tx, hr, false) + _, err := i.AddHistory(ctx, tx, hr, false) if err != nil { i.logger.Errorw( "Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err), @@ -569,7 +586,7 @@ func (i *Incident) processAcknowledgementEvent(tx *sqlx.Tx, ev event.Event) erro cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole} stmt, _ := i.db.BuildUpsertStmt(cr) - _, err = tx.NamedExec(stmt, cr) + _, err = tx.NamedExecContext(ctx, stmt, cr) if err != nil { i.logger.Errorw( "Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err), diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 5d8521fd1..f2c32f172 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -1,6 +1,7 @@ package incident import ( + "context" "database/sql" "errors" "github.com/icinga/icinga-notifications/internal/config" @@ -10,6 +11,7 @@ import ( "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "sync" ) @@ -19,7 +21,7 @@ var ( ) func GetCurrent( - db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, + ctx context.Context, db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, configFile *config.ConfigFile, create bool, ) (*Incident, bool, error) { currentIncidentsMu.Lock() @@ -41,7 +43,7 @@ func GetCurrent( SeverityBySource: map[int64]event.Severity{}, } - err := db.QueryRowx(db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir) + err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir) if err != nil && err != sql.ErrNoRows { logger.Errorw("Failed to load incident from database", zap.String("object", obj.DisplayName()), zap.Error(err)) @@ -51,34 +53,47 @@ func GetCurrent( incident.StartedAt = ir.StartedAt.Time() incident.logger = logger.With(zap.String("object", obj.DisplayName()), zap.String("incident", incident.String())) - sourceSeverity := &SourceSeverity{IncidentID: ir.ID} - var sources []SourceSeverity - err := db.Select( - &sources, - db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`), - ir.ID, event.SeverityOK, - ) - if err != nil { - incident.logger.Errorw("Failed to load incident source severities from database", zap.Error(err)) - - return nil, false, errors.New("failed to load incident source severities") - } - - for _, source := range sources { - incident.SeverityBySource[source.SourceID] = source.Severity - } - - state := &EscalationState{} - var states []*EscalationState - err = db.Select(&states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID) - if err != nil { - incident.logger.Errorw("Failed to load incident rule escalation states", zap.Error(err)) - - return nil, false, errors.New("failed to load incident rule escalation states") - } - - for _, state := range states { - incident.EscalationState[state.RuleEscalationID] = state + g, childCtx := errgroup.WithContext(ctx) + g.Go(func() error { + sourceSeverity := &SourceSeverity{IncidentID: ir.ID} + var sources []SourceSeverity + err := db.SelectContext( + childCtx, &sources, + db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`), + ir.ID, event.SeverityOK, + ) + if err != nil { + incident.logger.Errorw("Failed to load incident source severities from database", zap.Error(err)) + + return errors.New("failed to load incident source severities") + } + + for _, source := range sources { + incident.SeverityBySource[source.SourceID] = source.Severity + } + + return childCtx.Err() + }) + + g.Go(func() error { + state := &EscalationState{} + var states []*EscalationState + err = db.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID) + if err != nil { + incident.logger.Errorw("Failed to load incident rule escalation states", zap.Error(err)) + + return errors.New("failed to load incident rule escalation states") + } + + for _, state := range states { + incident.EscalationState[state.RuleEscalationID] = state + } + + return childCtx.Err() + }) + + if err := g.Wait(); err != nil { + return nil, false, err } currentIncident = incident @@ -100,7 +115,7 @@ func GetCurrent( contact := &ContactRow{} var contacts []*ContactRow - err := db.Select(&contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID()) + err := db.SelectContext(ctx, &contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID()) if err != nil { currentIncident.logger.Errorw("Failed to reload incident recipients", zap.Error(err)) diff --git a/internal/incident/sync.go b/internal/incident/sync.go index 09126d21f..476664152 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -1,6 +1,7 @@ package incident import ( + "context" "errors" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" @@ -15,7 +16,7 @@ import ( // Sync initiates an *incident.IncidentRow from the current incident state and syncs it with the database. // Before syncing any incident related database entries, this method should be called at least once. // Returns an error on db failure. -func (i *Incident) Sync(tx *sqlx.Tx) error { +func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { incidentRow := &IncidentRow{ ID: i.incidentRowID, ObjectID: i.Object.ID, @@ -24,7 +25,7 @@ func (i *Incident) Sync(tx *sqlx.Tx) error { Severity: i.Severity(), } - err := incidentRow.Sync(tx, i.db, i.incidentRowID != 0) + err := incidentRow.Sync(ctx, tx, i.db, i.incidentRowID != 0) if err != nil { return err } @@ -34,19 +35,19 @@ func (i *Incident) Sync(tx *sqlx.Tx) error { return nil } -func (i *Incident) AddHistory(tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) (types.Int, error) { +func (i *Incident) AddHistory(ctx context.Context, tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) (types.Int, error) { historyRow.IncidentID = i.incidentRowID stmt := utils.BuildInsertStmtWithout(i.db, historyRow, "id") if fetchId { - historyId, err := utils.InsertAndFetchId(tx, stmt, historyRow) + historyId, err := utils.InsertAndFetchId(ctx, tx, stmt, historyRow) if err != nil { return types.Int{}, err } return utils.ToDBInt(historyId), nil } else { - _, err := tx.NamedExec(stmt, historyRow) + _, err := tx.NamedExecContext(ctx, stmt, historyRow) if err != nil { return types.Int{}, err } @@ -55,27 +56,27 @@ func (i *Incident) AddHistory(tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) return types.Int{}, nil } -func (i *Incident) AddEscalationTriggered(tx *sqlx.Tx, state *EscalationState) error { +func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, state *EscalationState) error { state.IncidentID = i.incidentRowID stmt, _ := i.db.BuildUpsertStmt(state) - _, err := tx.NamedExec(stmt, state) + _, err := tx.NamedExecContext(ctx, stmt, state) return err } // AddEvent Inserts incident history record to the database and returns an error on db failure. -func (i *Incident) AddEvent(tx *sqlx.Tx, ev *event.Event) error { +func (i *Incident) AddEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { ie := &EventRow{IncidentID: i.incidentRowID, EventID: ev.ID} stmt, _ := i.db.BuildInsertStmt(ie) - _, err := tx.NamedExec(stmt, ie) + _, err := tx.NamedExecContext(ctx, stmt, ie) return err } // AddRecipient adds recipient from the given *rule.Escalation to this incident. // Syncs also all the recipients with the database and returns an error on db failure. -func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventId int64) error { +func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *rule.Escalation, eventId int64) error { newRole := RoleRecipient if i.HasManager() { newRole = RoleSubscriber @@ -108,7 +109,7 @@ func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventI OldRecipientRole: oldRole, } - _, err := i.AddHistory(tx, hr, false) + _, err := i.AddHistory(ctx, tx, hr, false) if err != nil { i.logger.Errorw( "Failed to insert recipient role changed incident history", zap.String("escalation", escalation.DisplayName()), @@ -122,7 +123,7 @@ func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventI } stmt, _ := i.db.BuildUpsertStmt(cr) - _, err := tx.NamedExec(stmt, cr) + _, err := tx.NamedExecContext(ctx, stmt, cr) if err != nil { i.logger.Errorw( "Failed to upsert incident recipient", zap.String("escalation", escalation.DisplayName()), @@ -138,15 +139,15 @@ func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventI // AddRuleMatched syncs the given *rule.Rule to the database. // Returns an error on database failure. -func (i *Incident) AddRuleMatched(tx *sqlx.Tx, r *rule.Rule) error { +func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule) error { rr := &RuleRow{IncidentID: i.incidentRowID, RuleID: r.ID} stmt, _ := i.db.BuildUpsertStmt(rr) - _, err := tx.NamedExec(stmt, rr) + _, err := tx.NamedExecContext(ctx, stmt, rr) return err } -func (i *Incident) AddSourceSeverity(tx *sqlx.Tx, severity event.Severity, sourceID int64) error { +func (i *Incident) AddSourceSeverity(ctx context.Context, tx *sqlx.Tx, severity event.Severity, sourceID int64) error { i.SeverityBySource[sourceID] = severity sourceSeverity := &SourceSeverity{ @@ -156,7 +157,7 @@ func (i *Incident) AddSourceSeverity(tx *sqlx.Tx, severity event.Severity, sourc } stmt, _ := i.db.BuildUpsertStmt(sourceSeverity) - _, err := tx.NamedExec(stmt, sourceSeverity) + _, err := tx.NamedExecContext(ctx, stmt, sourceSeverity) return err } diff --git a/internal/listener/listener.go b/internal/listener/listener.go index dc6bb678d..26ce0d903 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -1,7 +1,6 @@ package listener import ( - "context" "crypto/subtle" "encoding/json" "fmt" @@ -86,7 +85,8 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { } } - tx, err := l.db.BeginTxx(context.TODO(), nil) + ctx := req.Context() + tx, err := l.db.BeginTxx(ctx, nil) if err != nil { l.logger.Errorw("can't start a db transaction", zap.Error(err)) @@ -96,7 +96,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { } defer func() { _ = tx.Rollback() }() - obj, err := object.FromTags(l.db, tx, ev.Tags) + obj, err := object.FromTags(ctx, l.db, tx, ev.Tags) if err != nil { l.logger.Errorln(err) @@ -106,7 +106,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { } createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK - currentIncident, created, err := incident.GetCurrent(l.db, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, l.configFile, createIncident) + currentIncident, created, err := incident.GetCurrent(ctx, l.db, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, l.configFile, createIncident) if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = fmt.Fprintln(w, err) @@ -137,7 +137,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { l.logger.Infof("Processing event") - if err := currentIncident.ProcessEvent(tx, ev, created); err != nil { + if err := currentIncident.ProcessEvent(ctx, tx, ev, created); err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = fmt.Fprintln(w, err) return diff --git a/internal/object/object.go b/internal/object/object.go index bea7b453c..147895857 100644 --- a/internal/object/object.go +++ b/internal/object/object.go @@ -2,6 +2,7 @@ package object import ( "bytes" + "context" "crypto/sha256" "encoding/hex" "encoding/json" @@ -31,7 +32,7 @@ var ( cacheMu sync.Mutex ) -func FromTags(db *icingadb.DB, tx *sqlx.Tx, tags map[string]string) (*Object, error) { +func FromTags(ctx context.Context, db *icingadb.DB, tx *sqlx.Tx, tags map[string]string) (*Object, error) { id := ID(tags) cacheMu.Lock() @@ -43,7 +44,6 @@ func FromTags(db *icingadb.DB, tx *sqlx.Tx, tags map[string]string) (*Object, er } object = &Object{ID: id, Tags: tags, db: db} - cache[id.String()] = object stmt, _ := object.db.BuildInsertIgnoreStmt(&ObjectRow{}) dbObj := &ObjectRow{ @@ -55,7 +55,7 @@ func FromTags(db *icingadb.DB, tx *sqlx.Tx, tags map[string]string) (*Object, er dbObj.Service = utils.ToDBString(service) } - _, err := tx.NamedExec(stmt, dbObj) + _, err := tx.NamedExecContext(ctx, stmt, dbObj) if err != nil { return nil, fmt.Errorf("failed to insert object: %s", err) } @@ -104,7 +104,16 @@ func (o *Object) String() string { return b.String() } -func (o *Object) UpdateMetadata(tx *sqlx.Tx, source int64, name string, url types.String, extraTags map[string]string) error { +func (o *Object) UpdateCache() { + cacheMu.Lock() + defer cacheMu.Unlock() + + cache[o.ID.String()] = o +} + +func (o *Object) UpdateMetadata( + ctx context.Context, tx *sqlx.Tx, source int64, name string, url types.String, extraTags map[string]string, +) error { o.mu.Lock() defer o.mu.Unlock() @@ -117,20 +126,20 @@ func (o *Object) UpdateMetadata(tx *sqlx.Tx, source int64, name string, url type } stmt, _ := o.db.BuildUpsertStmt(&SourceMetadata{}) - _, err := tx.NamedExec(stmt, sourceMetadata) + _, err := tx.NamedExecContext(ctx, stmt, sourceMetadata) if err != nil { return err } extraTag := &ExtraTagRow{ObjectId: o.ID, SourceId: source} - _, err = tx.NamedExec(`DELETE FROM "object_extra_tag" WHERE "object_id" = :object_id AND "source_id" = :source_id`, extraTag) + _, err = tx.NamedExecContext(ctx, `DELETE FROM "object_extra_tag" WHERE "object_id" = :object_id AND "source_id" = :source_id`, extraTag) if err != nil { return err } if len(extraTags) > 0 { stmt, _ = o.db.BuildInsertStmt(extraTag) - _, err = tx.NamedExec(stmt, sourceMetadata.mapToExtraTags()) + _, err = tx.NamedExecContext(ctx, stmt, sourceMetadata.mapToExtraTags()) if err != nil { return err } @@ -140,13 +149,7 @@ func (o *Object) UpdateMetadata(tx *sqlx.Tx, source int64, name string, url type o.Metadata = make(map[int64]*SourceMetadata) } - if m := o.Metadata[source]; m != nil { - m.Name = name - m.URL = url - m.ExtraTags = extraTags - } else { - o.Metadata[source] = sourceMetadata - } + o.Metadata[source] = sourceMetadata return nil } diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 6b3a73a32..7bf6b53b0 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -1,6 +1,7 @@ package utils import ( + "context" "database/sql" "fmt" "github.com/icinga/icingadb/pkg/driver" @@ -30,21 +31,21 @@ func BuildInsertStmtWithout(db *icingadb.DB, into interface{}, withoutColumn str } // InsertAndFetchId executes the given query and fetches the last inserted ID. -func InsertAndFetchId(tx *sqlx.Tx, stmt string, args any) (int64, error) { +func InsertAndFetchId(ctx context.Context, tx *sqlx.Tx, stmt string, args any) (int64, error) { var lastInsertId int64 if tx.DriverName() == driver.PostgreSQL { - preparedStmt, err := tx.PrepareNamed(stmt + " RETURNING id") + preparedStmt, err := tx.PrepareNamedContext(ctx, stmt+" RETURNING id") if err != nil { return 0, err } - defer preparedStmt.Close() + defer func() { _ = preparedStmt.Close() }() err = preparedStmt.Get(&lastInsertId, args) if err != nil { return 0, fmt.Errorf("failed to insert entry for type %T: %s", args, err) } } else { - result, err := tx.NamedExec(stmt, args) + result, err := tx.NamedExecContext(ctx, stmt, args) if err != nil { return 0, fmt.Errorf("failed to insert entry for type %T: %s", args, err) }