Skip to content

Commit

Permalink
interrupt requests after timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Sep 13, 2024
1 parent 5dbad4f commit 4d8f346
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 7 deletions.
5 changes: 5 additions & 0 deletions fuse/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@
// [2] https://sylabs.io/guides/3.7/user-guide/bind_paths_and_mounts.html#fuse-mounts
package fuse

import "time"

// Types for users to implement.

// The result of Read is an array of bytes, but for performance
Expand Down Expand Up @@ -266,6 +268,9 @@ type MountOptions struct {

// don't alloc buffer for read operation
NoAllocForRead bool

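// Timeout is the maximum duration a request may stay in flight before the
// server interrupts it. Zero (the default) disables the check.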
Timeout time.Duration
}

// RawFileSystem is an interface close to the FUSE wire protocol.
Expand Down
49 changes: 42 additions & 7 deletions fuse/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Server struct {

// maxReaders is the maximum number of goroutines reading requests
maxReaders int
maxUnique uint64

// Pools for []byte
buffers bufferPool
Expand Down Expand Up @@ -122,9 +123,9 @@ func (ms *Server) RecordLatencies(l LatencyMap) {
// Does not work when we were mounted with the magic /dev/fd/N mountpoint syntax,
// as we do not know the real mountpoint. Unmount using
//
// fusermount -u /path/to/real/mountpoint
// fusermount -u /path/to/real/mountpoint
//
/// in this case.
// / in this case.
func (ms *Server) Unmount() (err error) {
if ms.mountPoint == "" {
return nil
Expand Down Expand Up @@ -396,9 +397,7 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) {
}

req = ms.reqPool.Get().(*request)
if ms.latencies != nil {
req.startTime = time.Now()
}
req.startTime = time.Now()
gobbled := req.setInput(dest[:n])
if !gobbled {
ms.readPool.Put(dest)
Expand All @@ -414,6 +413,9 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) {
if ms.recentUnique != nil {
ms.recentUnique = append(ms.recentUnique, req.inHeader.Unique)
}
if req.inHeader.Unique > ms.maxUnique {
ms.maxUnique = req.inHeader.Unique
}
req.inflightIndex = len(ms.reqInflight)
ms.reqInflight = append(ms.reqInflight, req)

Expand All @@ -436,13 +438,14 @@ func (ms *Server) checkLostRequests() {
var recentUnique []uint64
time.Sleep(time.Second * 3)
for {
if len(ms.recentUnique) > 10 {
ms.reqMu.Lock()
ms.reqMu.Lock()
if len(ms.recentUnique) >= 30 {
recentUnique = ms.recentUnique
ms.recentUnique = nil
ms.reqMu.Unlock()
break
}
ms.reqMu.Unlock()
time.Sleep(time.Second)
}

Expand Down Expand Up @@ -537,6 +540,9 @@ func (ms *Server) recordStats(req *request) {
// Each filesystem operation executes in a separate goroutine.
func (ms *Server) Serve() {
ms.loop(false)
if ms.opts.Timeout > 0 {
go ms.checkHangRequests(ms.opts.Timeout)
}
ms.loops.Wait()

// shutdown in-flight cache retrieves.
Expand Down Expand Up @@ -574,6 +580,35 @@ func (ms *Server) wakeupReader() {
_ = cmd.Run()
}

// checkHangRequests runs forever, periodically scanning the in-flight request
// list and interrupting requests that look stuck.
//
// A request is interrupted (marked interrupted and its cancel channel closed)
// when either
//   - it has been in flight for longer than timeout, measured from
//     req.startTime, or
//   - its Unique lags more than maxUniqueLag behind the largest Unique the
//     server has read so far (ms.maxUnique).
//
// A request that was already marked interrupted on a previous pass is handed
// to ms.returnInterrupted.
//
// The scan runs every timeout/100 and walks ms.reqInflight in batches of
// `batch` entries, releasing ms.reqMu between batches so request readers are
// not blocked for the duration of a full scan. While ms.shutdown is set the
// scan is skipped, but the goroutine keeps polling.
//
// timeout must be positive; the caller is expected to check this.
func (ms *Server) checkHangRequests(timeout time.Duration) {
	const (
		// batch is the number of in-flight entries examined per lock hold.
		batch = 100
		// maxUniqueLag is how far a request's Unique may fall behind the
		// newest one before it is treated as hung.
		maxUniqueLag = 2000000
	)

	for {
		time.Sleep(timeout / 100)

		for i := 0; ; i += batch {
			now := time.Now()
			ms.reqMu.Lock()
			// Stop this pass on shutdown or once every entry has been
			// visited. Without the bounds check this loop would never
			// terminate and would spin on the mutex.
			if ms.shutdown || i >= len(ms.reqInflight) {
				ms.reqMu.Unlock()
				break
			}
			// len(ms.reqInflight) is re-read on every iteration because the
			// lock is dropped around returnInterrupted below.
			for j := 0; j < batch && i+j < len(ms.reqInflight); j++ {
				req := ms.reqInflight[i+j]
				if req.interrupted {
					// returnInterrupted is called without reqMu held.
					unique := req.inHeader.Unique
					ms.reqMu.Unlock()
					ms.returnInterrupted(unique)
					ms.reqMu.Lock()
					continue
				}

				used := now.Sub(req.startTime)
				// Subtract only when maxUnique is strictly larger so the
				// unsigned difference cannot wrap around.
				lagging := ms.maxUnique > req.inHeader.Unique &&
					ms.maxUnique-req.inHeader.Unique > maxUniqueLag
				if used > timeout || lagging {
					log.Printf("interrupt request %d after %s: %+v", req.inHeader.Unique, used, req.inHeader)
					req.interrupted = true
					close(req.cancel)
				}
			}
			ms.reqMu.Unlock()
		}
	}
}

func (ms *Server) Shutdown() bool {
log.Printf("try to restart gracefully")
start := time.Now()
Expand Down

0 comments on commit 4d8f346

Please sign in to comment.