Skip to content

Commit

Permalink
gbn: move resending syncing logic into struct
Browse files Browse the repository at this point in the history
This commit moves the logic for awaiting the parties to be synced after
resending the queue into the a separate syncer struct. This is done to
make the resending logic more readable easier to reason about.
  • Loading branch information
ellemouton authored and ViktorTigerstrom committed Dec 13, 2023
1 parent bfe91e0 commit a1ee556
Show file tree
Hide file tree
Showing 3 changed files with 312 additions and 269 deletions.
278 changes: 19 additions & 259 deletions gbn/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,6 @@ import (
"github.com/btcsuite/btclog"
)

const (
// awaitingTimeoutMultiplier defines the multiplier we use when
// multiplying the resend timeout during a resend catch up, resulting in
// duration we wait for the resend catch up to complete before timing
// out.
// We set this to 3X the resend timeout. The reason we wait exactly 3X
// the resend timeout is that we expect that the max time correct
// behavior would take, would be:
// * 1X the resendTimeout for the time it would take for the party
// respond with an ACK for the last packet in the resend queue, i.e. the
// awaitedACK.
// * 1X the resendTimeout while waiting in proceedAfterTime before
// sending the awaitedACKSignal.
// * 1X extra resendTimeout as buffer, to ensure that we have enough
// time to process the ACKS/NACKS by other party + some extra margin.
awaitingTimeoutMultiplier = 3
)

