From a76f505047ac8867402f98117c7f01a05e9050a7 Mon Sep 17 00:00:00 2001 From: rosstimothy <39066650+rosstimothy@users.noreply.github.com> Date: Mon, 29 Jul 2024 14:24:39 -0400 Subject: [PATCH] Prevent deadlocking event-handler on missing recordings (#44659) If the number of session.upload events without a corresponding session recording is higher than the configured concurrency, then the event-handler could deadlock while attempting to find said recording. To prevent this from occurring processing of session recordings has been modified such that: - if a recording does not exist and the session.upload event is from more than 48 hours in the past it is assumed to be lost and no more attempts to process the recording will happen - all processing of recordings that were not found is now done in a separate background routine, instead of inline with event processing The storage directory will now contain information about sessions with a missing recording. The background routine will process them periodically, capping the number of attempts per recording at 3. --- integrations/event-handler/app.go | 2 +- integrations/event-handler/fluentd_client.go | 2 +- integrations/event-handler/go.mod | 2 +- .../event-handler/session_events_job.go | 202 ++++++++++++------ integrations/event-handler/state.go | 105 ++++++++- integrations/event-handler/state_test.go | 60 ++++++ 6 files changed, 304 insertions(+), 69 deletions(-) diff --git a/integrations/event-handler/app.go b/integrations/event-handler/app.go index 0f8922e956257..6a44e76aa4221 100644 --- a/integrations/event-handler/app.go +++ b/integrations/event-handler/app.go @@ -79,6 +79,7 @@ func (a *App) Run(ctx context.Context) error { a.SpawnCriticalJob(a.eventsJob) a.SpawnCriticalJob(a.sessionEventsJob) + a.SpawnCritical(a.sessionEventsJob.processMissingRecordings) <-a.Process.Done() lastWindow := a.EventWatcher.getWindowStartTime() @@ -144,7 +145,6 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error } log.WithFields(fields).Debug("Event sent") - log.WithField("event", e).Debug("Event dump") return nil } diff --git a/integrations/event-handler/fluentd_client.go b/integrations/event-handler/fluentd_client.go index 79468b6f3da27..92015bef4d312 100644 --- a/integrations/event-handler/fluentd_client.go +++ b/integrations/event-handler/fluentd_client.go @@ -91,7 +91,7 @@ func getCertPool(c *FluentdConfig) (*x509.CertPool, error) { // Send sends event to fluentd func (f *FluentdClient) Send(ctx context.Context, url string, b []byte) error { - log.WithField("json", string(b)).Debug("JSON to send") + log.WithField("payload", string(b)).Debug("Sending event to Fluentd") req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(b)) if err != nil { diff --git a/integrations/event-handler/go.mod b/integrations/event-handler/go.mod index 1d2afd67da70e..a3b8d8b7f835d 100644 --- a/integrations/event-handler/go.mod +++ b/integrations/event-handler/go.mod @@ -18,7 +18,6 @@ require ( github.com/sethvargo/go-limiter v0.7.2 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.9.0 - golang.org/x/sync v0.7.0 google.golang.org/protobuf v1.34.0 ) @@ -284,6 +283,7 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.26.0 // indirect golang.org/x/oauth2 v0.19.0 // indirect + golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.21.0 // indirect golang.org/x/term v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect diff --git a/integrations/event-handler/session_events_job.go b/integrations/event-handler/session_events_job.go index bdbee2ffe93c1..4013f419183e0 100644 --- a/integrations/event-handler/session_events_job.go +++ b/integrations/event-handler/session_events_job.go @@ -16,34 +16,26 @@ package main import ( "context" - "os" "time" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" "github.com/sirupsen/logrus" - "golang.org/x/sync/semaphore" + "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/integrations/lib" "github.com/gravitational/teleport/integrations/lib/backoff" "github.com/gravitational/teleport/integrations/lib/logger" ) -const ( - // sessionBackoffBase is an initial (minimum) backoff value. - sessionBackoffBase = 3 * time.Second - // sessionBackoffMax is a backoff threshold - sessionBackoffMax = 2 * time.Minute - // sessionBackoffNumTries is the maximum number of backoff tries - sessionBackoffNumTries = 5 -) - // session is the utility struct used for session ingestion type session struct { // ID current ID ID string // Index current event index Index int64 + // UploadTime is the time at which the recording was uploaded. + UploadTime time.Time } // SessionEventsJob incapsulates session events consumption logic @@ -51,14 +43,14 @@ type SessionEventsJob struct { lib.ServiceJob app *App sessions chan session - semaphore *semaphore.Weighted + semaphore chan struct{} } // NewSessionEventsJob creates new EventsJob structure func NewSessionEventsJob(app *App) *SessionEventsJob { j := &SessionEventsJob{ app: app, - semaphore: semaphore.NewWeighted(int64(app.Config.Concurrency)), + semaphore: make(chan struct{}, app.Config.Concurrency), sessions: make(chan session), } @@ -92,50 +84,22 @@ func (j *SessionEventsJob) run(ctx context.Context) error { logger.Info("Starting session ingest") - if err := j.semaphore.Acquire(ctx, 1); err != nil { - logger.WithError(err).Error("Failed to acquire semaphore") - continue + select { + case j.semaphore <- struct{}{}: + case <-ctx.Done(): + logger.WithError(ctx.Err()).Error("Failed to acquire semaphore") + return nil } func(s session, log logrus.FieldLogger) { j.app.SpawnCritical(func(ctx context.Context) error { - defer j.semaphore.Release(1) - - backoff := backoff.NewDecorr(sessionBackoffBase, sessionBackoffMax, clockwork.NewRealClock()) - backoffCount := sessionBackoffNumTries - - for { - retry, err := j.consumeSession(ctx, s) - - // If sessions needs to retry - if err != nil && retry { - log.WithError(err).WithField("n", backoffCount).Error("Session ingestion error, retrying") - - // Sleep for required interval - err := backoff.Do(ctx) - if err != nil { - return trace.Wrap(err) - } - - // Check if there are number of tries left - backoffCount-- - if backoffCount < 0 { - log.WithField("limit", sessionBackoffNumTries).Error("Session ingestion exceeded attempt limit, aborting") - return nil - } - continue - } - - if err != nil { - if !lib.IsCanceled(err) { - log.WithField("err", err).Error("Session ingestion failed") - } - return err - } - - // No errors, we've finished with this session - return nil + defer func() { <-j.semaphore }() + + if err := j.processSession(ctx, s, 0); err != nil { + return trace.Wrap(err) } + + return nil }) }(s, logger) case <-ctx.Done(): @@ -147,6 +111,125 @@ func (j *SessionEventsJob) run(ctx context.Context) error { } } +func (j *SessionEventsJob) processSession(ctx context.Context, s session, processingAttempt int) error { + const ( + // maxNumberOfProcessingAttempts is the number of times a non-existent + // session recording will be processed before assuming the recording + // is lost forever. + maxNumberOfProcessingAttempts = 2 + // eventTimeCutOff is the point of time in the past at which a missing + // session recording is assumed to be lost forever. + eventTimeCutOff = -48 * time.Hour + // sessionBackoffBase is an initial (minimum) backoff value. + sessionBackoffBase = 3 * time.Second + // sessionBackoffMax is a backoff threshold + sessionBackoffMax = time.Minute + // sessionBackoffNumTries is the maximum number of backoff tries + sessionBackoffNumTries = 3 + ) + + log := logger.Get(ctx).WithField("id", s.ID).WithField("index", s.Index) + backoff := backoff.NewDecorr(sessionBackoffBase, sessionBackoffMax, clockwork.NewRealClock()) + attempt := sessionBackoffNumTries + + for { + retry, err := j.consumeSession(ctx, s) + switch { + case trace.IsNotFound(err): + // If the session was not found, and it was from more + // than the [eventTimeCutOff], abort processing it any further + // as the recording is likely never going to exist. + if (!s.UploadTime.IsZero() && s.UploadTime.Before(time.Now().Add(eventTimeCutOff))) || + processingAttempt > maxNumberOfProcessingAttempts { + // Remove any trace of the session so that it is not + // processed again in the future and doesn't stick around + // on disk forever. + return trace.NewAggregate(j.app.State.RemoveMissingRecording(s.ID), j.app.State.RemoveSession(s.ID)) + } + + // Otherwise, mark that the session was failed to be processed + // so that it can be tried again in the background. + return trace.NewAggregate(j.app.State.SetMissingRecording(s, processingAttempt+1), j.app.State.RemoveSession(s.ID)) + case err != nil && retry: + // If the number of retries has been exceeded, then + // abort processing the session any further. + attempt-- + if attempt <= 0 { + log.WithField("limit", sessionBackoffNumTries).Error("Session ingestion exceeded attempt limit, aborting") + return trace.LimitExceeded("Session ingestion exceeded attempt limit") + } + + log.WithError(err).WithField("n", attempt).Error("Session ingestion error, retrying") + + // Perform backoff before retrying the session again. + if err := backoff.Do(ctx); err != nil { + return trace.Wrap(err) + } + case err != nil: + // Abort on any errors that don't require a retry. + if !lib.IsCanceled(err) { + log.WithField("err", err).Error("Session ingestion failed") + } + return trace.Wrap(err) + default: + // No errors, we've finished processing the session. + return nil + } + } +} + +// processMissingRecordings periodically attempts to reconcile events +// from session recordings that were previously not found. +func (j *SessionEventsJob) processMissingRecordings(ctx context.Context) error { + const ( + initialProcessingDelay = time.Minute + processingInterval = 3 * time.Minute + maxNumberOfInflightSessions = 10 + ) + + ctx, cancel := context.WithCancel(ctx) + j.app.Process.OnTerminate(func(_ context.Context) error { + cancel() + return nil + }) + + jitter := retryutils.NewSeventhJitter() + timer := time.NewTimer(jitter(initialProcessingDelay)) + defer timer.Stop() + + semaphore := make(chan struct{}, maxNumberOfInflightSessions) + for { + select { + case <-timer.C: + case <-ctx.Done(): + return nil + } + + err := j.app.State.IterateMissingRecordings(func(sess session, attempts int) error { + select { + case semaphore <- struct{}{}: + case <-ctx.Done(): + return trace.Wrap(ctx.Err()) + } + + go func() { + defer func() { <-semaphore }() + + if err := j.processSession(ctx, sess, attempts); err != nil { + logger.Get(ctx).WithError(err).Debug("Failed processing session recording") + } + }() + + return nil + }) + if err != nil && !lib.IsCanceled(err) { + logger.Get(ctx).WithError(err).Warn("Unable to load previously failed sessions for processing") + } + + timer.Reset(jitter(processingInterval)) + } +} + // restartPausedSessions restarts sessions saved in state func (j *SessionEventsJob) restartPausedSessions() error { sessions, err := j.app.State.GetSessions() @@ -184,11 +267,7 @@ func (j *SessionEventsJob) restartPausedSessions() error { // consumeSession ingests session func (j *SessionEventsJob) consumeSession(ctx context.Context, s session) (bool, error) { - log := logger.Get(ctx) - url := j.app.Config.FluentdSessionURL + "." + s.ID + ".log" - - log.WithField("id", s.ID).WithField("index", s.Index).Info("Started session events ingest") chEvt, chErr := j.app.EventWatcher.StreamUnstructuredSessionEvents(ctx, s.ID, s.Index) Loop: @@ -199,7 +278,7 @@ Loop: case evt, ok := <-chEvt: if !ok { - log.WithField("id", s.ID).Info("Finished session events ingest") + logrus.WithField("id", s.ID).Info("Finished session events ingest") break Loop // Break the main loop } @@ -236,12 +315,7 @@ Loop: // We have finished ingestion and do not need session state anymore err := j.app.State.RemoveSession(s.ID) - // If the session had no events, the file won't exist, so we ignore the error - if err != nil && !os.IsNotExist(err) { - return false, trace.Wrap(err) - } - - return false, nil + return false, trace.Wrap(err) } // RegisterSession starts session event ingestion @@ -251,7 +325,7 @@ func (j *SessionEventsJob) RegisterSession(ctx context.Context, e *TeleportEvent return trace.Wrap(err) } - s := session{ID: e.SessionID, Index: 0} + s := session{ID: e.SessionID, Index: 0, UploadTime: e.Time} go func() { select { diff --git a/integrations/event-handler/state.go b/integrations/event-handler/state.go index bc5721e1c266c..acb98b95ff03f 100644 --- a/integrations/event-handler/state.go +++ b/integrations/event-handler/state.go @@ -18,14 +18,19 @@ package main import ( "encoding/binary" + "encoding/json" + "errors" + "io/fs" "net" "os" "path" "strings" + "syscall" "time" "github.com/gravitational/trace" "github.com/peterbourgon/diskv/v3" + "github.com/sirupsen/logrus" "github.com/gravitational/teleport/integrations/event-handler/lib" "github.com/gravitational/teleport/integrations/lib/logger" @@ -50,6 +55,9 @@ const ( // sessionPrefix is the session key prefix sessionPrefix = "session" + // missingRecordingPrefix is the missing recording key prefix + missingRecordingPrefix = "missing_recording" + // storageDirPerms is storage directory permissions when created storageDirPerms = 0755 ) @@ -237,7 +245,100 @@ func (s *State) SetSessionIndex(id string, index int64) error { return s.dv.Write(sessionPrefix+id, b) } -// RemoveSession removes session from the state +// SetMissingRecording writes the session with missing recording into state. +func (s *State) SetMissingRecording(sess session, attempt int) error { + b, err := json.Marshal(missingRecording{ + Index: sess.Index, + Attempt: attempt, + Timestamp: sess.UploadTime, + }) + if err != nil { + return trace.Wrap(err) + } + + if err := s.dv.Write(missingRecordingPrefix+sess.ID, b); err != nil { + return trace.Wrap(err) + } + + return trace.Wrap(s.RemoveSession(sess.ID)) +} + +type missingRecording struct { + ID string `json:"id,omitempty"` + Index int64 + Attempt int + Timestamp time.Time +} + +// IterateMissingRecordings finds any sessions with a missing recording and +// provides them to the callback for processing. +func (s *State) IterateMissingRecordings(callback func(s session, attempts int) error) error { + closeC := make(chan struct{}) + defer close(closeC) + for key := range s.dv.KeysPrefix(missingRecordingPrefix, closeC) { + b, err := s.dv.Read(key) + if err != nil { + // Ignore any errors caused by the file being removed + // by an external entity. + var pathError *fs.PathError + if !errors.Is(err, fs.ErrNotExist) || + errors.As(err, &pathError) && errors.Is(pathError.Err, syscall.ENOENT) { + continue + } + + return trace.Wrap(err) + } + + var m missingRecording + if err := json.Unmarshal(b, &m); err != nil { + logrus.WithError(err).Warnf("Failed to unmarshal missing recording %s from persisted state", key) + continue + } + + s := session{ + ID: key[len(missingRecordingPrefix):], + Index: m.Index, + UploadTime: m.Timestamp, + } + + if err := callback(s, m.Attempt); err != nil { + return trace.Wrap(err) + } + } + + return nil +} + +// RemoveMissingRecording removes the session with a missing recording from state. +func (s *State) RemoveMissingRecording(id string) error { + err := s.dv.Erase(missingRecordingPrefix + id) + if err == nil { + return nil + } + + // If the session had no events, the file won't exist, so we ignore the error + var pathError *fs.PathError + if !errors.Is(err, fs.ErrNotExist) || + errors.As(err, &pathError) && errors.Is(pathError.Err, syscall.ENOENT) { + return nil + } + + return trace.Wrap(err) +} + +// RemoveSession removes session from the state. func (s *State) RemoveSession(id string) error { - return s.dv.Erase(sessionPrefix + id) + err := s.dv.Erase(sessionPrefix + id) + if err == nil { + return nil + } + + // If the session had no events, the file won't exist, so we ignore the error + var pathError *fs.PathError + if !errors.Is(err, fs.ErrNotExist) || + errors.As(err, &pathError) && errors.Is(pathError.Err, syscall.ENOENT) { + return nil + } + + return trace.Wrap(err) } diff --git a/integrations/event-handler/state_test.go b/integrations/event-handler/state_test.go index 828f38ff09f76..11ee192b95051 100644 --- a/integrations/event-handler/state_test.go +++ b/integrations/event-handler/state_test.go @@ -17,6 +17,9 @@ limitations under the License. package main import ( + "cmp" + "slices" + "strconv" "testing" "time" @@ -88,3 +91,60 @@ func TestStatePersist(t *testing.T) { assert.Equal(t, "testCursor", cursor) assert.Equal(t, "testId", id) } + +func TestStateMissingRecordings(t *testing.T) { + config := newStartCmdConfig(t) + state, err := NewState(config) + require.NoError(t, err) + + // Iterating should find no records if nothing has been stored yet. + err = state.IterateMissingRecordings(func(s session, attempts int) error { + t.Errorf("no missing sessions should have been found, got %v", s) + return nil + }) + require.NoError(t, err) + + // Removing a missing item should not produce an error. + err = state.RemoveMissingRecording("llama") + require.NoError(t, err) + + expected := make([]session, 0, 10) + for i := 0; i < 10; i++ { + s := session{ + ID: strconv.Itoa(i), + Index: int64(i), + UploadTime: time.Now().UTC(), + } + err := state.SetMissingRecording(s, 0) + expected = append(expected, s) + assert.NoError(t, err) + } + + // Iterating find all stored records and validate they match + // the original sessions. + records := make([]session, 0, 10) + err = state.IterateMissingRecordings(func(s session, attempts int) error { + records = append(records, s) + return nil + }) + require.NoError(t, err) + + slices.SortFunc(records, func(a, b session) int { + return cmp.Compare(a.Index, b.Index) + }) + + require.EqualValues(t, expected, records) + + // Remove all items and validate they no longer exist. + for i := 0; i < 10; i++ { + err = state.RemoveMissingRecording(expected[i].ID) + require.NoError(t, err) + } + + err = state.IterateMissingRecordings(func(s session, attempts int) error { + t.Errorf("no missing sessions should have been found, got %v", s) + return nil + }) + require.NoError(t, err) + +}