Skip to content

Commit

Permalink
Use ctx everywhere & cache object only when the tx succeeds
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed May 22, 2023
1 parent dfd186d commit 72f0f89
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 123 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/teambition/rrule-go v1.8.2
go.uber.org/zap v1.23.0
golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)

require (
Expand All @@ -30,7 +31,6 @@ require (
github.com/ssgreg/journald v1.0.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
7 changes: 4 additions & 3 deletions internal/incident/db_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package incident

import (
"context"
"fmt"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
Expand Down Expand Up @@ -33,15 +34,15 @@ func (i *IncidentRow) Upsert() interface{} {
// Sync synchronizes incidents to the database.
// Fetches the last inserted incident id and modifies this incident's id.
// Returns an error on database failure.
func (i *IncidentRow) Sync(tx *sqlx.Tx, db *icingadb.DB, upsert bool) error {
func (i *IncidentRow) Sync(ctx context.Context, tx *sqlx.Tx, db *icingadb.DB, upsert bool) error {
if upsert {
stmt, _ := db.BuildUpsertStmt(i)
_, err := tx.NamedExec(stmt, i)
_, err := tx.NamedExecContext(ctx, stmt, i)
if err != nil {
return fmt.Errorf("failed to upsert incident: %s", err)
}
} else {
incidentId, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(db, i, "id"), i)
incidentId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, i, "id"), i)
if err != nil {
return err
}
Expand Down
37 changes: 19 additions & 18 deletions internal/incident/history.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package incident

import (
"context"
"fmt"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
Expand All @@ -14,7 +15,7 @@ import (
// Sync initiates an *incident.IncidentRow from the current incident state and syncs it with the database.
// Before syncing any incident related database entries, this method should be called at least once.
// Returns an error on db failure.
func (i *Incident) Sync(tx *sqlx.Tx) error {
func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error {
incidentRow := &IncidentRow{
ID: i.incidentRowID,
ObjectID: i.Object.ID,
Expand All @@ -23,7 +24,7 @@ func (i *Incident) Sync(tx *sqlx.Tx) error {
Severity: i.Severity(),
}

err := incidentRow.Sync(tx, i.db, i.incidentRowID != 0)
err := incidentRow.Sync(ctx, tx, i.db, i.incidentRowID != 0)
if err != nil {
return err
}
Expand All @@ -33,19 +34,19 @@ func (i *Incident) Sync(tx *sqlx.Tx) error {
return nil
}

func (i *Incident) AddHistory(tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) (types.Int, error) {
func (i *Incident) AddHistory(ctx context.Context, tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) (types.Int, error) {
historyRow.IncidentID = i.incidentRowID

stmt := utils.BuildInsertStmtWithout(i.db, historyRow, "id")
if fetchId {
historyId, err := utils.InsertAndFetchId(tx, stmt, historyRow)
historyId, err := utils.InsertAndFetchId(ctx, tx, stmt, historyRow)
if err != nil {
return types.Int{}, err
}

return utils.ToDBInt(historyId), nil
} else {
_, err := tx.NamedExec(stmt, historyRow)
_, err := tx.NamedExecContext(ctx, stmt, historyRow)
if err != nil {
return types.Int{}, err
}
Expand All @@ -54,30 +55,30 @@ func (i *Incident) AddHistory(tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool)
return types.Int{}, nil
}

func (i *Incident) AddEscalationTriggered(tx *sqlx.Tx, state *EscalationState, hr *HistoryRow) (types.Int, error) {
func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, state *EscalationState, hr *HistoryRow) (types.Int, error) {
state.IncidentID = i.incidentRowID

stmt, _ := i.db.BuildUpsertStmt(state)
_, err := tx.NamedExec(stmt, state)
_, err := tx.NamedExecContext(ctx, stmt, state)
if err != nil {
return types.Int{}, err
}

return i.AddHistory(tx, hr, true)
return i.AddHistory(ctx, tx, hr, true)
}

// AddEvent Inserts incident history record to the database and returns an error on db failure.
func (i *Incident) AddEvent(tx *sqlx.Tx, ev *event.Event) error {
func (i *Incident) AddEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error {
ie := &EventRow{IncidentID: i.incidentRowID, EventID: ev.ID}
stmt, _ := i.db.BuildInsertStmt(ie)
_, err := tx.NamedExec(stmt, ie)
_, err := tx.NamedExecContext(ctx, stmt, ie)

return err
}

// AddRecipient adds recipient from the given *rule.Escalation to this incident.
// Syncs also all the recipients with the database and returns an error on db failure.
func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventId int64) error {
func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *rule.Escalation, eventId int64) error {
newRole := RoleRecipient
if i.HasManager() {
newRole = RoleSubscriber
Expand Down Expand Up @@ -110,7 +111,7 @@ func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventI
OldRecipientRole: oldRole,
}

_, err := i.AddHistory(tx, hr, false)
_, err := i.AddHistory(ctx, tx, hr, false)
if err != nil {
return err
}
Expand All @@ -119,7 +120,7 @@ func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventI
}

stmt, _ := i.db.BuildUpsertStmt(cr)
_, err := tx.NamedExec(stmt, cr)
_, err := tx.NamedExecContext(ctx, stmt, cr)
if err != nil {
return fmt.Errorf("failed to upsert incident contact %s: %w", r, err)
}
Expand All @@ -130,18 +131,18 @@ func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventI

// AddRuleMatchedHistory syncs the given *rule.Rule and history entry to the database.
// Returns an error on database failure.
func (i *Incident) AddRuleMatchedHistory(tx *sqlx.Tx, r *rule.Rule, hr *HistoryRow) (types.Int, error) {
func (i *Incident) AddRuleMatchedHistory(ctx context.Context, tx *sqlx.Tx, r *rule.Rule, hr *HistoryRow) (types.Int, error) {
rr := &RuleRow{IncidentID: i.incidentRowID, RuleID: r.ID}
stmt, _ := i.db.BuildUpsertStmt(rr)
_, err := tx.NamedExec(stmt, rr)
_, err := tx.NamedExecContext(ctx, stmt, rr)
if err != nil {
return types.Int{}, err
}

return i.AddHistory(tx, hr, true)
return i.AddHistory(ctx, tx, hr, true)
}

func (i *Incident) AddSourceSeverity(tx *sqlx.Tx, severity event.Severity, sourceID int64) error {
func (i *Incident) AddSourceSeverity(ctx context.Context, tx *sqlx.Tx, severity event.Severity, sourceID int64) error {
i.SeverityBySource[sourceID] = severity

sourceSeverity := &SourceSeverity{
Expand All @@ -151,7 +152,7 @@ func (i *Incident) AddSourceSeverity(tx *sqlx.Tx, severity event.Severity, sourc
}

stmt, _ := i.db.BuildUpsertStmt(sourceSeverity)
_, err := tx.NamedExec(stmt, sourceSeverity)
_, err := tx.NamedExecContext(ctx, stmt, sourceSeverity)

return err
}
Loading

0 comments on commit 72f0f89

Please sign in to comment.