Skip to content

Commit

Permalink
gbn: add client side resend loop protection
Browse files Browse the repository at this point in the history
This commit ensures that after we have resent the queue, we will wait
until we know that both parties are in sync before we continue to send
new packets. This ensures that we don't end up in an indefinitely resend
loop due to latency and delayed NACKs by the other party, which could
happen prior to this commit.

To understand why we need to await the awaited ACK/NACK after resending the
queue, consider the following scenario:

1.
Alice sends packets 1, 2, 3 & 4 to Bob.
2.
Bob receives packets 1, 2, 3 & 4, and sends back the respective ACKs.
3.
Alice receives ACKs for packets 1 & 2, but due to latency the ACKs for
packets 3 & 4 are delayed and aren't received until Alice resend timeout
has passed, which leads to Alice resending packets 3 & 4. Alice will
after that receive the delayed ACKs for packets 3 & 4, but will consider
that as the ACKs for the resent packets, and not the original packets
which they were actually sent for. If we didn't wait after resending the
queue, Alice would then proceed to send more packets (5 & 6).
4.
When Bob receives the resent packets 3 & 4, Bob will respond with
NACK 5. Due to latency, the packets 5 & 6 that Alice sent in step (3)
above will then be received by Bob, and be processed as the correct
response to the NACK 5. Bob will after that await packet 7.
5.
Alice will receive the NACK 5, and now resend packets 5 & 6. But as Bob
is now awaiting packet 7, this send will lead to a NACK 7. But due to
latency, if Alice doesn't wait resending the queue, Alice will proceed
to send new packet(s) before receiving the NACK 7.
6.
This resend loop would continue indefinitely, so we need to ensure that
Alice waits after she has resent the queue, to ensure that she doesn't
proceed to send new packets before she is sure that both parties are in
sync.

To ensure that we are in sync, after we have resent the queue, we will
await that we either:
1. Receive a NACK for the sequence number succeeding the last packet in
the resent queue i.e. in step (3) above, that would be NACK 5.
OR
2. Receive an ACK for the last packet in the resent queue i.e. in step
(3) above, that would be ACK 4. After we receive the expected ACK, we
will then wait for the duration of the resend timeout before continuing.
The reason why we wait for the resend timeout before continuing, is that
the ACKs we are getting after a resend, could be delayed ACKs for the
original packets we sent, and not ACKs for the resent packets. In step
(3) above, the ACKs for packets 3 & 4 that Alice received were delayed
ACKs for the original packets. If Alice would have immediately continued
to send new packets (5 & 6) after receiving the ACK 4, she would have
then received the NACK 5 from Bob which was the actual response to the
resent queue. But as Alice had already continued to send packets 5 & 6
when receiving the NACK 5, the resend queue response to that NACK would
cause the resend loop to continue indefinitely.

When either of the 2 conditions above are met, we will consider both
parties to be in sync, and we can proceed to send new packets.
  • Loading branch information
ViktorTigerstrom committed Nov 17, 2023
1 parent 9db0d53 commit 9a3c621
Show file tree
Hide file tree
Showing 5 changed files with 404 additions and 68 deletions.
7 changes: 1 addition & 6 deletions gbn/gbn_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@ func NewClientConn(ctx context.Context, n uint8, sendFunc sendBytesFunc,
math.MaxUint8)
}

conn := newGoBackNConn(ctx, sendFunc, receiveFunc, false, n)

// Apply functional options
for _, o := range opts {
o(conn)
}
conn := newGoBackNConn(ctx, sendFunc, receiveFunc, false, n, opts...)

