diff --git a/api/types/semaphore.go b/api/types/semaphore.go index 6eaf461e6d0f9..55efc51271829 100644 --- a/api/types/semaphore.go +++ b/api/types/semaphore.go @@ -46,6 +46,11 @@ const SemaphoreKindHostUserModification = "host_user_modification" // the Access Monitoring feature during handling user queries. const SemaphoreKindAccessMonitoringLimiter = "access_monitoring_limiter" +// SemaphoreKindUploadCompleter is the semaphore kind used by the +// auth server's upload completer to protect access to the shared +// session recordings backend. +const SemaphoreKindUploadCompleter = "upload_completer" + // Semaphore represents distributed semaphore concept type Semaphore interface { // Resource contains common resource values diff --git a/lib/events/complete.go b/lib/events/complete.go index 92eb0e5ca97ce..a68d5332a47be 100644 --- a/lib/events/complete.go +++ b/lib/events/complete.go @@ -19,6 +19,7 @@ package events import ( "context" "fmt" + "os" "time" "github.com/google/uuid" @@ -28,6 +29,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/gravitational/teleport" + "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/events" apiutils "github.com/gravitational/teleport/api/utils" "github.com/gravitational/teleport/api/utils/retryutils" @@ -46,6 +48,11 @@ type UploadCompleterConfig struct { // SessionTracker is used to discover the current state of a // sesssions with active uploads. SessionTracker services.SessionTrackerService + // Semaphores is used to optionally acquire a semaphore prior to completing + // uploads. When specified, ServerID must also be provided. + Semaphores types.Semaphores + // ServerID identifies the server running the upload completer. + ServerID string // Component is a component used in logging Component string // CheckPeriod is a period for checking the upload @@ -67,6 +74,9 @@ func (cfg *UploadCompleterConfig) CheckAndSetDefaults() error { if cfg.ClusterName == "" { return trace.BadParameter("missing parameter ClusterName") } + if cfg.Semaphores != nil && cfg.ServerID == "" { + return trace.BadParameter("a server ID must be specified in order to use semaphores") + } if cfg.Component == "" { cfg.Component = teleport.ComponentProcess } @@ -132,9 +142,15 @@ func (u *UploadCompleter) Close() { close(u.closeC) } +const ( + semaphoreName = "upload-completer" + semaphoreMaxLeases = 1 // allow one upload completer to operate at a time +) + // Serve runs the upload completer until closed or until ctx is canceled. func (u *UploadCompleter) Serve(ctx context.Context) error { periodic := interval.New(interval.Config{ + Clock: u.cfg.Clock, Duration: u.cfg.CheckPeriod, FirstDuration: utils.HalfJitter(u.cfg.CheckPeriod), Jitter: retryutils.NewSeventhJitter(), @@ -145,12 +161,7 @@ func (u *UploadCompleter) Serve(ctx context.Context) error { for { select { case <-periodic.Next(): - if err := u.CheckUploads(ctx); trace.IsAccessDenied(err) { - u.log.Warn("Teleport does not have permission to list uploads. " + - "The upload completer will be unable to complete uploads of partial session recordings.") - } else if err != nil { - u.log.WithError(err).Warn("Failed to check uploads.") - } + u.PerformPeriodicCheck(ctx) case <-u.closeC: return nil case <-ctx.Done(): @@ -159,6 +170,41 @@ func (u *UploadCompleter) Serve(ctx context.Context) error { } } +func (u *UploadCompleter) PerformPeriodicCheck(ctx context.Context) { + // If configured with a server ID, then acquire a semaphore prior to completing uploads. + // This is used for auth's upload completer and ensures that multiple auth servers do not + // attempt to complete the same uploads at the same time. + // TODO(zmb3): remove the env var check once the semaphore is proven to be reliable + if u.cfg.Semaphores != nil && os.Getenv("TELEPORT_DISABLE_UPLOAD_COMPLETER_SEMAPHORE") == "" { + u.log.Debugf("%v: acquiring semaphore in order to complete uploads", u.cfg.ServerID) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + _, err := services.AcquireSemaphoreLock(ctx, services.SemaphoreLockConfig{ + Service: u.cfg.Semaphores, + Clock: u.cfg.Clock, + Expiry: (u.cfg.CheckPeriod / 2) + 1, + Params: types.AcquireSemaphoreRequest{ + SemaphoreKind: types.SemaphoreKindUploadCompleter, + SemaphoreName: semaphoreName, + MaxLeases: semaphoreMaxLeases, + Holder: u.cfg.ServerID, + }, + }) + if err != nil { + u.log.Debugf("%v did not acquire semaphore, will skip this round of uploads", u.cfg.ServerID) + return + } + } + if err := u.CheckUploads(ctx); trace.IsAccessDenied(err) { + u.log.Warn("Teleport does not have permission to list uploads. " + + "The upload completer will be unable to complete uploads of partial session recordings.") + } else if err != nil { + u.log.WithError(err).Warn("Failed to check uploads.") + } +} + // CheckUploads fetches uploads and completes any abandoned uploads func (u *UploadCompleter) CheckUploads(ctx context.Context) error { uploads, err := u.cfg.Uploader.ListUploads(ctx) diff --git a/lib/events/complete_test.go b/lib/events/complete_test.go index c9248268c08db..79f4b3fc89b26 100644 --- a/lib/events/complete_test.go +++ b/lib/events/complete_test.go @@ -18,6 +18,7 @@ package events_test import ( "context" + "errors" "fmt" "strings" "testing" @@ -85,6 +86,77 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) { require.True(t, mu.IsCompleted(upload.ID)) } +// TestUploadCompleterNeedsSemaphore verifies that the upload completer +// does not complete uploads if it cannot acquire a semaphore. +func TestUploadCompleterNeedsSemaphore(t *testing.T) { + clock := clockwork.NewFakeClock() + mu := eventstest.NewMemoryUploader() + mu.Clock = clock + + log := &eventstest.MockAuditLog{} + sessionID := session.NewID() + sessionTrackerService := &mockSessionTrackerService{clock: clock} + + uc, err := events.NewUploadCompleter(events.UploadCompleterConfig{ + Uploader: mu, + AuditLog: log, + SessionTracker: sessionTrackerService, + Clock: clock, + ClusterName: "teleport-cluster", + CheckPeriod: 3 * time.Minute, + ServerID: "abc123", + Semaphores: mockSemaphores{ + acquireErr: errors.New("semaphore already taken"), + }, + }) + require.NoError(t, err) + + upload, err := mu.CreateUpload(context.Background(), sessionID) + require.NoError(t, err) + + uc.PerformPeriodicCheck(context.Background()) + + // upload should not have completed as the semaphore could not be acquired + require.False(t, mu.IsCompleted(upload.ID), "upload %v should not have completed", upload.ID) +} + +// TestUploadCompleterAcquiresSemaphore verifies that the upload completer +// successfully completes uploads after acquiring the required semaphore. +func TestUploadCompleterAcquiresSemaphore(t *testing.T) { + clock := clockwork.NewFakeClock() + mu := eventstest.NewMemoryUploader() + mu.Clock = clock + + log := &eventstest.MockAuditLog{} + sessionID := session.NewID() + sessionTrackerService := &mockSessionTrackerService{clock: clock} + + uc, err := events.NewUploadCompleter(events.UploadCompleterConfig{ + Uploader: mu, + AuditLog: log, + SessionTracker: sessionTrackerService, + Clock: clock, + ClusterName: "teleport-cluster", + CheckPeriod: 3 * time.Minute, + ServerID: "abc123", + Semaphores: mockSemaphores{ + lease: &types.SemaphoreLease{ + Expires: clock.Now().Add(10 * time.Minute), + }, + acquireErr: nil, + }, + }) + require.NoError(t, err) + + upload, err := mu.CreateUpload(context.Background(), sessionID) + require.NoError(t, err) + + uc.PerformPeriodicCheck(context.Background()) + + // upload should have completed as semaphore acquisition was successful + require.True(t, mu.IsCompleted(upload.ID), "upload %v should have completed", upload.ID) +} + // TestUploadCompleterEmitsSessionEnd verifies that the upload completer // emits session.end or windows.desktop.session.end events for sessions // that are completed. @@ -266,3 +338,18 @@ func (m *mockSessionTrackerService) RemoveSessionTracker(ctx context.Context, se func (m *mockSessionTrackerService) UpdatePresence(ctx context.Context, sessionID, user string) error { return trace.NotImplemented("UpdatePresence is not implemented") } + +type mockSemaphores struct { + types.Semaphores + + lease *types.SemaphoreLease + acquireErr error +} + +func (m mockSemaphores) AcquireSemaphore(ctx context.Context, params types.AcquireSemaphoreRequest) (*types.SemaphoreLease, error) { + return m.lease, m.acquireErr +} + +func (m mockSemaphores) CancelSemaphoreLease(ctx context.Context, lease types.SemaphoreLease) error { + return nil +} diff --git a/lib/service/service.go b/lib/service/service.go index dae61998a2ac6..788ee1f6b6a4c 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -1980,9 +1980,11 @@ func (process *TeleportProcess) initAuthService() error { err = events.StartNewUploadCompleter(process.ExitContext(), events.UploadCompleterConfig{ Uploader: uploadHandler, Component: teleport.ComponentAuth, + ClusterName: clusterName, AuditLog: process.auditLog, SessionTracker: authServer.Services, - ClusterName: clusterName, + Semaphores: authServer.Services, + ServerID: cfg.HostUUID, }) if err != nil { return trace.Wrap(err)