From d9302435084f9f26d8dc6fbe65a5c13e5525e896 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Fri, 13 Sep 2024 15:55:57 -0700 Subject: [PATCH] mux: exit goroutine when stream closes --- v2/mux.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/v2/mux.go b/v2/mux.go index b775100..e78afb4 100644 --- a/v2/mux.go +++ b/v2/mux.go @@ -256,6 +256,8 @@ func (m *Mux) readLoop() { cond: sync.Cond{L: new(sync.Mutex)}, covert: covert, established: true, + + closed: make(chan struct{}), } m.streams[h.id] = curStream m.cond.Broadcast() // wake (*Mux).AcceptStream @@ -313,6 +315,8 @@ func (m *Mux) DialStream() *Stream { cond: sync.Cond{L: new(sync.Mutex)}, established: false, err: m.err, // stream is unusable if m.err is set + + closed: make(chan struct{}), } m.streams[s.id] = s m.nextID += 2 @@ -340,15 +344,19 @@ func (m *Mux) DialCovertStream() *Stream { // DialStreamContext creates a new Stream with the provided context. When the // context expires, the Stream will be closed and any pending calls will return -// ctx.Err(). DialStreamContext spawns a goroutine whose lifetime matches that -// of the context. +// ctx.Err(). // // Unlike e.g. net.Dial, this does not perform any I/O; the peer will not be // aware of the new Stream until Write is called. func (m *Mux) DialStreamContext(ctx context.Context) *Stream { s := m.DialStream() go func() { - <-ctx.Done() + select { + case <-s.closed: + return + case <-ctx.Done(): + } + s.cond.L.Lock() defer s.cond.L.Unlock() if ctx.Err() != nil && s.err == nil { @@ -424,6 +432,8 @@ type Stream struct { err error readBuf []byte rd, wd time.Time // deadlines + + closed chan struct{} // closed when the Stream is closed } // LocalAddr returns the underlying connection's LocalAddr. @@ -562,6 +572,13 @@ func (s *Stream) Write(p []byte) (int, error) { // Close closes the Stream. The underlying connection is not closed. func (s *Stream) Close() error { + select { + case <-s.closed: + default: + // close the channel to signal the context goroutine to exit + close(s.closed) + } + // cancel outstanding Read/Write calls // // NOTE: Read calls will be interrupted immediately, but Write calls might