Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v15] auth: require a semaphore in order to complete uploads #41103

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/types/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ const SemaphoreKindHostUserModification = "host_user_modification"
// the Access Monitoring feature during handling user queries.
const SemaphoreKindAccessMonitoringLimiter = "access_monitoring_limiter"

// SemaphoreKindUploadCompleter is the semaphore kind used by the
// auth server's upload completer to protect access to the shared
// session recordings backend. The upload completer passes it as the
// SemaphoreKind of its acquire request before each round of
// completing uploads.
const SemaphoreKindUploadCompleter = "upload_completer"

// Semaphore represents the concept of a distributed semaphore.
type Semaphore interface {
// Resource contains common resource values
Expand Down
58 changes: 52 additions & 6 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package events
import (
"context"
"fmt"
"os"
"time"

"github.com/google/uuid"
Expand All @@ -30,6 +31,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/events"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/api/utils/retryutils"
Expand All @@ -48,6 +50,11 @@ type UploadCompleterConfig struct {
// SessionTracker is used to discover the current state of
// sessions with active uploads.
SessionTracker services.SessionTrackerService
// Semaphores is used to optionally acquire a semaphore prior to completing
// uploads. When specified, ServerID must also be provided.
Semaphores types.Semaphores
// ServerID identifies the server running the upload completer.
ServerID string
// Component is a component used in logging
Component string
// CheckPeriod is a period for checking the upload
Expand All @@ -69,6 +76,9 @@ func (cfg *UploadCompleterConfig) CheckAndSetDefaults() error {
if cfg.ClusterName == "" {
return trace.BadParameter("missing parameter ClusterName")
}
if cfg.Semaphores != nil && cfg.ServerID == "" {
return trace.BadParameter("a server ID must be specified in order to use semaphores")
}
if cfg.Component == "" {
cfg.Component = teleport.ComponentProcess
}
Expand Down Expand Up @@ -134,9 +144,15 @@ func (u *UploadCompleter) Close() {
close(u.closeC)
}

// semaphoreName is the name of the semaphore that the upload completer
// acquires before completing uploads.
const semaphoreName = "upload-completer"

// semaphoreMaxLeases is the maximum number of leases requested for the
// upload completer semaphore. A value of 1 allows one upload completer
// to operate at a time.
const semaphoreMaxLeases = 1

// Serve runs the upload completer until closed or until ctx is canceled.
func (u *UploadCompleter) Serve(ctx context.Context) error {
periodic := interval.New(interval.Config{
Clock: u.cfg.Clock,
Duration: u.cfg.CheckPeriod,
FirstDuration: utils.HalfJitter(u.cfg.CheckPeriod),
Jitter: retryutils.NewSeventhJitter(),
Expand All @@ -147,12 +163,7 @@ func (u *UploadCompleter) Serve(ctx context.Context) error {
for {
select {
case <-periodic.Next():
if err := u.CheckUploads(ctx); trace.IsAccessDenied(err) {
u.log.Warn("Teleport does not have permission to list uploads. " +
"The upload completer will be unable to complete uploads of partial session recordings.")
} else if err != nil {
u.log.WithError(err).Warn("Failed to check uploads.")
}
u.PerformPeriodicCheck(ctx)
case <-u.closeC:
return nil
case <-ctx.Done():
Expand All @@ -161,6 +172,41 @@ func (u *UploadCompleter) Serve(ctx context.Context) error {
}
}

// PerformPeriodicCheck runs a single round of upload completion.
//
// When the completer is configured with Semaphores and the
// TELEPORT_DISABLE_UPLOAD_COMPLETER_SEMAPHORE environment variable is unset
// or empty, a semaphore lock is acquired first; if the lock cannot be
// acquired, the round is skipped. Errors returned by CheckUploads are logged
// rather than returned to the caller.
func (u *UploadCompleter) PerformPeriodicCheck(ctx context.Context) {
	// If configured with a server ID, then acquire a semaphore prior to completing uploads.
	// This is used for auth's upload completer and ensures that multiple auth servers do not
	// attempt to complete the same uploads at the same time.
	// TODO(zmb3): remove the env var check once the semaphore is proven to be reliable
	if u.cfg.Semaphores != nil && os.Getenv("TELEPORT_DISABLE_UPLOAD_COMPLETER_SEMAPHORE") == "" {
		u.log.Debugf("%v: acquiring semaphore in order to complete uploads", u.cfg.ServerID)

		// Use a dedicated context for the lock instead of shadowing ctx, so it
		// is obvious that CheckUploads below runs on the caller's context.
		// The lock context is canceled when this round returns.
		// NOTE(review): presumably cancellation is what releases the lock and
		// stops lease renewal — confirm against services.AcquireSemaphoreLock.
		lockCtx, cancel := context.WithCancel(ctx)
		defer cancel()

		_, err := services.AcquireSemaphoreLock(lockCtx, services.SemaphoreLockConfig{
			Service: u.cfg.Semaphores,
			Clock:   u.cfg.Clock,
			// Slightly more than half of the check period.
			Expiry: (u.cfg.CheckPeriod / 2) + 1,
			Params: types.AcquireSemaphoreRequest{
				SemaphoreKind: types.SemaphoreKindUploadCompleter,
				SemaphoreName: semaphoreName,
				MaxLeases:     semaphoreMaxLeases,
				Holder:        u.cfg.ServerID,
			},
		})
		if err != nil {
			// Attach the error so that a failing backend can be told apart
			// from ordinary contention with another upload completer.
			u.log.WithError(err).Debugf("%v did not acquire semaphore, will skip this round of uploads", u.cfg.ServerID)
			return
		}
	}

	switch err := u.CheckUploads(ctx); {
	case trace.IsAccessDenied(err):
		u.log.Warn("Teleport does not have permission to list uploads. " +
			"The upload completer will be unable to complete uploads of partial session recordings.")
	case err != nil:
		u.log.WithError(err).Warn("Failed to check uploads.")
	}
}

// CheckUploads fetches uploads and completes any abandoned uploads
func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
uploads, err := u.cfg.Uploader.ListUploads(ctx)
Expand Down
87 changes: 87 additions & 0 deletions lib/events/complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package events_test

import (
"context"
"errors"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -87,6 +88,77 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
require.True(t, mu.IsCompleted(upload.ID))
}

// TestUploadCompleterNeedsSemaphore verifies that the upload completer
// does not complete uploads if it cannot acquire a semaphore.
func TestUploadCompleterNeedsSemaphore(t *testing.T) {
	// The upload completer bypasses the semaphore when this variable is
	// non-empty, which would make this test fail. Clear it for the duration
	// of the test so the result does not depend on the caller's environment.
	t.Setenv("TELEPORT_DISABLE_UPLOAD_COMPLETER_SEMAPHORE", "")

	ctx := context.Background()
	clock := clockwork.NewFakeClock()
	mu := eventstest.NewMemoryUploader()
	mu.Clock = clock

	auditLog := &eventstest.MockAuditLog{}
	sessionID := session.NewID()
	sessionTrackerService := &mockSessionTrackerService{clock: clock}

	uc, err := events.NewUploadCompleter(events.UploadCompleterConfig{
		Uploader:       mu,
		AuditLog:       auditLog,
		SessionTracker: sessionTrackerService,
		Clock:          clock,
		ClusterName:    "teleport-cluster",
		CheckPeriod:    3 * time.Minute,
		ServerID:       "abc123",
		// Every acquisition attempt fails, as if another holder had the semaphore.
		Semaphores: mockSemaphores{
			acquireErr: errors.New("semaphore already taken"),
		},
	})
	require.NoError(t, err)

	upload, err := mu.CreateUpload(ctx, sessionID)
	require.NoError(t, err)

	uc.PerformPeriodicCheck(ctx)

	// upload should not have completed as the semaphore could not be acquired
	require.False(t, mu.IsCompleted(upload.ID), "upload %v should not have completed", upload.ID)
}

// TestUploadCompleterAcquiresSemaphore verifies that the upload completer
// successfully completes uploads after acquiring the required semaphore.
func TestUploadCompleterAcquiresSemaphore(t *testing.T) {
	// The upload completer bypasses the semaphore when this variable is
	// non-empty, in which case this test would pass without ever exercising
	// the acquisition path. Clear it so the semaphore is always used.
	t.Setenv("TELEPORT_DISABLE_UPLOAD_COMPLETER_SEMAPHORE", "")

	ctx := context.Background()
	clock := clockwork.NewFakeClock()
	mu := eventstest.NewMemoryUploader()
	mu.Clock = clock

	auditLog := &eventstest.MockAuditLog{}
	sessionID := session.NewID()
	sessionTrackerService := &mockSessionTrackerService{clock: clock}

	uc, err := events.NewUploadCompleter(events.UploadCompleterConfig{
		Uploader:       mu,
		AuditLog:       auditLog,
		SessionTracker: sessionTrackerService,
		Clock:          clock,
		ClusterName:    "teleport-cluster",
		CheckPeriod:    3 * time.Minute,
		ServerID:       "abc123",
		// Acquisition succeeds and yields a lease that expires after the
		// fake clock's current time.
		Semaphores: mockSemaphores{
			lease: &types.SemaphoreLease{
				Expires: clock.Now().Add(10 * time.Minute),
			},
			acquireErr: nil,
		},
	})
	require.NoError(t, err)

	upload, err := mu.CreateUpload(ctx, sessionID)
	require.NoError(t, err)

	uc.PerformPeriodicCheck(ctx)

	// upload should have completed as semaphore acquisition was successful
	require.True(t, mu.IsCompleted(upload.ID), "upload %v should have completed", upload.ID)
}

// TestUploadCompleterEmitsSessionEnd verifies that the upload completer
// emits session.end or windows.desktop.session.end events for sessions
// that are completed.
Expand Down Expand Up @@ -268,3 +340,18 @@ func (m *mockSessionTrackerService) RemoveSessionTracker(ctx context.Context, se
// UpdatePresence is not supported by the mock; it always returns a
// NotImplemented error.
func (m *mockSessionTrackerService) UpdatePresence(ctx context.Context, sessionID, user string) error {
	return trace.NotImplemented("UpdatePresence is not implemented")
}

// mockSemaphores is a test double for types.Semaphores whose
// AcquireSemaphore result is fixed at construction time.
type mockSemaphores struct {
	// Semaphores is embedded so that mockSemaphores satisfies the full
	// types.Semaphores interface. The tests in this file leave it nil, so
	// only the methods defined directly on mockSemaphores are safe to call.
	types.Semaphores

	// lease is the lease returned by AcquireSemaphore.
	lease *types.SemaphoreLease
	// acquireErr is the error returned by AcquireSemaphore.
	acquireErr error
}

// AcquireSemaphore ignores its arguments and returns the configured lease
// and error unchanged.
func (m mockSemaphores) AcquireSemaphore(ctx context.Context, params types.AcquireSemaphoreRequest) (*types.SemaphoreLease, error) {
	return m.lease, m.acquireErr
}

// CancelSemaphoreLease is a no-op that always reports success.
func (m mockSemaphores) CancelSemaphoreLease(ctx context.Context, lease types.SemaphoreLease) error {
	return nil
}
4 changes: 3 additions & 1 deletion lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2023,9 +2023,11 @@ func (process *TeleportProcess) initAuthService() error {
err = events.StartNewUploadCompleter(process.ExitContext(), events.UploadCompleterConfig{
Uploader: uploadHandler,
Component: teleport.ComponentAuth,
ClusterName: clusterName,
AuditLog: process.auditLog,
SessionTracker: authServer.Services,
ClusterName: clusterName,
Semaphores: authServer.Services,
ServerID: cfg.HostUUID,
})
if err != nil {
return trace.Wrap(err)
Expand Down
Loading