From 8ee3b79d696842a6c6bd8438028a4634ee2d1e35 Mon Sep 17 00:00:00 2001 From: Sander Bruens Date: Wed, 18 Sep 2024 15:48:34 -0400 Subject: [PATCH] fix: remove deadlock in `multiPacketListener` (#211) * fix: remove deadlock in `multiPacketListener`. * fix test error * Move reading from the `PacketConn` outside the `select`. * Revert "fix test error" This reverts commit 552a1e60a44470207ffed7d86edb2e59032b5be8. * Do not set `m.pc` to nil. * Move buffer down. * Undo vpc creation. * Add mutex back to protect against concurrent `Close()` calls. * Move return when connection is closed down. * Do not close the `readCh` or exit on a closed PacketConn. --- service/listeners.go | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/service/listeners.go b/service/listeners.go index 55d6c7ef..94f50402 100644 --- a/service/listeners.go +++ b/service/listeners.go @@ -128,42 +128,40 @@ type readRequest struct { type virtualPacketConn struct { net.PacketConn - mu sync.Mutex // Mutex to protect access to the channels - readCh chan readRequest + readCh chan readRequest + + mu sync.Mutex // Mutex to protect against race conditions when closing the connection. + closeCh chan struct{} onCloseFunc OnCloseFunc } -func (pc *virtualPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { +func (pc *virtualPacketConn) ReadFrom(p []byte) (int, net.Addr, error) { respCh := make(chan struct { n int addr net.Addr err error }, 1) - pc.mu.Lock() - if pc.readCh == nil { - pc.mu.Unlock() - return 0, nil, net.ErrClosed - } - pc.readCh <- readRequest{ + select { + case pc.readCh <- readRequest{ buffer: p, respCh: respCh, + }: + case <-pc.closeCh: + return 0, nil, net.ErrClosed } - pc.mu.Unlock() resp := <-respCh return resp.n, resp.addr, resp.err } +// Close closes the virtualPacketConn. It must be called once, and only once, +// per virtualPacketConn. func (pc *virtualPacketConn) Close() error { pc.mu.Lock() defer pc.mu.Unlock() - if pc.readCh == nil { - return nil - } - pc.readCh = nil - + close(pc.closeCh) if pc.onCloseFunc != nil { onCloseFunc := pc.onCloseFunc pc.onCloseFunc = nil @@ -287,10 +285,13 @@ func (m *multiPacketListener) Acquire() (net.PacketConn, error) { m.readCh = make(chan readRequest) m.doneCh = make(chan struct{}) go func() { + buffer := make([]byte, serverUDPBufferSize) for { + n, addr, err := m.pc.ReadFrom(buffer) + buffer = buffer[:n] select { case req := <-m.readCh: - n, addr, err := pc.ReadFrom(req.buffer) + n := copy(req.buffer, buffer) req.respCh <- struct { n int addr net.Addr @@ -307,6 +308,7 @@ func (m *multiPacketListener) Acquire() (net.PacketConn, error) { return &virtualPacketConn{ PacketConn: m.pc, readCh: m.readCh, + closeCh: make(chan struct{}), onCloseFunc: func() error { m.mu.Lock() defer m.mu.Unlock() @@ -314,7 +316,6 @@ func (m *multiPacketListener) Acquire() (net.PacketConn, error) { if m.count == 0 { close(m.doneCh) m.pc.Close() - m.pc = nil if m.onCloseFunc != nil { onCloseFunc := m.onCloseFunc m.onCloseFunc = nil