From bc1dcea075beddcf42e9a98351e3b2091b111aa8 Mon Sep 17 00:00:00 2001 From: Zac Bergquist Date: Fri, 3 May 2024 15:51:38 +0100 Subject: [PATCH] auth: require a semaphore in order to complete uploads (#41104) Having multiple auth servers attempt to complete uploads against the same shared backend results in racy behavior. This code is focused on cleanup and is not performance sensitive, so we leverage a semaphore to make sure only one auth server is attempting to complete uploads at any point in time. This new behavior can be disabled with an environment variable, which will provide an escape hatch in case we we encounter problems with it. --- api/types/semaphore.go | 5 +++ lib/events/complete.go | 58 ++++++++++++++++++++++--- lib/events/complete_test.go | 87 +++++++++++++++++++++++++++++++++++++ lib/service/service.go | 4 +- 4 files changed, 147 insertions(+), 7 deletions(-) diff --git a/api/types/semaphore.go b/api/types/semaphore.go index 6eaf461e6d0f9..55efc51271829 100644 --- a/api/types/semaphore.go +++ b/api/types/semaphore.go @@ -46,6 +46,11 @@ const SemaphoreKindHostUserModification = "host_user_modification" // the Access Monitoring feature during handling user queries. const SemaphoreKindAccessMonitoringLimiter = "access_monitoring_limiter" +// SemaphoreKindUploadCompleter is the semaphore kind used by the +// auth server's upload completer to protect access to the shared +// session recordings backend. +const SemaphoreKindUploadCompleter = "upload_completer" + // Semaphore represents distributed semaphore concept type Semaphore interface { // Resource contains common resource values diff --git a/lib/events/complete.go b/lib/events/complete.go index 92eb0e5ca97ce..a68d5332a47be 100644 --- a/lib/events/complete.go +++ b/lib/events/complete.go @@ -19,6 +19,7 @@ package events import ( "context" "fmt" + "os" "time" "github.com/google/uuid" @@ -28,6 +29,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/gravitational/teleport" + "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/events" apiutils "github.com/gravitational/teleport/api/utils" "github.com/gravitational/teleport/api/utils/retryutils" @@ -46,6 +48,11 @@ type UploadCompleterConfig struct { // SessionTracker is used to discover the current state of a // sesssions with active uploads. SessionTracker services.SessionTrackerService + // Semaphores is used to optionally acquire a semaphore prior to completing + // uploads. When specified, ServerID must also be provided. + Semaphores types.Semaphores + // ServerID identifies the server running the upload completer. + ServerID string // Component is a component used in logging Component string // CheckPeriod is a period for checking the upload @@ -67,6 +74,9 @@ func (cfg *UploadCompleterConfig) CheckAndSetDefaults() error { if cfg.ClusterName == "" { return trace.BadParameter("missing parameter ClusterName") } + if cfg.Semaphores != nil && cfg.ServerID == "" { + return trace.BadParameter("a server ID must be specified in order to use semaphores") + } if cfg.Component == "" { cfg.Component = teleport.ComponentProcess } @@ -132,9 +142,15 @@ func (u *UploadCompleter) Close() { close(u.closeC) } +const ( + semaphoreName = "upload-completer" + semaphoreMaxLeases = 1 // allow one upload completer to operate at a time +) + // Serve runs the upload completer until closed or until ctx is canceled. func (u *UploadCompleter) Serve(ctx context.Context) error { periodic := interval.New(interval.Config{ + Clock: u.cfg.Clock, Duration: u.cfg.CheckPeriod, FirstDuration: utils.HalfJitter(u.cfg.CheckPeriod), Jitter: retryutils.NewSeventhJitter(), @@ -145,12 +161,7 @@ func (u *UploadCompleter) Serve(ctx context.Context) error { for { select { case <-periodic.Next(): - if err := u.CheckUploads(ctx); trace.IsAccessDenied(err) { - u.log.Warn("Teleport does not have permission to list uploads. " + - "The upload completer will be unable to complete uploads of partial session recordings.") - } else if err != nil { - u.log.WithError(err).Warn("Failed to check uploads.") - } + u.PerformPeriodicCheck(ctx) case <-u.closeC: return nil case <-ctx.Done(): @@ -159,6 +170,41 @@ func (u *UploadCompleter) Serve(ctx context.Context) error { } } +func (u *UploadCompleter) PerformPeriodicCheck(ctx context.Context) { + // If configured with a server ID, then acquire a semaphore prior to completing uploads. + // This is used for auth's upload completer and ensures that multiple auth servers do not + // attempt to complete the same uploads at the same time. + // TODO(zmb3): remove the env var check once the semaphore is proven to be reliable + if u.cfg.Semaphores != nil && os.Getenv("TELEPORT_DISABLE_UPLOAD_COMPLETER_SEMAPHORE") == "" { + u.log.Debugf("%v: acquiring semaphore in order to complete uploads", u.cfg.ServerID) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + _, err := services.AcquireSemaphoreLock(ctx, services.SemaphoreLockConfig{ + Service: u.cfg.Semaphores, + Clock: u.cfg.Clock, + Expiry: (u.cfg.CheckPeriod / 2) + 1, + Params: types.AcquireSemaphoreRequest{ + SemaphoreKind: types.SemaphoreKindUploadCompleter, + SemaphoreName: semaphoreName, + MaxLeases: semaphoreMaxLeases, + Holder: u.cfg.ServerID, + }, + }) + if err != nil { + u.log.Debugf("%v did not acquire semaphore, will skip this round of uploads", u.cfg.ServerID) + return + } + } + if err := u.CheckUploads(ctx); trace.IsAccessDenied(err) { + u.log.Warn("Teleport does not have permission to list uploads. " + + "The upload completer will be unable to complete uploads of partial session recordings.") + } else if err != nil { + u.log.WithError(err).Warn("Failed to check uploads.") + } +} + // CheckUploads fetches uploads and completes any abandoned uploads func (u *UploadCompleter) CheckUploads(ctx context.Context) error { uploads, err := u.cfg.Uploader.ListUploads(ctx) diff --git a/lib/events/complete_test.go b/lib/events/complete_test.go index c9248268c08db..79f4b3fc89b26 100644 --- a/lib/events/complete_test.go +++ b/lib/events/complete_test.go @@ -18,6 +18,7 @@ package events_test import ( "context" + "errors" "fmt" "strings" "testing" @@ -85,6 +86,77 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) { require.True(t, mu.IsCompleted(upload.ID)) } +// TestUploadCompleterNeedsSemaphore verifies that the upload completer +// does not complete uploads if it cannot acquire a semaphore. +func TestUploadCompleterNeedsSemaphore(t *testing.T) { + clock := clockwork.NewFakeClock() + mu := eventstest.NewMemoryUploader() + mu.Clock = clock + + log := &eventstest.MockAuditLog{} + sessionID := session.NewID() + sessionTrackerService := &mockSessionTrackerService{clock: clock} + + uc, err := events.NewUploadCompleter(events.UploadCompleterConfig{ + Uploader: mu, + AuditLog: log, + SessionTracker: sessionTrackerService, + Clock: clock, + ClusterName: "teleport-cluster", + CheckPeriod: 3 * time.Minute, + ServerID: "abc123", + Semaphores: mockSemaphores{ + acquireErr: errors.New("semaphore already taken"), + }, + }) + require.NoError(t, err) + + upload, err := mu.CreateUpload(context.Background(), sessionID) + require.NoError(t, err) + + uc.PerformPeriodicCheck(context.Background()) + + // upload should not have completed as the semaphore could not be acquired + require.False(t, mu.IsCompleted(upload.ID), "upload %v should not have completed", upload.ID) +} + +// TestUploadCompleterAcquiresSemaphore verifies that the upload completer +// successfully completes uploads after acquiring the required semaphore. +func TestUploadCompleterAcquiresSemaphore(t *testing.T) { + clock := clockwork.NewFakeClock() + mu := eventstest.NewMemoryUploader() + mu.Clock = clock + + log := &eventstest.MockAuditLog{} + sessionID := session.NewID() + sessionTrackerService := &mockSessionTrackerService{clock: clock} + + uc, err := events.NewUploadCompleter(events.UploadCompleterConfig{ + Uploader: mu, + AuditLog: log, + SessionTracker: sessionTrackerService, + Clock: clock, + ClusterName: "teleport-cluster", + CheckPeriod: 3 * time.Minute, + ServerID: "abc123", + Semaphores: mockSemaphores{ + lease: &types.SemaphoreLease{ + Expires: clock.Now().Add(10 * time.Minute), + }, + acquireErr: nil, + }, + }) + require.NoError(t, err) + + upload, err := mu.CreateUpload(context.Background(), sessionID) + require.NoError(t, err) + + uc.PerformPeriodicCheck(context.Background()) + + // upload should have completed as semaphore acquisition was successful + require.True(t, mu.IsCompleted(upload.ID), "upload %v should have completed", upload.ID) +} + // TestUploadCompleterEmitsSessionEnd verifies that the upload completer // emits session.end or windows.desktop.session.end events for sessions // that are completed. @@ -266,3 +338,18 @@ func (m *mockSessionTrackerService) RemoveSessionTracker(ctx context.Context, se func (m *mockSessionTrackerService) UpdatePresence(ctx context.Context, sessionID, user string) error { return trace.NotImplemented("UpdatePresence is not implemented") } + +type mockSemaphores struct { + types.Semaphores + + lease *types.SemaphoreLease + acquireErr error +} + +func (m mockSemaphores) AcquireSemaphore(ctx context.Context, params types.AcquireSemaphoreRequest) (*types.SemaphoreLease, error) { + return m.lease, m.acquireErr +} + +func (m mockSemaphores) CancelSemaphoreLease(ctx context.Context, lease types.SemaphoreLease) error { + return nil +} diff --git a/lib/service/service.go b/lib/service/service.go index dae61998a2ac6..788ee1f6b6a4c 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -1980,9 +1980,11 @@ func (process *TeleportProcess) initAuthService() error { err = events.StartNewUploadCompleter(process.ExitContext(), events.UploadCompleterConfig{ Uploader: uploadHandler, Component: teleport.ComponentAuth, + ClusterName: clusterName, AuditLog: process.auditLog, SessionTracker: authServer.Services, - ClusterName: clusterName, + Semaphores: authServer.Services, + ServerID: cfg.HostUUID, }) if err != nil { return trace.Wrap(err)