diff --git a/integrations/event-handler/app.go b/integrations/event-handler/app.go index 0f8922e956257..6a44e76aa4221 100644 --- a/integrations/event-handler/app.go +++ b/integrations/event-handler/app.go @@ -79,6 +79,7 @@ func (a *App) Run(ctx context.Context) error { a.SpawnCriticalJob(a.eventsJob) a.SpawnCriticalJob(a.sessionEventsJob) + a.SpawnCritical(a.sessionEventsJob.processMissingRecordings) <-a.Process.Done() lastWindow := a.EventWatcher.getWindowStartTime() @@ -144,7 +145,6 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error } log.WithFields(fields).Debug("Event sent") - log.WithField("event", e).Debug("Event dump") return nil } diff --git a/integrations/event-handler/fluentd_client.go b/integrations/event-handler/fluentd_client.go index 79468b6f3da27..92015bef4d312 100644 --- a/integrations/event-handler/fluentd_client.go +++ b/integrations/event-handler/fluentd_client.go @@ -91,7 +91,7 @@ func getCertPool(c *FluentdConfig) (*x509.CertPool, error) { // Send sends event to fluentd func (f *FluentdClient) Send(ctx context.Context, url string, b []byte) error { - log.WithField("json", string(b)).Debug("JSON to send") + log.WithField("payload", string(b)).Debug("Sending event to Fluentd") req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(b)) if err != nil { diff --git a/integrations/event-handler/go.mod b/integrations/event-handler/go.mod index 1d2afd67da70e..a3b8d8b7f835d 100644 --- a/integrations/event-handler/go.mod +++ b/integrations/event-handler/go.mod @@ -18,7 +18,6 @@ require ( github.com/sethvargo/go-limiter v0.7.2 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.9.0 - golang.org/x/sync v0.7.0 google.golang.org/protobuf v1.34.0 ) @@ -284,6 +283,7 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.26.0 // indirect golang.org/x/oauth2 v0.19.0 // indirect + golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.21.0 // indirect golang.org/x/term v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect diff --git a/integrations/event-handler/session_events_job.go b/integrations/event-handler/session_events_job.go index bdbee2ffe93c1..4013f419183e0 100644 --- a/integrations/event-handler/session_events_job.go +++ b/integrations/event-handler/session_events_job.go @@ -16,34 +16,26 @@ package main import ( "context" - "os" "time" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" "github.com/sirupsen/logrus" - "golang.org/x/sync/semaphore" + "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/integrations/lib" "github.com/gravitational/teleport/integrations/lib/backoff" "github.com/gravitational/teleport/integrations/lib/logger" ) -const ( - // sessionBackoffBase is an initial (minimum) backoff value. - sessionBackoffBase = 3 * time.Second - // sessionBackoffMax is a backoff threshold - sessionBackoffMax = 2 * time.Minute - // sessionBackoffNumTries is the maximum number of backoff tries - sessionBackoffNumTries = 5 -) - // session is the utility struct used for session ingestion type session struct { // ID current ID ID string // Index current event index Index int64 + // UploadTime is the time at which the recording was uploaded. + UploadTime time.Time } // SessionEventsJob incapsulates session events consumption logic @@ -51,14 +43,14 @@ type SessionEventsJob struct { lib.ServiceJob app *App sessions chan session - semaphore *semaphore.Weighted + semaphore chan struct{} } // NewSessionEventsJob creates new EventsJob structure func NewSessionEventsJob(app *App) *SessionEventsJob { j := &SessionEventsJob{ app: app, - semaphore: semaphore.NewWeighted(int64(app.Config.Concurrency)), + semaphore: make(chan struct{}, app.Config.Concurrency), sessions: make(chan session), } @@ -92,50 +84,22 @@ func (j *SessionEventsJob) run(ctx context.Context) error { logger.Info("Starting session ingest") - if err := j.semaphore.Acquire(ctx, 1); err != nil { - logger.WithError(err).Error("Failed to acquire semaphore") - continue + select { + case j.semaphore <- struct{}{}: + case <-ctx.Done(): + logger.WithError(ctx.Err()).Error("Failed to acquire semaphore") + return nil } func(s session, log logrus.FieldLogger) { j.app.SpawnCritical(func(ctx context.Context) error { - defer j.semaphore.Release(1) - - backoff := backoff.NewDecorr(sessionBackoffBase, sessionBackoffMax, clockwork.NewRealClock()) - backoffCount := sessionBackoffNumTries - - for { - retry, err := j.consumeSession(ctx, s) - - // If sessions needs to retry - if err != nil && retry { - log.WithError(err).WithField("n", backoffCount).Error("Session ingestion error, retrying") - - // Sleep for required interval - err := backoff.Do(ctx) - if err != nil { - return trace.Wrap(err) - } - - // Check if there are number of tries left - backoffCount-- - if backoffCount < 0 { - log.WithField("limit", sessionBackoffNumTries).Error("Session ingestion exceeded attempt limit, aborting") - return nil - } - continue - } - - if err != nil { - if !lib.IsCanceled(err) { - log.WithField("err", err).Error("Session ingestion failed") - } - return err - } - - // No errors, we've finished with this session - return nil + defer func() { <-j.semaphore }() + + if err := j.processSession(ctx, s, 0); err != nil { + return trace.Wrap(err) } + + return nil }) }(s, logger) case <-ctx.Done(): @@ -147,6 +111,125 @@ func (j *SessionEventsJob) run(ctx context.Context) error { } } +func (j *SessionEventsJob) processSession(ctx context.Context, s session, processingAttempt int) error { + const ( + // maxNumberOfProcessingAttempts is the number of times a non-existent + // session recording will be processed before assuming the recording + // is lost forever. + maxNumberOfProcessingAttempts = 2 + // eventTimeCutOff is the point of time in the past at which a missing + // session recording is assumed to be lost forever. + eventTimeCutOff = -48 * time.Hour + // sessionBackoffBase is an initial (minimum) backoff value. + sessionBackoffBase = 3 * time.Second + // sessionBackoffMax is a backoff threshold + sessionBackoffMax = time.Minute + // sessionBackoffNumTries is the maximum number of backoff tries + sessionBackoffNumTries = 3 + ) + + log := logger.Get(ctx).WithField("id", s.ID).WithField("index", s.Index) + backoff := backoff.NewDecorr(sessionBackoffBase, sessionBackoffMax, clockwork.NewRealClock()) + attempt := sessionBackoffNumTries + + for { + retry, err := j.consumeSession(ctx, s) + switch { + case trace.IsNotFound(err): + // If the session was not found, and it was from more + // than the [eventTimeCutOff], abort processing it any further + // as the recording is likely never going to exist. + if (!s.UploadTime.IsZero() && s.UploadTime.Before(time.Now().Add(eventTimeCutOff))) || + processingAttempt > maxNumberOfProcessingAttempts { + // Remove any trace of the session so that it is not + // processed again in the future and doesn't stick around + // on disk forever. + return trace.NewAggregate(j.app.State.RemoveMissingRecording(s.ID), j.app.State.RemoveSession(s.ID)) + } + + // Otherwise, mark that the session was failed to be processed + // so that it can be tried again in the background. + return trace.NewAggregate(j.app.State.SetMissingRecording(s, processingAttempt+1), j.app.State.RemoveSession(s.ID)) + case err != nil && retry: + // If the number of retries has been exceeded, then + // abort processing the session any further. + attempt-- + if attempt <= 0 { + log.WithField("limit", sessionBackoffNumTries).Error("Session ingestion exceeded attempt limit, aborting") + return trace.LimitExceeded("Session ingestion exceeded attempt limit") + } + + log.WithError(err).WithField("n", attempt).Error("Session ingestion error, retrying") + + // Perform backoff before retrying the session again. + if err := backoff.Do(ctx); err != nil { + return trace.Wrap(err) + } + case err != nil: + // Abort on any errors that don't require a retry. + if !lib.IsCanceled(err) { + log.WithField("err", err).Error("Session ingestion failed") + } + return trace.Wrap(err) + default: + // No errors, we've finished processing the session. + return nil + } + } +} + +// processMissingRecordings periodically attempts to reconcile events +// from session recordings that were previously not found. +func (j *SessionEventsJob) processMissingRecordings(ctx context.Context) error { + const ( + initialProcessingDelay = time.Minute + processingInterval = 3 * time.Minute + maxNumberOfInflightSessions = 10 + ) + + ctx, cancel := context.WithCancel(ctx) + j.app.Process.OnTerminate(func(_ context.Context) error { + cancel() + return nil + }) + + jitter := retryutils.NewSeventhJitter() + timer := time.NewTimer(jitter(initialProcessingDelay)) + defer timer.Stop() + + semaphore := make(chan struct{}, maxNumberOfInflightSessions) + for { + select { + case <-timer.C: + case <-ctx.Done(): + return nil + } + + err := j.app.State.IterateMissingRecordings(func(sess session, attempts int) error { + select { + case semaphore <- struct{}{}: + case <-ctx.Done(): + return trace.Wrap(ctx.Err()) + } + + go func() { + defer func() { <-semaphore }() + + if err := j.processSession(ctx, sess, attempts); err != nil { + logger.Get(ctx).WithError(err).Debug("Failed processing session recording") + } + }() + + return nil + }) + if err != nil && !lib.IsCanceled(err) { + logger.Get(ctx).WithError(err).Warn("Unable to load previously failed sessions for processing") + } + + timer.Reset(jitter(processingInterval)) + } +} + // restartPausedSessions restarts sessions saved in state func (j *SessionEventsJob) restartPausedSessions() error { sessions, err := j.app.State.GetSessions() @@ -184,11 +267,7 @@ func (j *SessionEventsJob) restartPausedSessions() error { // consumeSession ingests session func (j *SessionEventsJob) consumeSession(ctx context.Context, s session) (bool, error) { - log := logger.Get(ctx) - url := j.app.Config.FluentdSessionURL + "." + s.ID + ".log" - - log.WithField("id", s.ID).WithField("index", s.Index).Info("Started session events ingest") chEvt, chErr := j.app.EventWatcher.StreamUnstructuredSessionEvents(ctx, s.ID, s.Index) Loop: @@ -199,7 +278,7 @@ Loop: case evt, ok := <-chEvt: if !ok { - log.WithField("id", s.ID).Info("Finished session events ingest") + logrus.WithField("id", s.ID).Info("Finished session events ingest") break Loop // Break the main loop } @@ -236,12 +315,7 @@ Loop: // We have finished ingestion and do not need session state anymore err := j.app.State.RemoveSession(s.ID) - // If the session had no events, the file won't exist, so we ignore the error - if err != nil && !os.IsNotExist(err) { - return false, trace.Wrap(err) - } - - return false, nil + return false, trace.Wrap(err) } // RegisterSession starts session event ingestion @@ -251,7 +325,7 @@ func (j *SessionEventsJob) RegisterSession(ctx context.Context, e *TeleportEvent return trace.Wrap(err) } - s := session{ID: e.SessionID, Index: 0} + s := session{ID: e.SessionID, Index: 0, UploadTime: e.Time} go func() { select { diff --git a/integrations/event-handler/state.go b/integrations/event-handler/state.go index bc5721e1c266c..acb98b95ff03f 100644 --- a/integrations/event-handler/state.go +++ b/integrations/event-handler/state.go @@ -18,14 +18,19 @@ package main import ( "encoding/binary" + "encoding/json" + "errors" + "io/fs" "net" "os" "path" "strings" + "syscall" "time" "github.com/gravitational/trace" "github.com/peterbourgon/diskv/v3" + "github.com/sirupsen/logrus" "github.com/gravitational/teleport/integrations/event-handler/lib" "github.com/gravitational/teleport/integrations/lib/logger" @@ -50,6 +55,9 @@ const ( // sessionPrefix is the session key prefix sessionPrefix = "session" + // missingRecordingPrefix is the missing recording key prefix + missingRecordingPrefix = "missing_recording" + // storageDirPerms is storage directory permissions when created storageDirPerms = 0755 ) @@ -237,7 +245,100 @@ func (s *State) SetSessionIndex(id string, index int64) error { return s.dv.Write(sessionPrefix+id, b) } -// RemoveSession removes session from the state +// SetMissingRecording writes the session with missing recording into state. +func (s *State) SetMissingRecording(sess session, attempt int) error { + b, err := json.Marshal(missingRecording{ + Index: sess.Index, + Attempt: attempt, + Timestamp: sess.UploadTime, + }) + if err != nil { + return trace.Wrap(err) + } + + if err := s.dv.Write(missingRecordingPrefix+sess.ID, b); err != nil { + return trace.Wrap(err) + } + + return trace.Wrap(s.RemoveSession(sess.ID)) +} + +type missingRecording struct { + ID string `json:"id,omitempty"` + Index int64 + Attempt int + Timestamp time.Time +} + +// IterateMissingRecordings finds any sessions with a missing recording and +// provides them to the callback for processing. +func (s *State) IterateMissingRecordings(callback func(s session, attempts int) error) error { + closeC := make(chan struct{}) + defer close(closeC) + for key := range s.dv.KeysPrefix(missingRecordingPrefix, closeC) { + b, err := s.dv.Read(key) + if err != nil { + // Ignore any errors caused by the file being removed + // by an external entity. + var pathError *fs.PathError + if !errors.Is(err, fs.ErrNotExist) || + errors.As(err, &pathError) && errors.Is(pathError.Err, syscall.ENOENT) { + continue + } + + return trace.Wrap(err) + } + + var m missingRecording + if err := json.Unmarshal(b, &m); err != nil { + logrus.WithError(err).Warnf("Failed to unmarshal missing recording %s from persisted state", key) + continue + } + + s := session{ + ID: key[len(missingRecordingPrefix):], + Index: m.Index, + UploadTime: m.Timestamp, + } + + if err := callback(s, m.Attempt); err != nil { + return trace.Wrap(err) + } + } + + return nil +} + +// RemoveMissingRecording removes the session with a missing recording from state. +func (s *State) RemoveMissingRecording(id string) error { + err := s.dv.Erase(missingRecordingPrefix + id) + if err == nil { + return nil + } + + // If the session had no events, the file won't exist, so we ignore the error + var pathError *fs.PathError + if !errors.Is(err, fs.ErrNotExist) || + errors.As(err, &pathError) && errors.Is(pathError.Err, syscall.ENOENT) { + return nil + } + + return trace.Wrap(err) +} + +// RemoveSession removes session from the state. func (s *State) RemoveSession(id string) error { - return s.dv.Erase(sessionPrefix + id) + err := s.dv.Erase(sessionPrefix + id) + if err == nil { + return nil + } + + // If the session had no events, the file won't exist, so we ignore the error + var pathError *fs.PathError + if !errors.Is(err, fs.ErrNotExist) || + errors.As(err, &pathError) && errors.Is(pathError.Err, syscall.ENOENT) { + return nil + } + + return trace.Wrap(err) } diff --git a/integrations/event-handler/state_test.go b/integrations/event-handler/state_test.go index 828f38ff09f76..11ee192b95051 100644 --- a/integrations/event-handler/state_test.go +++ b/integrations/event-handler/state_test.go @@ -17,6 +17,9 @@ limitations under the License. package main import ( + "cmp" + "slices" + "strconv" "testing" "time" @@ -88,3 +91,60 @@ func TestStatePersist(t *testing.T) { assert.Equal(t, "testCursor", cursor) assert.Equal(t, "testId", id) } + +func TestStateMissingRecordings(t *testing.T) { + config := newStartCmdConfig(t) + state, err := NewState(config) + require.NoError(t, err) + + // Iterating should find no records if nothing has been stored yet. + err = state.IterateMissingRecordings(func(s session, attempts int) error { + t.Errorf("no missing sessions should have been found, got %v", s) + return nil + }) + require.NoError(t, err) + + // Removing a missing item should not produce an error. + err = state.RemoveMissingRecording("llama") + require.NoError(t, err) + + expected := make([]session, 0, 10) + for i := 0; i < 10; i++ { + s := session{ + ID: strconv.Itoa(i), + Index: int64(i), + UploadTime: time.Now().UTC(), + } + err := state.SetMissingRecording(s, 0) + expected = append(expected, s) + assert.NoError(t, err) + } + + // Iterating find all stored records and validate they match + // the original sessions. + records := make([]session, 0, 10) + err = state.IterateMissingRecordings(func(s session, attempts int) error { + records = append(records, s) + return nil + }) + require.NoError(t, err) + + slices.SortFunc(records, func(a, b session) int { + return cmp.Compare(a.Index, b.Index) + }) + + require.EqualValues(t, expected, records) + + // Remove all items and validate they no longer exist. + for i := 0; i < 10; i++ { + err = state.RemoveMissingRecording(expected[i].ID) + require.NoError(t, err) + } + + err = state.IterateMissingRecordings(func(s session, attempts int) error { + t.Errorf("no missing sessions should have been found, got %v", s) + return nil + }) + require.NoError(t, err) + +}