diff --git a/lib/events/filesessions/fileasync.go b/lib/events/filesessions/fileasync.go index b225439775fb2..4ef4513af27f2 100644 --- a/lib/events/filesessions/fileasync.go +++ b/lib/events/filesessions/fileasync.go @@ -378,7 +378,7 @@ func (u *upload) removeFiles() error { return trace.NewAggregate(errs...) } -func (u *Uploader) startUpload(ctx context.Context, fileName string) error { +func (u *Uploader) startUpload(ctx context.Context, fileName string) (err error) { sessionID, err := sessionIDFromPath(fileName) if err != nil { return trace.Wrap(err) @@ -415,6 +415,20 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) error { } } + start := time.Now() + if err := u.takeSemaphore(ctx); err != nil { + return trace.Wrap(err) + } + defer func() { + if err != nil { + _ = u.releaseSemaphore(ctx) + } + }() + + if time.Since(start) > 500*time.Millisecond { + log.Debugf("Semaphore acquired in %v for upload %v.", time.Since(start), fileName) + } + // Apparently, exclusive lock can be obtained only in RDWR mode on NFS sessionFile, err := os.OpenFile(sessionFilePath, os.O_RDWR, 0) if err != nil { @@ -425,7 +439,7 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) error { if e := sessionFile.Close(); e != nil { log.WithError(e).Warningf("Failed to close %v.", fileName) } - return trace.WrapWithMessage(err, "could not acquire file lock for %q", sessionFilePath) + return trace.Wrap(err, "uploader could not acquire file lock for %q", sessionFilePath) } upload := &upload{ @@ -442,16 +456,6 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) error { return trace.ConvertSystemError(err) } - start := time.Now() - if err := u.takeSemaphore(ctx); err != nil { - if err := upload.Close(); err != nil { - log.WithError(err).Warningf("Failed to close upload.") - } - return trace.Wrap(err) - } - if time.Since(start) > 500*time.Millisecond { - log.Debugf("Semaphore acquired in %v for upload %v.", time.Since(start), fileName) - } u.wg.Add(1) go func() { defer u.wg.Done() diff --git a/lib/events/filesessions/fileasync_chaos_test.go b/lib/events/filesessions/fileasync_chaos_test.go index 31c58d96872a2..e8f6877271937 100644 --- a/lib/events/filesessions/fileasync_chaos_test.go +++ b/lib/events/filesessions/fileasync_chaos_test.go @@ -111,11 +111,10 @@ func TestChaosUpload(t *testing.T) { }) require.NoError(t, err) - scanPeriod := 3 * time.Second uploader, err := NewUploader(UploaderConfig{ ScanDir: scanDir, CorruptedDir: corruptedDir, - ScanPeriod: scanPeriod, + ScanPeriod: 3 * time.Second, Streamer: faultyStreamer, Clock: clockwork.NewRealClock(), }) diff --git a/lib/events/filesessions/filestream.go b/lib/events/filesessions/filestream.go index 27bd9449c508b..54b6b37519917 100644 --- a/lib/events/filesessions/filestream.go +++ b/lib/events/filesessions/filestream.go @@ -18,6 +18,7 @@ package filesessions import ( "context" + "errors" "fmt" "io" "os" @@ -26,6 +27,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/google/uuid" "github.com/gravitational/trace" @@ -147,9 +149,43 @@ func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload return trace.ConvertSystemError(err) } unlock, err := utils.FSTryWriteLock(uploadPath) - if err != nil { - return trace.WrapWithMessage(err, "could not acquire file lock for %q", uploadPath) +Loop: + for i := 0; i < 3; i++ { + switch { + case err == nil: + break Loop + case errors.Is(err, utils.ErrUnsuccessfulLockTry): + // If unable to lock the file, try again with some backoff + // to allow the UploadCompleter to finish and remove its + // file lock before giving up. + select { + case <-ctx.Done(): + if err := f.Close(); err != nil { + h.WithError(err).Errorf("Failed to close file %q.", uploadPath) + } + + return nil + case <-time.After(50 * time.Millisecond): + unlock, err = utils.FSTryWriteLock(uploadPath) + continue + } + default: + if err := f.Close(); err != nil { + h.WithError(err).Errorf("Failed to close file %q.", uploadPath) + } + + return trace.Wrap(err, "handler could not acquire file lock for %q", uploadPath) + } } + + if unlock == nil { + if err := f.Close(); err != nil { + h.WithError(err).Errorf("Failed to close file %q.", uploadPath) + } + + return trace.Wrap(err, "handler could not acquire file lock for %q", uploadPath) + } + defer func() { if err := unlock(); err != nil { h.WithError(err).Errorf("Failed to unlock filesystem lock.")