diff --git a/fuse/api.go b/fuse/api.go index fbdebfab..39a098f9 100644 --- a/fuse/api.go +++ b/fuse/api.go @@ -129,6 +129,8 @@ // [2] https://sylabs.io/guides/3.7/user-guide/bind_paths_and_mounts.html#fuse-mounts package fuse +import "time" + // Types for users to implement. // The result of Read is an array of bytes, but for performance @@ -266,6 +268,9 @@ type MountOptions struct { // don't alloc buffer for read operation NoAllocForRead bool + + // max duration for a request + Timeout time.Duration } // RawFileSystem is an interface close to the FUSE wire protocol. diff --git a/fuse/server.go b/fuse/server.go index 8003c395..2accb15f 100644 --- a/fuse/server.go +++ b/fuse/server.go @@ -52,6 +52,7 @@ type Server struct { // maxReaders is the maximum number of goroutines reading requests maxReaders int + maxUnique uint64 // Pools for []byte buffers bufferPool @@ -122,9 +123,9 @@ func (ms *Server) RecordLatencies(l LatencyMap) { // Does not work when we were mounted with the magic /dev/fd/N mountpoint syntax, // as we do not know the real mountpoint. Unmount using // -// fusermount -u /path/to/real/mountpoint +// fusermount -u /path/to/real/mountpoint // -/// in this case. +// / in this case. func (ms *Server) Unmount() (err error) { if ms.mountPoint == "" { return nil @@ -396,9 +397,7 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) { } req = ms.reqPool.Get().(*request) - if ms.latencies != nil { - req.startTime = time.Now() - } + req.startTime = time.Now() gobbled := req.setInput(dest[:n]) if !gobbled { ms.readPool.Put(dest) @@ -414,6 +413,9 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) { if ms.recentUnique != nil { ms.recentUnique = append(ms.recentUnique, req.inHeader.Unique) } + if req.inHeader.Unique > ms.maxUnique { + ms.maxUnique = req.inHeader.Unique + } req.inflightIndex = len(ms.reqInflight) ms.reqInflight = append(ms.reqInflight, req) @@ -436,13 +438,14 @@ func (ms *Server) checkLostRequests() { var recentUnique []uint64 time.Sleep(time.Second * 3) for { - if len(ms.recentUnique) > 10 { - ms.reqMu.Lock() + ms.reqMu.Lock() + if len(ms.recentUnique) >= 30 { recentUnique = ms.recentUnique ms.recentUnique = nil ms.reqMu.Unlock() break } + ms.reqMu.Unlock() time.Sleep(time.Second) } @@ -537,6 +540,9 @@ func (ms *Server) recordStats(req *request) { // Each filesystem operation executes in a separate goroutine. func (ms *Server) Serve() { ms.loop(false) + if ms.opts.Timeout > 0 { + go ms.checkHangRequests(ms.opts.Timeout) + } ms.loops.Wait() // shutdown in-flight cache retrieves. @@ -574,6 +580,35 @@ func (ms *Server) wakeupReader() { _ = cmd.Run() } +func (ms *Server) checkHangRequests(timeout time.Duration) { + for { + time.Sleep(timeout / 100) + var batch = 100 + for i := 0; ; i += batch { + now := time.Now() + ms.reqMu.Lock() + if ms.shutdown { + ms.reqMu.Unlock() + break + } + for j := 0; j < batch && i+j < len(ms.reqInflight); j++ { + req := ms.reqInflight[i+j] + if req.interrupted { + unique := req.inHeader.Unique + ms.reqMu.Unlock() + ms.returnInterrupted(unique) + ms.reqMu.Lock() + } else if used := now.Sub(req.startTime); used > timeout || req.inHeader.Unique+2e6 > ms.maxUnique { + log.Printf("interrupt request %d after %s: %+v", req.inHeader.Unique, used, req.inHeader) + req.interrupted = true + close(req.cancel) + } + } + ms.reqMu.Unlock() + } + } +} + func (ms *Server) Shutdown() bool { log.Printf("try to restart gracefully") start := time.Now()