Skip to content

Commit

Permalink
Prevent deadlocking event-handler on missing recordings
Browse files Browse the repository at this point in the history
If the number of session.upload events without a corresponding
session recording is higher than the configured concurrency, then
the event-handler could deadlock while attempting to find said
recording. To prevent this from occurring processing of session
recordings has been modified such that:

- if a recording does not exist and the session.upload event is
  from more than 48 hours in the past it is assumed to be lost
  and no more attempts to process the recording will happen
- all processing of recordings that were not found is now done in
  a separate background routine, instead of inline with event
  processing

The storage directory will now contain information about sessions
with a missing recording. The background routine will process them
periodically, capping the number of attempts per recording at 3.
  • Loading branch information
rosstimothy committed Jul 29, 2024
1 parent 08b2c79 commit 04dcbe8
Show file tree
Hide file tree
Showing 6 changed files with 304 additions and 69 deletions.
2 changes: 1 addition & 1 deletion integrations/event-handler/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (a *App) Run(ctx context.Context) error {

a.SpawnCriticalJob(a.eventsJob)
a.SpawnCriticalJob(a.sessionEventsJob)
a.SpawnCritical(a.sessionEventsJob.processMissingRecordings)
<-a.Process.Done()

lastWindow := a.EventWatcher.getWindowStartTime()
Expand Down Expand Up @@ -144,7 +145,6 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error
}

log.WithFields(fields).Debug("Event sent")
log.WithField("event", e).Debug("Event dump")

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion integrations/event-handler/fluentd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func getCertPool(c *FluentdConfig) (*x509.CertPool, error) {

// Send sends event to fluentd
func (f *FluentdClient) Send(ctx context.Context, url string, b []byte) error {
log.WithField("json", string(b)).Debug("JSON to send")
log.WithField("payload", string(b)).Debug("Sending event to Fluentd")

req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(b))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion integrations/event-handler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/sethvargo/go-limiter v1.0.0
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
golang.org/x/sync v0.7.0
google.golang.org/protobuf v1.34.2
)

