Skip to content

Commit

Permalink
Allow UploadCompleter and FileUploader to coexist (#43412)
Browse files Browse the repository at this point in the history
Closes #33099
  • Loading branch information
rosstimothy authored Jun 24, 2024
1 parent 15a774e commit ac39849
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 16 deletions.
28 changes: 16 additions & 12 deletions lib/events/filesessions/fileasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (u *upload) removeFiles() error {
return trace.NewAggregate(errs...)
}

func (u *Uploader) startUpload(ctx context.Context, fileName string) error {
func (u *Uploader) startUpload(ctx context.Context, fileName string) (err error) {
sessionID, err := sessionIDFromPath(fileName)
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -417,6 +417,20 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) error {
}
}

start := time.Now()
if err := u.takeSemaphore(ctx); err != nil {
return trace.Wrap(err)
}
defer func() {
if err != nil {
_ = u.releaseSemaphore(ctx)
}
}()

if time.Since(start) > 500*time.Millisecond {
log.Debugf("Semaphore acquired in %v for upload %v.", time.Since(start), fileName)
}

// Apparently, exclusive lock can be obtained only in RDWR mode on NFS
sessionFile, err := os.OpenFile(sessionFilePath, os.O_RDWR, 0)
if err != nil {
Expand All @@ -427,7 +441,7 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) error {
if e := sessionFile.Close(); e != nil {
log.WithError(e).Warningf("Failed to close %v.", fileName)
}
return trace.WrapWithMessage(err, "could not acquire file lock for %q", sessionFilePath)
return trace.Wrap(err, "uploader could not acquire file lock for %q", sessionFilePath)
}

upload := &upload{
Expand All @@ -444,16 +458,6 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) error {
return trace.ConvertSystemError(err)
}

start := time.Now()
if err := u.takeSemaphore(ctx); err != nil {
if err := upload.Close(); err != nil {
log.WithError(err).Warningf("Failed to close upload.")
}
return trace.Wrap(err)
}
if time.Since(start) > 500*time.Millisecond {
log.Debugf("Semaphore acquired in %v for upload %v.", time.Since(start), fileName)
}
u.wg.Add(1)
go func() {
defer u.wg.Done()
Expand Down
3 changes: 1 addition & 2 deletions lib/events/filesessions/fileasync_chaos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,10 @@ func TestChaosUpload(t *testing.T) {
})
require.NoError(t, err)

scanPeriod := 3 * time.Second
uploader, err := NewUploader(UploaderConfig{
ScanDir: scanDir,
CorruptedDir: corruptedDir,
ScanPeriod: scanPeriod,
ScanPeriod: 3 * time.Second,
Streamer: faultyStreamer,
Clock: clockwork.NewRealClock(),
})
Expand Down
40 changes: 38 additions & 2 deletions lib/events/filesessions/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package filesessions

import (
"context"
"errors"
"fmt"
"io"
"os"
Expand All @@ -28,6 +29,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/google/uuid"
"github.com/gravitational/trace"
Expand Down Expand Up @@ -149,9 +151,43 @@ func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload
return trace.ConvertSystemError(err)
}
unlock, err := utils.FSTryWriteLock(uploadPath)
if err != nil {
return trace.WrapWithMessage(err, "could not acquire file lock for %q", uploadPath)
Loop:
for i := 0; i < 3; i++ {
switch {
case err == nil:
break Loop
case errors.Is(err, utils.ErrUnsuccessfulLockTry):
// If unable to lock the file, try again with some backoff
// to allow the UploadCompleter to finish and remove its
// file lock before giving up.
select {
case <-ctx.Done():
if err := f.Close(); err != nil {
h.WithError(err).Errorf("Failed to close file %q.", uploadPath)
}

return nil
case <-time.After(50 * time.Millisecond):
unlock, err = utils.FSTryWriteLock(uploadPath)
continue
}
default:
if err := f.Close(); err != nil {
h.WithError(err).Errorf("Failed to close file %q.", uploadPath)
}

return trace.Wrap(err, "handler could not acquire file lock for %q", uploadPath)
}
}

if unlock == nil {
if err := f.Close(); err != nil {
h.WithError(err).Errorf("Failed to close file %q.", uploadPath)
}

return trace.Wrap(err, "handler could not acquire file lock for %q", uploadPath)
}

defer func() {
if err := unlock(); err != nil {
h.WithError(err).Errorf("Failed to unlock filesystem lock.")
Expand Down

0 comments on commit ac39849

Please sign in to comment.