Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v15] Prevent deadlocking event-handler on missing recordings #44772

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integrations/event-handler/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (a *App) Run(ctx context.Context) error {

a.SpawnCriticalJob(a.eventsJob)
a.SpawnCriticalJob(a.sessionEventsJob)
a.SpawnCritical(a.sessionEventsJob.processMissingRecordings)
<-a.Process.Done()

lastWindow := a.EventWatcher.getWindowStartTime()
Expand Down Expand Up @@ -144,7 +145,6 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error
}

log.WithFields(fields).Debug("Event sent")
log.WithField("event", e).Debug("Event dump")

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion integrations/event-handler/fluentd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func getCertPool(c *FluentdConfig) (*x509.CertPool, error) {

// Send sends event to fluentd
func (f *FluentdClient) Send(ctx context.Context, url string, b []byte) error {
log.WithField("json", string(b)).Debug("JSON to send")
log.WithField("payload", string(b)).Debug("Sending event to Fluentd")

req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(b))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion integrations/event-handler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/sethvargo/go-limiter v0.7.2
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
golang.org/x/sync v0.7.0
google.golang.org/protobuf v1.34.0
)

Expand Down Expand Up @@ -284,6 +283,7 @@ require (
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down
202 changes: 138 additions & 64 deletions integrations/event-handler/session_events_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,49 +16,41 @@ package main

import (
"context"
"os"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"

"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/integrations/lib"
"github.com/gravitational/teleport/integrations/lib/backoff"
"github.com/gravitational/teleport/integrations/lib/logger"
)

const (
// sessionBackoffBase is an initial (minimum) backoff value.
sessionBackoffBase = 3 * time.Second
// sessionBackoffMax is a backoff threshold
sessionBackoffMax = 2 * time.Minute
// sessionBackoffNumTries is the maximum number of backoff tries
sessionBackoffNumTries = 5
)

// session is the utility struct used for session ingestion.
type session struct {
	// ID is the Teleport session ID whose events are being ingested.
	ID string
	// Index is the event index to resume streaming from (passed to
	// StreamUnstructuredSessionEvents), so ingestion can continue after
	// an interruption instead of restarting from the first event.
	Index int64
	// UploadTime is the time at which the recording was uploaded. It is
	// used to decide whether a missing recording is old enough to be
	// considered lost forever; a zero value disables that age check.
	UploadTime time.Time
}

// SessionEventsJob encapsulates session events consumption logic
type SessionEventsJob struct {
lib.ServiceJob
app *App
sessions chan session
semaphore *semaphore.Weighted
semaphore chan struct{}
}

// NewSessionEventsJob creates new EventsJob structure
func NewSessionEventsJob(app *App) *SessionEventsJob {
j := &SessionEventsJob{
app: app,
semaphore: semaphore.NewWeighted(int64(app.Config.Concurrency)),
semaphore: make(chan struct{}, app.Config.Concurrency),
sessions: make(chan session),
}

Expand Down Expand Up @@ -92,50 +84,22 @@ func (j *SessionEventsJob) run(ctx context.Context) error {

logger.Info("Starting session ingest")

if err := j.semaphore.Acquire(ctx, 1); err != nil {
logger.WithError(err).Error("Failed to acquire semaphore")
continue
select {
case j.semaphore <- struct{}{}:
case <-ctx.Done():
logger.WithError(ctx.Err()).Error("Failed to acquire semaphore")
return nil
}

func(s session, log logrus.FieldLogger) {
j.app.SpawnCritical(func(ctx context.Context) error {
defer j.semaphore.Release(1)

backoff := backoff.NewDecorr(sessionBackoffBase, sessionBackoffMax, clockwork.NewRealClock())
backoffCount := sessionBackoffNumTries

for {
retry, err := j.consumeSession(ctx, s)

// If sessions needs to retry
if err != nil && retry {
log.WithError(err).WithField("n", backoffCount).Error("Session ingestion error, retrying")

// Sleep for required interval
err := backoff.Do(ctx)
if err != nil {
return trace.Wrap(err)
}

// Check if there are number of tries left
backoffCount--
if backoffCount < 0 {
log.WithField("limit", sessionBackoffNumTries).Error("Session ingestion exceeded attempt limit, aborting")
return nil
}
continue
}

if err != nil {
if !lib.IsCanceled(err) {
log.WithField("err", err).Error("Session ingestion failed")
}
return err
}

// No errors, we've finished with this session
return nil
defer func() { <-j.semaphore }()

if err := j.processSession(ctx, s, 0); err != nil {
return trace.Wrap(err)
}

return nil
})
}(s, logger)
case <-ctx.Done():
Expand All @@ -147,6 +111,125 @@ func (j *SessionEventsJob) run(ctx context.Context) error {
}
}

// processSession ingests a single session's events, retrying transient
// failures with decorrelated backoff. If the session recording does not
// exist yet, it is recorded as "missing" for later background retries;
// recordings that stay missing past the age/attempt limits are dropped
// for good. processingAttempt counts prior background attempts for this
// session.
func (j *SessionEventsJob) processSession(ctx context.Context, s session, processingAttempt int) error {
	const (
		// maxNumberOfProcessingAttempts is the number of times a non-existent
		// session recording will be processed before assuming the recording
		// is lost forever.
		maxNumberOfProcessingAttempts = 2
		// eventTimeCutOff is the point of time in the past at which a missing
		// session recording is assumed to be lost forever.
		eventTimeCutOff = -48 * time.Hour
		// sessionBackoffBase is an initial (minimum) backoff value.
		sessionBackoffBase = 3 * time.Second
		// sessionBackoffMax is a backoff threshold
		sessionBackoffMax = time.Minute
		// sessionBackoffNumTries is the maximum number of backoff tries
		sessionBackoffNumTries = 3
	)

	log := logger.Get(ctx).WithField("id", s.ID).WithField("index", s.Index)
	delay := backoff.NewDecorr(sessionBackoffBase, sessionBackoffMax, clockwork.NewRealClock())
	retriesLeft := sessionBackoffNumTries

	for {
		retry, err := j.consumeSession(ctx, s)
		if err == nil {
			// The session was fully ingested; nothing left to do.
			return nil
		}

		if trace.IsNotFound(err) {
			// The recording doesn't exist (yet). Decide whether to give up
			// on it entirely or park it for another background attempt.
			tooOld := !s.UploadTime.IsZero() && s.UploadTime.Before(time.Now().Add(eventTimeCutOff))
			if tooOld || processingAttempt > maxNumberOfProcessingAttempts {
				// Remove any trace of the session so that it is not
				// processed again in the future and doesn't stick around
				// on disk forever.
				return trace.NewAggregate(j.app.State.RemoveMissingRecording(s.ID), j.app.State.RemoveSession(s.ID))
			}

			// Otherwise, mark that the session was failed to be processed
			// so that it can be tried again in the background.
			return trace.NewAggregate(j.app.State.SetMissingRecording(s, processingAttempt+1), j.app.State.RemoveSession(s.ID))
		}

		if !retry {
			// Abort on any errors that don't require a retry.
			if !lib.IsCanceled(err) {
				log.WithField("err", err).Error("Session ingestion failed")
			}
			return trace.Wrap(err)
		}

		// Transient failure: retry after a backoff, up to the limit.
		retriesLeft--
		if retriesLeft <= 0 {
			log.WithField("limit", sessionBackoffNumTries).Error("Session ingestion exceeded attempt limit, aborting")
			return trace.LimitExceeded("Session ingestion exceeded attempt limit")
		}

		log.WithError(err).WithField("n", retriesLeft).Error("Session ingestion error, retrying")

		// Perform backoff before retrying the session again.
		if backoffErr := delay.Do(ctx); backoffErr != nil {
			return trace.Wrap(backoffErr)
		}
	}
}

// processMissingRecordings periodically attempts to reconcile events
// from session recordings that were previously not found.
func (j *SessionEventsJob) processMissingRecordings(ctx context.Context) error {
	const (
		// initialProcessingDelay is how long to wait after startup before
		// the first reconciliation pass.
		initialProcessingDelay = time.Minute
		// processingInterval is the delay between reconciliation passes.
		processingInterval = 3 * time.Minute
		// maxNumberOfInflightSessions caps how many sessions may be
		// processed concurrently within a pass.
		maxNumberOfInflightSessions = 10
	)

	ctx, cancel := context.WithCancel(ctx)
	// Always release the derived context. The termination hook below also
	// cancels it, but if the parent ctx is canceled without the process
	// terminating, this defer prevents a context leak (go vet lostcancel).
	// cancel is idempotent, so calling it twice is safe.
	defer cancel()
	j.app.Process.OnTerminate(func(_ context.Context) error {
		cancel()
		return nil
	})

	// Jitter the delays to avoid synchronized processing bursts.
	jitter := retryutils.NewSeventhJitter()
	timer := time.NewTimer(jitter(initialProcessingDelay))
	defer timer.Stop()

	semaphore := make(chan struct{}, maxNumberOfInflightSessions)
	for {
		select {
		case <-timer.C:
		case <-ctx.Done():
			return nil
		}

		err := j.app.State.IterateMissingRecordings(func(sess session, attempts int) error {
			// Acquire a semaphore slot before spawning the worker so that
			// at most maxNumberOfInflightSessions goroutines run at once.
			select {
			case semaphore <- struct{}{}:
			case <-ctx.Done():
				return trace.Wrap(ctx.Err())
			}

			go func() {
				defer func() { <-semaphore }()

				if err := j.processSession(ctx, sess, attempts); err != nil {
					logger.Get(ctx).WithError(err).Debug("Failed processing session recording")
				}
			}()

			return nil
		})
		if err != nil && !lib.IsCanceled(err) {
			logger.Get(ctx).WithError(err).Warn("Unable to load previously failed sessions for processing")
		}

		// The timer fired (and its channel was drained) in the select
		// above, so Reset is safe to call here.
		timer.Reset(jitter(processingInterval))
	}
}

// restartPausedSessions restarts sessions saved in state
func (j *SessionEventsJob) restartPausedSessions() error {
sessions, err := j.app.State.GetSessions()
Expand Down Expand Up @@ -184,11 +267,7 @@ func (j *SessionEventsJob) restartPausedSessions() error {

// consumeSession ingests session
func (j *SessionEventsJob) consumeSession(ctx context.Context, s session) (bool, error) {
log := logger.Get(ctx)

url := j.app.Config.FluentdSessionURL + "." + s.ID + ".log"

log.WithField("id", s.ID).WithField("index", s.Index).Info("Started session events ingest")
chEvt, chErr := j.app.EventWatcher.StreamUnstructuredSessionEvents(ctx, s.ID, s.Index)

Loop:
Expand All @@ -199,7 +278,7 @@ Loop:

case evt, ok := <-chEvt:
if !ok {
log.WithField("id", s.ID).Info("Finished session events ingest")
logrus.WithField("id", s.ID).Info("Finished session events ingest")
break Loop // Break the main loop
}

Expand Down Expand Up @@ -236,12 +315,7 @@ Loop:

// We have finished ingestion and do not need session state anymore
err := j.app.State.RemoveSession(s.ID)
// If the session had no events, the file won't exist, so we ignore the error
if err != nil && !os.IsNotExist(err) {
return false, trace.Wrap(err)
}

return false, nil
return false, trace.Wrap(err)
}

// RegisterSession starts session event ingestion
Expand All @@ -251,7 +325,7 @@ func (j *SessionEventsJob) RegisterSession(ctx context.Context, e *TeleportEvent
return trace.Wrap(err)
}

s := session{ID: e.SessionID, Index: 0}
s := session{ID: e.SessionID, Index: 0, UploadTime: e.Time}

go func() {
select {
Expand Down
Loading
Loading