type queueCfg struct {
// s is the maximum sequence number used to label packets. Packets
// are labelled with incrementing sequence numbers modulo s.
Expand Down Expand Up @@ -69,41 +51,7 @@ type queue struct {
// topMtx is used to guard sequenceTop.
topMtx sync.RWMutex

// awaitedACK defines the sequence number for the last packet in the
// resend queue. If we receive an ACK for this sequence number during
// the resend catch up, we wait for the duration of the resend timeout,
// and then proceed to send new packets, unless we receive the
// awaitedNACK during the wait time. If that happens, we will proceed
// send new packets as soon as we have processed the NACK.
awaitedACK uint8

// awaitedNACK defines the sequence number that in case we get a NACK
// with that sequence number during the resend catch up, we'd consider
// the catch up to be complete and we can proceed to send new packets.
awaitedNACK uint8

// awaitingCatchUp is set to true if we are awaiting a catch up after we
// have resent the queue.
awaitingCatchUp bool

// awaitingCatchUpMu must be held when accessing or mutating the values
// of awaitedACK, awaitedNACK and awaitingCatchUp.
awaitingCatchUpMu sync.RWMutex

// awaitedACKSignal is used to signal that we have received the awaited
// ACK after resending the queue, and have waited for the duration of
// the resend timeout. Once this signal is received, we can proceed to
// send new packets.
awaitedACKSignal chan struct{}

// awaitedNACKSignal is used to signal that we have received the awaited
// NACK after resending the queue. Once this signal is received, we can
// proceed to send new packets.
awaitedNACKSignal chan struct{}

// caughtUpSignal is used to signal that we have caught up after
// awaiting the catch up after resending the queue.
caughtUpSignal chan struct{}
syncer *syncer

lastResend time.Time

Expand All @@ -116,14 +64,15 @@ func newQueue(cfg *queueCfg) *queue {
cfg.log = log
}

return &queue{
cfg: cfg,
content: make([]*PacketData, cfg.s),
awaitedACKSignal: make(chan struct{}, 1),
awaitedNACKSignal: make(chan struct{}, 1),
caughtUpSignal: make(chan struct{}, 1),
quit: make(chan struct{}),
q := &queue{
cfg: cfg,
content: make([]*PacketData, cfg.s),
quit: make(chan struct{}),
}

q.syncer = newSyncer(cfg.s, cfg.log, cfg.timeout, q.quit)

return q
}

func (q *queue) stop() {
Expand Down Expand Up @@ -155,63 +104,9 @@ func (q *queue) addPacket(packet *PacketData) {
q.sequenceTop = (q.sequenceTop + 1) % q.cfg.s
}

// resend resends the current contents of the queue, by invoking the callback
// for each packet that needs to be resent, and then awaits that we either
// receive the expected ACK or NACK after resending the queue, before returning.
//
// To understand why we need to await the awaited ACK/NACK after resending the
// queue, it ensures that we don't end up in a situation where we resend the
// queue over and over again due to latency and delayed NACKs by the other
// party.
//
// Consider the following scenario:
// 1.
// Alice sends packets 1, 2, 3 & 4 to Bob.
// 2.
// Bob receives packets 1, 2, 3 & 4, and sends back the respective ACKs.
// 3.
// Alice receives ACKs for packets 1 & 2, but due to latency the ACKs for
// packets 3 & 4 are delayed and aren't received until Alice resend timeout
// has passed, which leads to Alice resending packets 3 & 4. Alice will after
// that receive the delayed ACKs for packets 3 & 4, but will consider that as
// the ACKs for the resent packets, and not the original packets which they were
// actually sent for. If we didn't wait after resending the queue, Alice would
// then proceed to send more packets (5 & 6).
// 4.
// When Bob receives the resent packets 3 & 4, Bob will respond with NACK 5. Due
// to latency, the packets 5 & 6 that Alice sent in step (3) above will then be
// received by Bob, and be processed as the correct response to the NACK 5. Bob
// will after that await packet 7.
// 5.
// Alice will receive the NACK 5, and now resend packets 5 & 6. But as Bob is
// now awaiting packet 7, this send will lead to a NACK 7. But due to latency,
// if Alice doesn't wait resending the queue, Alice will proceed to send new
// packet(s) before receiving the NACK 7.
// 6.
// This resend loop would continue indefinitely, so we need to ensure that Alice
// waits after she has resent the queue, to ensure that she doesn't proceed to
// send new packets before she is sure that both parties are in sync.
//
// To ensure that we are in sync, after we have resent the queue, we will await
// that we either:
// 1. Receive a NACK for the sequence number succeeding the last packet in the
// resent queue i.e. in step (3) above, that would be NACK 5.
// OR
// 2. Receive an ACK for the last packet in the resent queue i.e. in step (3)
// above, that would be ACK 4. After we receive the expected ACK, we will then
// wait for the duration of the resend timeout before continuing. The reason why
// we wait for the resend timeout before continuing, is that the ACKs we are
// getting after a resend, could be delayed ACKs for the original packets we
// sent, and not ACKs for the resent packets. In step (3) above, the ACKs for
// packets 3 & 4 that Alice received were delayed ACKs for the original packets.
// If Alice would have immediately continued to send new packets (5 & 6) after
// receiving the ACK 4, she would have then received the NACK 5 from Bob which
// was the actual response to the resent queue. But as Alice had already
// continued to send packets 5 & 6 when receiving the NACK 5, the resend queue
// response to that NACK would cause the resend loop to continue indefinitely.
//
// When either of the 2 conditions above are met, we will consider both parties
// to be in sync, and we can proceed to send new packets.
// resend resends the current contents of the queue. It allows some time for the
// two parties to be seen as synced; this may fail in which case the caller is
// expected to call resend again.
func (q *queue) resend() error {
if time.Since(q.lastResend) < q.cfg.timeout {
q.cfg.log.Tracef("Resent the queue recently.")
Expand All @@ -223,10 +118,6 @@ func (q *queue) resend() error {
return nil
}

q.lastResend = time.Now()

q.awaitingCatchUpMu.Lock()

q.baseMtx.RLock()
base := q.sequenceBase
q.baseMtx.RUnlock()
Expand All @@ -236,35 +127,20 @@ func (q *queue) resend() error {
q.topMtx.RUnlock()

if base == top {
q.awaitingCatchUpMu.Unlock()
q.syncer.reset()

return nil
}

// Prepare the queue for awaiting the resend catch up.
q.awaitedACK = (q.cfg.s + top - 1) % q.cfg.s
q.awaitedNACK = top

q.cfg.log.Tracef("Set awaitedACK to %d & awaitedNACK to %d",
q.awaitedACK, q.awaitedNACK)

q.awaitingCatchUp = true

// Drain the caughtUpSignal channel, in case no proceedAfterTime
// func was executed after the last resend catch up.
select {
case <-q.caughtUpSignal:
default:
}
q.syncer.initResendUpTo(top)

q.cfg.log.Tracef("Resending the queue")

for base != top {
packet := q.content[base]

if err := q.cfg.sendPkt(packet); err != nil {
q.awaitingCatchUpMu.Unlock()

return err
}

Expand All @@ -273,61 +149,12 @@ func (q *queue) resend() error {
q.cfg.log.Tracef("Resent %d", packet.Seq)
}

// We hold the awaitingCatchUpMu mutex for the duration of the resend to
// ensure that we don't process the delayed ACKs for the packets we are
// resending, during the resend. If that would happen, we would start
// the "proceedAfterTime" function timeout while still resending
// packets. That could mean that the NACK that the resent packets will
// trigger, might be received after the timeout has passed. That would
// cause the resend loop to trigger once more.
q.awaitingCatchUpMu.Unlock()

// Then await until we know that both parties are in sync.
q.awaitCatchUp()
// Then wait until we know that both parties are in sync.
q.syncer.waitForSync()

return nil
}

// awaitCatchUp awaits that we either receive the awaited ACK or NACK signal
// before returning. If we don't receive the awaited ACK or NACK signal before
// 3X the resend timeout, the function will also return.
// See the docs for the resend function for more details on why we need to await
// the awaited ACK or NACK signal.
func (q *queue) awaitCatchUp() {
q.cfg.log.Tracef("Awaiting catchup after resending the queue")

select {
case <-q.quit:
return
case <-q.awaitedACKSignal:
q.cfg.log.Tracef("Got awaitedACKSignal")
case <-q.awaitedNACKSignal:
q.cfg.log.Tracef("Got awaitedNACKSignal")
case <-time.After(q.cfg.timeout * awaitingTimeoutMultiplier):
q.cfg.log.Tracef("Timed out while awaiting catchup")

q.awaitingCatchUpMu.Lock()
q.awaitingCatchUp = false

// Drain both the ACK & NACK signal channels.
select {
case <-q.awaitedACKSignal:
default:
}

select {
case <-q.awaitedNACKSignal:
default:
}

q.awaitingCatchUpMu.Unlock()
}

// Send a caughtUpSignal to indicate that we have caught up after
// resending the queue.
q.caughtUpSignal <- struct{}{}
}

// processACK processes an incoming ACK of a given sequence number. The function
// returns true if the passed seq is an ACK for a packet we have sent but not
// yet received an ACK for.
Expand All @@ -340,20 +167,7 @@ func (q *queue) processACK(seq uint8) bool {
return false
}

// If we are awaiting a catch up, and the ACK is the awaited ACK, we
// start the proceedAfterTime function in a goroutine, which will send
// an awaitedACKSignal if we're still awaiting the resend catch up when
// the resend timeout has expired.
q.awaitingCatchUpMu.RLock()
if seq == q.awaitedACK && q.awaitingCatchUp {
q.cfg.log.Tracef("Got awaited ACK")

// We start the proceedAfterTime function in a goroutine, as we
// don't want to block the processing of other NACKs/ACKs while
// we're waiting for the resend timeout to expire.
go q.proceedAfterTime()
}
q.awaitingCatchUpMu.RUnlock()
q.syncer.processAck(seq)

q.baseMtx.Lock()
defer q.baseMtx.Unlock()
Expand Down Expand Up @@ -406,9 +220,6 @@ func (q *queue) processACK(seq uint8) bool {
// the NACK sequence number. This equivalent to receiving the ACKs for the
// packets before the NACK sequence number.
func (q *queue) processNACK(seq uint8) (bool, bool) {
q.awaitingCatchUpMu.Lock()
defer q.awaitingCatchUpMu.Unlock()

q.baseMtx.Lock()
defer q.baseMtx.Unlock()

Expand All @@ -417,27 +228,9 @@ func (q *queue) processNACK(seq uint8) (bool, bool) {

q.cfg.log.Tracef("Received NACK %d", seq)

bumped := false

if q.awaitingCatchUp && seq == q.awaitedNACK {
q.cfg.log.Tracef("Sending awaitedNACKSignal")
q.awaitedNACKSignal <- struct{}{}

q.awaitingCatchUp = false

// In case the awaitedNACK is the same as sequenceTop, we can
// bump the base to be equal to sequenceTop, without triggering
// a new resend.
if seq == q.sequenceTop && q.sequenceBase != q.sequenceTop {
q.sequenceBase = q.sequenceTop

bumped = true
}
var bumped bool

// If we receive the awaited NACK, we shouldn't trigger a new
// resend, as we can now proceed to send new packets.
return false, bumped
}
q.syncer.processNack(seq)

// If the NACK is the same as sequenceTop, and we weren't awaiting this
// NACK as part of the resend catch up, it probably means that queue
Expand Down Expand Up @@ -474,39 +267,6 @@ func (q *queue) processNACK(seq uint8) (bool, bool) {
return true, bumped
}

// proceedAfterTime will wait for the resendTimeout and then send an
// awaitedACKSignal, if we're still awaiting the resend catch up.
func (q *queue) proceedAfterTime() {
// We await for the duration of the resendTimeout before sending the
// awaitedACKSignal, as that's the time we'd expect it to take for the
// other party to respond with a NACK, if the resent last packet in the
// queue would lead to a NACK. If we receive the awaitedNACKSignal
// before the timeout, we will receive the caughtUpSignal, and we can
// stop the execution early.
select {
case <-q.quit:
return
case <-q.caughtUpSignal:
q.cfg.log.Tracef("Already caught up.")

return
case <-time.After(q.cfg.timeout):
q.awaitingCatchUpMu.Lock()

if q.awaitingCatchUp {
q.cfg.log.Tracef("Sending awaitedACKSignal")
q.awaitedACKSignal <- struct{}{}

q.awaitingCatchUp = false
} else {
q.cfg.log.Tracef("Ending proceedAfterTime without any " +
"action")
}

q.awaitingCatchUpMu.Unlock()
}
}

// containsSequence is used to determine if a number, seq, is between two other
// numbers, base and top, where all the numbers lie in a finite field (modulo
// space) s.
Expand Down
12 changes: 2 additions & 10 deletions gbn/queue_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gbn

import (
"errors"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -142,15 +141,8 @@ func resend(t *testing.T, q *queue, wg *sync.WaitGroup) {

// We also ensure that the above goroutine is has started the resend
// before this function returns.
err := wait.NoError(func() error {
q.awaitingCatchUpMu.Lock()
defer q.awaitingCatchUpMu.Unlock()

if !q.awaitingCatchUp {
return errors.New("Hasn't resent yet")
}

return nil
err := wait.Predicate(func() bool {
return q.syncer.getState() == syncStateResending
}, time.Second)
require.NoError(t, err)
}
Loading

0 comments on commit a1ee556

Please sign in to comment.