if err := conn.clientHandshake(); err != nil {
if err := conn.Close(); err != nil {
Expand Down
62 changes: 42 additions & 20 deletions gbn/gbn_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,20 @@ type GoBackNConn struct {

// newGoBackNConn creates a GoBackNConn instance with all the members which
// are common between client and server initialised.
//
//nolint:varnamelen
func newGoBackNConn(ctx context.Context, sendFunc sendBytesFunc,
recvFunc recvBytesFunc, isServer bool, n uint8) *GoBackNConn {
recvFunc recvBytesFunc, isServer bool, n uint8,
opts ...Option) *GoBackNConn {

ctxc, cancel := context.WithCancel(ctx)

return &GoBackNConn{
n: n,
s: n + 1,
gbn := &GoBackNConn{
resendTimeout: defaultResendTimeout,
recvFromStream: recvFunc,
sendToStream: sendFunc,
recvDataChan: make(chan *PacketData, n),
sendDataChan: make(chan *PacketData),
isServer: isServer,
sendQueue: newQueue(n+1, defaultHandshakeTimeout),
handshakeTimeout: defaultHandshakeTimeout,
recvTimeout: DefaultRecvTimeout,
sendTimeout: DefaultSendTimeout,
Expand All @@ -138,6 +137,14 @@ func newGoBackNConn(ctx context.Context, sendFunc sendBytesFunc,
cancel: cancel,
quit: make(chan struct{}),
}

for _, o := range opts {
o(gbn)
}

gbn.setN(n)

return gbn
}

// setN sets the current N to use. This _must_ be set before the handshake is
Expand All @@ -146,7 +153,12 @@ func (g *GoBackNConn) setN(n uint8) {
g.n = n
g.s = n + 1
g.recvDataChan = make(chan *PacketData, n)
g.sendQueue = newQueue(n+1, defaultHandshakeTimeout)
g.sendQueue = newQueue(&queueConfig{
s: g.s,
sendPkt: func(packet *PacketData) error {
return g.sendPacket(g.ctx, packet)
},
})
}

// SetSendTimeout sets the timeout used in the Send function.
Expand Down Expand Up @@ -348,6 +360,8 @@ func (g *GoBackNConn) Close() error {
// initialisation.
g.cancel()

g.sendQueue.stop()

g.wg.Wait()

if g.pingTicker != nil {
Expand Down Expand Up @@ -387,9 +401,17 @@ func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message) error {
func (g *GoBackNConn) sendPacketsForever() error {
// resendQueue re-sends the current contents of the queue.
resendQueue := func() error {
return g.sendQueue.resend(func(packet *PacketData) error {
return g.sendPacket(g.ctx, packet)
})
err := g.sendQueue.resend(g.resendTimeout)

// After resending the queue, we reset the resend ticker.
// This is so that we don't immediately resend the queue again,
// if the sendQueue.resend call above took a long time to
// execute. That can happen if the function was awaiting the
// expected ACK for a long time, or times out while awaiting the
// catch up.
g.resendTicker.Reset(g.resendTimeout)

return err
}

for {
Expand Down Expand Up @@ -578,7 +600,10 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
}

case *PacketACK:
gotValidACK := g.sendQueue.processACK(m.Seq)
gotValidACK := g.sendQueue.processACK(
m.Seq, g.resendTimeout,
)

if gotValidACK {
g.resendTicker.Reset(g.resendTimeout)

Expand All @@ -597,15 +622,12 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
// sent was dropped, or maybe we sent a duplicate
// message. The NACK message contains the sequence
// number that the receiver was expecting.
inQueue, bumped := g.sendQueue.processNACK(m.Seq)

// If the NACK sequence number is not in our queue
// then we ignore it. We must have received the ACK
// for the sequence number in the meantime.
if !inQueue {
log.Tracef("NACK seq %d is not in the queue. "+
"Ignoring. (isServer=%v)", m.Seq,
g.isServer)
shouldResend, bumped := g.sendQueue.processNACK(m.Seq)

// If we don't need to resend the queue after processing
// the NACK, we can continue without sending the resend
// signal.
if !shouldResend {
continue
}

Expand Down
7 changes: 1 addition & 6 deletions gbn/gbn_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,7 @@ import (
func NewServerConn(ctx context.Context, sendFunc sendBytesFunc,
recvFunc recvBytesFunc, opts ...Option) (*GoBackNConn, error) {

conn := newGoBackNConn(ctx, sendFunc, recvFunc, true, DefaultN)

// Apply functional options
for _, o := range opts {
o(conn)
}
conn := newGoBackNConn(ctx, sendFunc, recvFunc, true, DefaultN, opts...)

if err := conn.serverHandshake(); err != nil {
if err := conn.Close(); err != nil {
Expand Down
Loading

0 comments on commit 9a3c621

Please sign in to comment.