From 4460b4f564ac9f622b70fb7e91830fff220ff186 Mon Sep 17 00:00:00 2001 From: lukechampine Date: Wed, 14 Oct 2020 23:40:34 -0400 Subject: [PATCH] ghost: Fix Session race conditions --- internal/ghost/host.go | 2 ++ internal/ghost/session.go | 45 ++++++++++++------------------- renter/renterutil/migrate_test.go | 7 ++++- 3 files changed, 25 insertions(+), 29 deletions(-) diff --git a/internal/ghost/host.go b/internal/ghost/host.go index 74f9e84..b677d8d 100644 --- a/internal/ghost/host.go +++ b/internal/ghost/host.go @@ -6,6 +6,7 @@ import ( "crypto/ed25519" "log" "net" + "sync" "gitlab.com/NebulousLabs/Sia/crypto" "gitlab.com/NebulousLabs/Sia/modules" @@ -23,6 +24,7 @@ type hostContract struct { renterKey types.SiaPublicKey sectorRoots []crypto.Hash sectorData map[crypto.Hash][renterhost.SectorSize]byte + mu sync.Mutex } type Host struct { diff --git a/internal/ghost/session.go b/internal/ghost/session.go index a595bb7..e5d1907 100644 --- a/internal/ghost/session.go +++ b/internal/ghost/session.go @@ -40,6 +40,12 @@ func (h *Host) handleConn(conn net.Conn) error { sess: hs, conn: conn, } + defer func() { + if s.contract != nil { + s.contract.mu.Unlock() + s.contract = nil + } + }() rpcs := map[renterhost.Specifier]func(*session) error{ renterhost.RPCFormContractID: h.rpcFormContract, @@ -278,6 +284,7 @@ func (h *Host) rpcLock(s *session) error { s.sess.WriteResponse(nil, err) return err } + contract.mu.Lock() s.contract = contract var newChallenge [16]byte @@ -293,6 +300,9 @@ func (h *Host) rpcLock(s *session) error { } func (h *Host) rpcUnlock(s *session) error { + if s.contract != nil { + s.contract.mu.Unlock() + } s.contract = nil return nil } @@ -565,31 +575,20 @@ func (h *Host) rpcRead(s *session) error { return err } - // As soon as we finish reading the request, we must begin listening for - // RPCLoopReadStop, which may arrive at any time, and must arrive before the - // RPC is considered complete. - stopSignal := make(chan error, 1) - go func() { - var id renterhost.Specifier - err := s.sess.ReadResponse(&id, 4096) - if err != nil { - stopSignal <- err - } else if id != renterhost.RPCReadStop { - stopSignal <- errors.New("expected 'stop' from renter, got " + id.String()) - } else { - stopSignal <- nil - } - }() + // Make sure we read RPCLoopReadStop before returning. + // + // NOTE: technically, we should be listening for RPCLoopReadStop + // asynchronously, but currently this is not possible because + // renterhost.Session is not thread-safe. + defer s.sess.ReadResponse(new(renterhost.Specifier), 4096) if s.contract == nil { err := errors.New("no contract locked") s.sess.WriteResponse(nil, err) - <-stopSignal return err } else if s.contract.rev.NewRevisionNumber == math.MaxUint64 { err := errors.New("contract cannot be revised") s.sess.WriteResponse(nil, err) - <-stopSignal return err } @@ -682,15 +681,6 @@ func (h *Host) rpcRead(s *session) error { Data: data, MerkleProof: proof, } - select { - case err := <-stopSignal: - if err != nil { - return err - } - resp.Signature = hostSig - return s.sess.WriteResponse(resp, nil) - default: - } if i == len(req.Sections)-1 { resp.Signature = hostSig } @@ -698,6 +688,5 @@ func (h *Host) rpcRead(s *session) error { return err } } - // The stop signal must arrive before RPC is complete. - return <-stopSignal + return nil } diff --git a/renter/renterutil/migrate_test.go b/renter/renterutil/migrate_test.go index fed46c6..0ffc9d1 100644 --- a/renter/renterutil/migrate_test.go +++ b/renter/renterutil/migrate_test.go @@ -88,11 +88,16 @@ func TestMigrate(t *testing.T) { t.Fatal(err) } + // close all hs1 sessions + if err := hs1.Close(); err != nil { + t.Fatal(err) + } + // create fs2 with hs2 fs2 := NewFileSystem(os.TempDir(), hs2) defer fs2.Close() - // close one of the non-hs2 hosts; this ensures that we'll download from the new host + // remove one of the non-hs2 hosts; this ensures that we'll download from the new host for hostKey, lh := range fs1.hosts.sessions { if fs2.hosts.HasHost(hostKey) { lh.s.Close()