Expand Down Expand Up @@ -279,6 +278,7 @@ require (
golang.org/x/mod v0.18.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down
202 changes: 138 additions & 64 deletions integrations/event-handler/session_events_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,49 +16,41 @@ package main

import (
"context"
"os"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"

"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/integrations/lib"
"github.com/gravitational/teleport/integrations/lib/backoff"
"github.com/gravitational/teleport/integrations/lib/logger"
)

const (
// sessionBackoffBase is an initial (minimum) backoff value.
sessionBackoffBase = 3 * time.Second
// sessionBackoffMax is a backoff threshold
sessionBackoffMax = 2 * time.Minute
// sessionBackoffNumTries is the maximum number of backoff tries
sessionBackoffNumTries = 5
)

// session is the utility struct used for session ingestion
type session struct {
// ID current ID
ID string
// Index current event index
Index int64
// UploadTime is the time at which the recording was uploaded.
UploadTime time.Time
}

// SessionEventsJob incapsulates session events consumption logic
type SessionEventsJob struct {
lib.ServiceJob
app *App
sessions chan session
semaphore *semaphore.Weighted
semaphore chan struct{}
}

// NewSessionEventsJob creates new EventsJob structure
func NewSessionEventsJob(app *App) *SessionEventsJob {
j := &SessionEventsJob{
app: app,
semaphore: semaphore.NewWeighted(int64(app.Config.Concurrency)),
semaphore: make(chan struct{}, app.Config.Concurrency),
sessions: make(chan session),
}

Expand Down Expand Up @@ -92,50 +84,22 @@ func (j *SessionEventsJob) run(ctx context.Context) error {

logger.Info("Starting session ingest")

if err := j.semaphore.Acquire(ctx, 1); err != nil {
logger.WithError(err).Error("Failed to acquire semaphore")
continue
select {
case j.semaphore <- struct{}{}:
case <-ctx.Done():
logger.WithError(ctx.Err()).Error("Failed to acquire semaphore")
return nil
}

func(s session, log logrus.FieldLogger) {
j.app.SpawnCritical(func(ctx context.Context) error {
defer j.semaphore.Release(1)

backoff := backoff.NewDecorr(sessionBackoffBase, sessionBackoffMax, clockwork.NewRealClock())
backoffCount := sessionBackoffNumTries

for {
retry, err := j.consumeSession(ctx, s)

// If sessions needs to retry
if err != nil && retry {
log.WithError(err).WithField("n", backoffCount).Error("Session ingestion error, retrying")

// Sleep for required interval
err := backoff.Do(ctx)
if err != nil {
return trace.Wrap(err)
}

// Check if there are number of tries left
backoffCount--
if backoffCount < 0 {
log.WithField("limit", sessionBackoffNumTries).Error("Session ingestion exceeded attempt limit, aborting")
return nil
}
continue
}

if err != nil {
if !lib.IsCanceled(err) {
log.WithField("err", err).Error("Session ingestion failed")
}
return err
}

// No errors, we've finished with this session
return nil
defer func() { <-j.semaphore }()

if err := j.processSession(ctx, s, 0); err != nil {
return trace.Wrap(err)
}

return nil
})
}(s, logger)
case <-ctx.Done():
Expand All @@ -147,6 +111,125 @@ func (j *SessionEventsJob) run(ctx context.Context) error {
}
}

func (j *SessionEventsJob) processSession(ctx context.Context, s session, processingAttempt int) error {
const (
// maxNumberOfProcessingAttempts is the number of times a non-existent
// session recording will be processed before assuming the recording
// is lost forever.
maxNumberOfProcessingAttempts = 2
// eventTimeCutOff is the point of time in the past at which a missing
// session recording is assumed to be lost forever.
eventTimeCutOff = -48 * time.Hour
// sessionBackoffBase is an initial (minimum) backoff value.
sessionBackoffBase = 3 * time.Second
// sessionBackoffMax is a backoff threshold
sessionBackoffMax = time.Minute
// sessionBackoffNumTries is the maximum number of backoff tries
sessionBackoffNumTries = 3
)

log := logger.Get(ctx).WithField("id", s.ID).WithField("index", s.Index)
backoff := backoff.NewDecorr(sessionBackoffBase, sessionBackoffMax, clockwork.NewRealClock())
attempt := sessionBackoffNumTries

for {
retry, err := j.consumeSession(ctx, s)
switch {
case trace.IsNotFound(err):
// If the session was not found, and it was from more
// than the [eventTimeCutOff], abort processing it any further
// as the recording is likely never going to exist.
if (!s.UploadTime.IsZero() && s.UploadTime.Before(time.Now().Add(eventTimeCutOff))) ||
processingAttempt > maxNumberOfProcessingAttempts {
// Remove any trace of the session so that it is not
// processed again in the future and doesn't stick around
// on disk forever.
return trace.NewAggregate(j.app.State.RemoveMissingRecording(s.ID), j.app.State.RemoveSession(s.ID))
}

// Otherwise, mark that the session was failed to be processed
// so that it can be tried again in the background.
return trace.NewAggregate(j.app.State.SetMissingRecording(s, processingAttempt+1), j.app.State.RemoveSession(s.ID))
case err != nil && retry:
// If the number of retries has been exceeded, then
// abort processing the session any further.
attempt--
if attempt <= 0 {
log.WithField("limit", sessionBackoffNumTries).Error("Session ingestion exceeded attempt limit, aborting")
return trace.LimitExceeded("Session ingestion exceeded attempt limit")
}

log.WithError(err).WithField("n", attempt).Error("Session ingestion error, retrying")

// Perform backoff before retrying the session again.
if err := backoff.Do(ctx); err != nil {
return trace.Wrap(err)
}
case err != nil:
// Abort on any errors that don't require a retry.
if !lib.IsCanceled(err) {
log.WithField("err", err).Error("Session ingestion failed")
}
return trace.Wrap(err)
default:
// No errors, we've finished processing the session.
return nil
}
}
}

// processMissingRecordings periodically attempts to reconcile events
// from session recordings that were previously not found.
func (j *SessionEventsJob) processMissingRecordings(ctx context.Context) error {
const (
initialProcessingDelay = time.Minute
processingInterval = 3 * time.Minute
maxNumberOfInflightSessions = 10
)

ctx, cancel := context.WithCancel(ctx)
j.app.Process.OnTerminate(func(_ context.Context) error {
cancel()
return nil
})

jitter := retryutils.NewSeventhJitter()
timer := time.NewTimer(jitter(initialProcessingDelay))
defer timer.Stop()

semaphore := make(chan struct{}, maxNumberOfInflightSessions)
for {
select {
case <-timer.C:
case <-ctx.Done():
return nil
}

err := j.app.State.IterateMissingRecordings(func(sess session, attempts int) error {
select {
case semaphore <- struct{}{}:
case <-ctx.Done():
return trace.Wrap(ctx.Err())
}

go func() {
defer func() { <-semaphore }()

if err := j.processSession(ctx, sess, attempts); err != nil {
logger.Get(ctx).WithError(err).Debug("Failed processing session recording")
}
}()

return nil
})
if err != nil && !lib.IsCanceled(err) {
logger.Get(ctx).WithError(err).Warn("Unable to load previously failed sessions for processing")
}

timer.Reset(jitter(processingInterval))
}
}

// restartPausedSessions restarts sessions saved in state
func (j *SessionEventsJob) restartPausedSessions() error {
sessions, err := j.app.State.GetSessions()
Expand Down Expand Up @@ -184,11 +267,7 @@ func (j *SessionEventsJob) restartPausedSessions() error {

// consumeSession ingests session
func (j *SessionEventsJob) consumeSession(ctx context.Context, s session) (bool, error) {
log := logger.Get(ctx)

url := j.app.Config.FluentdSessionURL + "." + s.ID + ".log"

log.WithField("id", s.ID).WithField("index", s.Index).Info("Started session events ingest")
chEvt, chErr := j.app.EventWatcher.StreamUnstructuredSessionEvents(ctx, s.ID, s.Index)

Loop:
Expand All @@ -199,7 +278,7 @@ Loop:

case evt, ok := <-chEvt:
if !ok {
log.WithField("id", s.ID).Info("Finished session events ingest")
logrus.WithField("id", s.ID).Info("Finished session events ingest")
break Loop // Break the main loop
}

Expand Down Expand Up @@ -236,12 +315,7 @@ Loop:

// We have finished ingestion and do not need session state anymore
err := j.app.State.RemoveSession(s.ID)
// If the session had no events, the file won't exist, so we ignore the error
if err != nil && !os.IsNotExist(err) {
return false, trace.Wrap(err)
}

return false, nil
return false, trace.Wrap(err)
}

// RegisterSession starts session event ingestion
Expand All @@ -251,7 +325,7 @@ func (j *SessionEventsJob) RegisterSession(ctx context.Context, e *TeleportEvent
return trace.Wrap(err)
}

s := session{ID: e.SessionID, Index: 0}
s := session{ID: e.SessionID, Index: 0, UploadTime: e.Time}

go func() {
select {
Expand Down
Loading

0 comments on commit 04dcbe8

Please sign in to comment.