diff --git a/gbn/queue.go b/gbn/queue.go index 2ccee73..a0c4a9d 100644 --- a/gbn/queue.go +++ b/gbn/queue.go @@ -7,24 +7,6 @@ import ( "github.com/btcsuite/btclog" ) -const ( - // awaitingTimeoutMultiplier defines the multiplier we use when - // multiplying the resend timeout during a resend catch up, resulting in - // duration we wait for the resend catch up to complete before timing - // out. - // We set this to 3X the resend timeout. The reason we wait exactly 3X - // the resend timeout is that we expect that the max time correct - // behavior would take, would be: - // * 1X the resendTimeout for the time it would take for the party - // respond with an ACK for the last packet in the resend queue, i.e. the - // awaitedACK. - // * 1X the resendTimeout while waiting in proceedAfterTime before - // sending the awaitedACKSignal. - // * 1X extra resendTimeout as buffer, to ensure that we have enough - // time to process the ACKS/NACKS by other party + some extra margin. - awaitingTimeoutMultiplier = 3 -) - type queueCfg struct { // s is the maximum sequence number used to label packets. Packets // are labelled with incrementing sequence numbers modulo s. @@ -69,41 +51,7 @@ type queue struct { // topMtx is used to guard sequenceTop. topMtx sync.RWMutex - // awaitedACK defines the sequence number for the last packet in the - // resend queue. If we receive an ACK for this sequence number during - // the resend catch up, we wait for the duration of the resend timeout, - // and then proceed to send new packets, unless we receive the - // awaitedNACK during the wait time. If that happens, we will proceed - // send new packets as soon as we have processed the NACK. - awaitedACK uint8 - - // awaitedNACK defines the sequence number that in case we get a NACK - // with that sequence number during the resend catch up, we'd consider - // the catch up to be complete and we can proceed to send new packets. 
- awaitedNACK uint8 - - // awaitingCatchUp is set to true if we are awaiting a catch up after we - // have resent the queue. - awaitingCatchUp bool - - // awaitingCatchUpMu must be held when accessing or mutating the values - // of awaitedACK, awaitedNACK and awaitingCatchUp. - awaitingCatchUpMu sync.RWMutex - - // awaitedACKSignal is used to signal that we have received the awaited - // ACK after resending the queue, and have waited for the duration of - // the resend timeout. Once this signal is received, we can proceed to - // send new packets. - awaitedACKSignal chan struct{} - - // awaitedNACKSignal is used to signal that we have received the awaited - // NACK after resending the queue. Once this signal is received, we can - // proceed to send new packets. - awaitedNACKSignal chan struct{} - - // caughtUpSignal is used to signal that we have caught up after - // awaiting the catch up after resending the queue. - caughtUpSignal chan struct{} + syncer *syncer lastResend time.Time @@ -116,14 +64,15 @@ func newQueue(cfg *queueCfg) *queue { cfg.log = log } - return &queue{ - cfg: cfg, - content: make([]*PacketData, cfg.s), - awaitedACKSignal: make(chan struct{}, 1), - awaitedNACKSignal: make(chan struct{}, 1), - caughtUpSignal: make(chan struct{}, 1), - quit: make(chan struct{}), + q := &queue{ + cfg: cfg, + content: make([]*PacketData, cfg.s), + quit: make(chan struct{}), } + + q.syncer = newSyncer(cfg.s, cfg.log, cfg.timeout, q.quit) + + return q } func (q *queue) stop() { @@ -155,63 +104,9 @@ func (q *queue) addPacket(packet *PacketData) { q.sequenceTop = (q.sequenceTop + 1) % q.cfg.s } -// resend resends the current contents of the queue, by invoking the callback -// for each packet that needs to be resent, and then awaits that we either -// receive the expected ACK or NACK after resending the queue, before returning. 
-// -// To understand why we need to await the awaited ACK/NACK after resending the -// queue, it ensures that we don't end up in a situation where we resend the -// queue over and over again due to latency and delayed NACKs by the other -// party. -// -// Consider the following scenario: -// 1. -// Alice sends packets 1, 2, 3 & 4 to Bob. -// 2. -// Bob receives packets 1, 2, 3 & 4, and sends back the respective ACKs. -// 3. -// Alice receives ACKs for packets 1 & 2, but due to latency the ACKs for -// packets 3 & 4 are delayed and aren't received until Alice resend timeout -// has passed, which leads to Alice resending packets 3 & 4. Alice will after -// that receive the delayed ACKs for packets 3 & 4, but will consider that as -// the ACKs for the resent packets, and not the original packets which they were -// actually sent for. If we didn't wait after resending the queue, Alice would -// then proceed to send more packets (5 & 6). -// 4. -// When Bob receives the resent packets 3 & 4, Bob will respond with NACK 5. Due -// to latency, the packets 5 & 6 that Alice sent in step (3) above will then be -// received by Bob, and be processed as the correct response to the NACK 5. Bob -// will after that await packet 7. -// 5. -// Alice will receive the NACK 5, and now resend packets 5 & 6. But as Bob is -// now awaiting packet 7, this send will lead to a NACK 7. But due to latency, -// if Alice doesn't wait resending the queue, Alice will proceed to send new -// packet(s) before receiving the NACK 7. -// 6. -// This resend loop would continue indefinitely, so we need to ensure that Alice -// waits after she has resent the queue, to ensure that she doesn't proceed to -// send new packets before she is sure that both parties are in sync. -// -// To ensure that we are in sync, after we have resent the queue, we will await -// that we either: -// 1. Receive a NACK for the sequence number succeeding the last packet in the -// resent queue i.e. 
in step (3) above, that would be NACK 5. -// OR -// 2. Receive an ACK for the last packet in the resent queue i.e. in step (3) -// above, that would be ACK 4. After we receive the expected ACK, we will then -// wait for the duration of the resend timeout before continuing. The reason why -// we wait for the resend timeout before continuing, is that the ACKs we are -// getting after a resend, could be delayed ACKs for the original packets we -// sent, and not ACKs for the resent packets. In step (3) above, the ACKs for -// packets 3 & 4 that Alice received were delayed ACKs for the original packets. -// If Alice would have immediately continued to send new packets (5 & 6) after -// receiving the ACK 4, she would have then received the NACK 5 from Bob which -// was the actual response to the resent queue. But as Alice had already -// continued to send packets 5 & 6 when receiving the NACK 5, the resend queue -// response to that NACK would cause the resend loop to continue indefinitely. -// -// When either of the 2 conditions above are met, we will consider both parties -// to be in sync, and we can proceed to send new packets. +// resend resends the current contents of the queue. It allows some time for the +// two parties to be seen as synced; this may fail in which case the caller is +// expected to call resend again. func (q *queue) resend() error { if time.Since(q.lastResend) < q.cfg.timeout { q.cfg.log.Tracef("Resent the queue recently.") @@ -223,10 +118,6 @@ func (q *queue) resend() error { return nil } - q.lastResend = time.Now() - - q.awaitingCatchUpMu.Lock() - q.baseMtx.RLock() base := q.sequenceBase q.baseMtx.RUnlock() @@ -236,26 +127,13 @@ func (q *queue) resend() error { q.topMtx.RUnlock() if base == top { - q.awaitingCatchUpMu.Unlock() + q.syncer.reset() return nil } // Prepare the queue for awaiting the resend catch up. 
- q.awaitedACK = (q.cfg.s + top - 1) % q.cfg.s - q.awaitedNACK = top - - q.cfg.log.Tracef("Set awaitedACK to %d & awaitedNACK to %d", - q.awaitedACK, q.awaitedNACK) - - q.awaitingCatchUp = true - - // Drain the caughtUpSignal channel, in case no proceedAfterTime - // func was executed after the last resend catch up. - select { - case <-q.caughtUpSignal: - default: - } + q.syncer.initResendUpTo(top) q.cfg.log.Tracef("Resending the queue") @@ -263,8 +141,6 @@ func (q *queue) resend() error { packet := q.content[base] if err := q.cfg.sendPkt(packet); err != nil { - q.awaitingCatchUpMu.Unlock() - return err } @@ -273,61 +149,12 @@ func (q *queue) resend() error { q.cfg.log.Tracef("Resent %d", packet.Seq) } - // We hold the awaitingCatchUpMu mutex for the duration of the resend to - // ensure that we don't process the delayed ACKs for the packets we are - // resending, during the resend. If that would happen, we would start - // the "proceedAfterTime" function timeout while still resending - // packets. That could mean that the NACK that the resent packets will - // trigger, might be received after the timeout has passed. That would - // cause the resend loop to trigger once more. - q.awaitingCatchUpMu.Unlock() - - // Then await until we know that both parties are in sync. - q.awaitCatchUp() + // Then wait until we know that both parties are in sync. + q.syncer.waitForSync() return nil } -// awaitCatchUp awaits that we either receive the awaited ACK or NACK signal -// before returning. If we don't receive the awaited ACK or NACK signal before -// 3X the resend timeout, the function will also return. -// See the docs for the resend function for more details on why we need to await -// the awaited ACK or NACK signal. 
-func (q *queue) awaitCatchUp() { - q.cfg.log.Tracef("Awaiting catchup after resending the queue") - - select { - case <-q.quit: - return - case <-q.awaitedACKSignal: - q.cfg.log.Tracef("Got awaitedACKSignal") - case <-q.awaitedNACKSignal: - q.cfg.log.Tracef("Got awaitedNACKSignal") - case <-time.After(q.cfg.timeout * awaitingTimeoutMultiplier): - q.cfg.log.Tracef("Timed out while awaiting catchup") - - q.awaitingCatchUpMu.Lock() - q.awaitingCatchUp = false - - // Drain both the ACK & NACK signal channels. - select { - case <-q.awaitedACKSignal: - default: - } - - select { - case <-q.awaitedNACKSignal: - default: - } - - q.awaitingCatchUpMu.Unlock() - } - - // Send a caughtUpSignal to indicate that we have caught up after - // resending the queue. - q.caughtUpSignal <- struct{}{} -} - // processACK processes an incoming ACK of a given sequence number. The function // returns true if the passed seq is an ACK for a packet we have sent but not // yet received an ACK for. @@ -340,20 +167,7 @@ func (q *queue) processACK(seq uint8) bool { return false } - // If we are awaiting a catch up, and the ACK is the awaited ACK, we - // start the proceedAfterTime function in a goroutine, which will send - // an awaitedACKSignal if we're still awaiting the resend catch up when - // the resend timeout has expired. - q.awaitingCatchUpMu.RLock() - if seq == q.awaitedACK && q.awaitingCatchUp { - q.cfg.log.Tracef("Got awaited ACK") - - // We start the proceedAfterTime function in a goroutine, as we - // don't want to block the processing of other NACKs/ACKs while - // we're waiting for the resend timeout to expire. - go q.proceedAfterTime() - } - q.awaitingCatchUpMu.RUnlock() + q.syncer.processAck(seq) q.baseMtx.Lock() defer q.baseMtx.Unlock() @@ -406,9 +220,6 @@ func (q *queue) processACK(seq uint8) bool { // the NACK sequence number. This equivalent to receiving the ACKs for the // packets before the NACK sequence number. 
func (q *queue) processNACK(seq uint8) (bool, bool) { - q.awaitingCatchUpMu.Lock() - defer q.awaitingCatchUpMu.Unlock() - q.baseMtx.Lock() defer q.baseMtx.Unlock() @@ -417,27 +228,9 @@ func (q *queue) processNACK(seq uint8) (bool, bool) { q.cfg.log.Tracef("Received NACK %d", seq) - bumped := false - - if q.awaitingCatchUp && seq == q.awaitedNACK { - q.cfg.log.Tracef("Sending awaitedNACKSignal") - q.awaitedNACKSignal <- struct{}{} - - q.awaitingCatchUp = false - - // In case the awaitedNACK is the same as sequenceTop, we can - // bump the base to be equal to sequenceTop, without triggering - // a new resend. - if seq == q.sequenceTop && q.sequenceBase != q.sequenceTop { - q.sequenceBase = q.sequenceTop - - bumped = true - } + var bumped bool - // If we receive the awaited NACK, we shouldn't trigger a new - // resend, as we can now proceed to send new packets. - return false, bumped - } + q.syncer.processNack(seq) // If the NACK is the same as sequenceTop, and we weren't awaiting this // NACK as part of the resend catch up, it probably means that queue @@ -474,39 +267,6 @@ func (q *queue) processNACK(seq uint8) (bool, bool) { return true, bumped } -// proceedAfterTime will wait for the resendTimeout and then send an -// awaitedACKSignal, if we're still awaiting the resend catch up. -func (q *queue) proceedAfterTime() { - // We await for the duration of the resendTimeout before sending the - // awaitedACKSignal, as that's the time we'd expect it to take for the - // other party to respond with a NACK, if the resent last packet in the - // queue would lead to a NACK. If we receive the awaitedNACKSignal - // before the timeout, we will receive the caughtUpSignal, and we can - // stop the execution early. 
- select { - case <-q.quit: - return - case <-q.caughtUpSignal: - q.cfg.log.Tracef("Already caught up.") - - return - case <-time.After(q.cfg.timeout): - q.awaitingCatchUpMu.Lock() - - if q.awaitingCatchUp { - q.cfg.log.Tracef("Sending awaitedACKSignal") - q.awaitedACKSignal <- struct{}{} - - q.awaitingCatchUp = false - } else { - q.cfg.log.Tracef("Ending proceedAfterTime without any " + - "action") - } - - q.awaitingCatchUpMu.Unlock() - } -} - // containsSequence is used to determine if a number, seq, is between two other // numbers, base and top, where all the numbers lie in a finite field (modulo // space) s. diff --git a/gbn/queue_test.go b/gbn/queue_test.go index dc47748..232b483 100644 --- a/gbn/queue_test.go +++ b/gbn/queue_test.go @@ -1,7 +1,6 @@ package gbn import ( - "errors" "sync" "testing" "time" @@ -142,15 +141,8 @@ func resend(t *testing.T, q *queue, wg *sync.WaitGroup) { // We also ensure that the above goroutine is has started the resend // before this function returns. - err := wait.NoError(func() error { - q.awaitingCatchUpMu.Lock() - defer q.awaitingCatchUpMu.Unlock() - - if !q.awaitingCatchUp { - return errors.New("Hasn't resent yet") - } - - return nil + err := wait.Predicate(func() bool { + return q.syncer.getState() == syncStateResending }, time.Second) require.NoError(t, err) } diff --git a/gbn/syncer.go b/gbn/syncer.go new file mode 100644 index 0000000..a37f3fa --- /dev/null +++ b/gbn/syncer.go @@ -0,0 +1,291 @@ +package gbn + +import ( + "sync" + "time" + + "github.com/btcsuite/btclog" +) + +const ( + // awaitingTimeoutMultiplier defines the multiplier we use when + // multiplying the resend timeout, resulting in duration we wait to be + // sync to complete before timing out. + // We set this to 3X the resend timeout. 
The reason we wait exactly 3X + // the resend timeout is that we expect that the max time correct + // behavior would take, would be: + // * 1X the resendTimeout for the time it would take for the party to + // respond with an ACK for the last packet in the resend queue, i.e. the + // awaitedACK. + // * 1X the resendTimeout while waiting in proceedAfterTime before + // completing the sync. + // * 1X extra resendTimeout as buffer, to ensure that we have enough + // time to process the ACKs/NACKs by other party + some extra margin. + awaitingTimeoutMultiplier = 3 +) + +type syncState uint8 + +const ( + syncStateIdle syncState = iota + syncStateResending +) + +// syncer is used to ensure that both the sender and the receiver are in sync +// before the waitForSync function is completed. This is done by awaiting that +// we receive either the expected ACK or NACK after resending the queue. +// +// We need to await the awaited ACK/NACK after resending the queue, because it +// ensures that we don't end up in a situation where we resend the +// queue over and over again due to latency and delayed NACKs by the other +// party. +// +// Consider the following scenario: +// 1. +// Alice sends packets 1, 2, 3 & 4 to Bob. +// 2. +// Bob receives packets 1, 2, 3 & 4, and sends back the respective ACKs. +// 3. +// Alice receives ACKs for packets 1 & 2, but due to latency the ACKs for +// packets 3 & 4 are delayed and aren't received until Alice's resend timeout +// has passed, which leads to Alice resending packets 3 & 4. Alice will after +// that receive the delayed ACKs for packets 3 & 4, but will consider that as +// the ACKs for the resent packets, and not the original packets which they were +// actually sent for. If we didn't wait after resending the queue, Alice would +// then proceed to send more packets (5 & 6). +// 4. +// When Bob receives the resent packets 3 & 4, Bob will respond with NACK 5.
Due +// to latency, the packets 5 & 6 that Alice sent in step (3) above will then be +// received by Bob, and be processed as the correct response to the NACK 5. Bob +// will after that await packet 7. +// 5. +// Alice will receive the NACK 5, and now resend packets 5 & 6. But as Bob is +// now awaiting packet 7, this send will lead to a NACK 7. But due to latency, +// if Alice doesn't wait after resending the queue, Alice will proceed to send +// new packet(s) before receiving the NACK 7. +// 6. +// This resend loop would continue indefinitely, so we need to ensure that Alice +// waits after she has resent the queue, to ensure that she doesn't proceed to +// send new packets before she is sure that both parties are in sync. +// +// To ensure that we are in sync, after we have resent the queue, we will await +// that we either: +// 1. Receive a NACK for the sequence number succeeding the last packet in the +// resent queue, i.e. in step (3) above, that would be NACK 5. +// OR +// 2. Receive an ACK for the last packet in the resent queue, i.e. in step (3) +// above, that would be ACK 4. After we receive the expected ACK, we will then +// wait for the duration of the resend timeout before continuing. The reason why +// we wait for the resend timeout before continuing, is that the ACKs we are +// getting after a resend, could be delayed ACKs for the original packets we +// sent, and not ACKs for the resent packets. In step (3) above, the ACKs for +// packets 3 & 4 that Alice received were delayed ACKs for the original packets. +// If Alice had immediately continued to send new packets (5 & 6) after +// receiving the ACK 4, she would have then received the NACK 5 from Bob which +// was the actual response to the resent queue. But as Alice had already +// continued to send packets 5 & 6 when receiving the NACK 5, the resend queue +// response to that NACK would cause the resend loop to continue indefinitely.
+// +// When either of the 2 conditions above is met, we will consider both parties +// to be in sync. +type syncer struct { + s uint8 + log btclog.Logger + timeout time.Duration + + state syncState + + // awaitedACK defines the sequence number for the last packet in the + // resend queue. If we receive an ACK for this sequence number while + // waiting to sync, we wait for the duration of the resend timeout, + // and then proceed to send new packets, unless we receive the + // awaitedNACK during the wait time. If that happens, we will proceed + // to send new packets as soon as we have processed the NACK. + awaitedACK uint8 + + // awaitedNACK defines the sequence number that in case we get a NACK + // with that sequence number when waiting to sync, we'd consider + // the sync to be completed and we can proceed to send new packets. + awaitedNACK uint8 + + // cancel is used to mark that the sync has been completed. + cancel chan struct{} + + quit chan struct{} + mu sync.Mutex +} + +// newSyncer creates a new syncer instance. +func newSyncer(s uint8, log btclog.Logger, timeout time.Duration, + quit chan struct{}) *syncer { + + return &syncer{ + s: s, + log: log, + timeout: timeout, + state: syncStateIdle, + cancel: make(chan struct{}), + quit: quit, + } +} + +// reset resets the syncer state to idle and marks the sync as completed. +func (c *syncer) reset() { + c.mu.Lock() + defer c.mu.Unlock() + + c.resetUnsafe() +} + +// resetUnsafe resets the syncer state to idle and marks the sync as completed. +// +// NOTE: when calling this function, the caller must hold the syncer mutex. +func (c *syncer) resetUnsafe() { + c.state = syncStateIdle + + // Cancel any pending sync. + select { + case c.cancel <- struct{}{}: + default: + } +} + +// initResendUpTo initializes the syncer to the resending state, and will after +// this call be ready to await the sync to be completed when calling the +// waitForSync function.
+// The top argument defines the sequence number of the next packet to be sent +// after resending the queue. +func (c *syncer) initResendUpTo(top uint8) { + c.mu.Lock() + defer c.mu.Unlock() + + c.state = syncStateResending + + // Drain the cancel channel, to reinitialize it for the new sync. + select { + case <-c.cancel: + default: + } + + c.awaitedACK = (c.s + top - 1) % c.s + c.awaitedNACK = top + + c.log.Tracef("Set awaitedACK to %d & awaitedNACK to %d", + c.awaitedACK, c.awaitedNACK) +} + +// getState returns the current state of the syncer. +func (c *syncer) getState() syncState { + c.mu.Lock() + defer c.mu.Unlock() + + return c.state +} + +// waitForSync waits for the sync to be completed. The sync is completed when we +// either receive the awaitedNACK, receive the awaitedACK and then wait out the +// resend timeout, or time out. +func (c *syncer) waitForSync() { + c.log.Tracef("Awaiting sync after resending the queue") + + select { + case <-c.quit: + return + + case <-c.cancel: + c.log.Tracef("sync canceled or reset") + + case <-time.After(c.timeout * awaitingTimeoutMultiplier): + c.log.Tracef("Timed out while waiting for sync") + } + + c.reset() +} + +// processAck marks the sync as completed if the passed sequence number matches +// the awaitedACK, after the resend timeout has passed. +// If we are not resending or waiting after a resend, this is a no-op. +func (c *syncer) processAck(seq uint8) { + c.mu.Lock() + defer c.mu.Unlock() + + // If we are not resending or waiting after a resend, just swallow the + // ACK. + if c.state != syncStateResending { + return + } + + // Else, if we are waiting but this is not the ACK we are waiting for, + // just swallow it. + if seq != c.awaitedACK { + return + } + + c.log.Tracef("Got awaited ACK") + + // We start the proceedAfterTime function in a goroutine, as we + // don't want to block the processing of other NACKs/ACKs while + // we're waiting for the resend timeout to expire.
+ go c.proceedAfterTime() +} + +// processNack marks the sync as completed if the passed sequence number matches +// the awaitedNACK. +// If we are not resending or waiting after a resend, this is a no-op. +func (c *syncer) processNack(seq uint8) { + c.mu.Lock() + defer c.mu.Unlock() + + // If we are not resending or waiting after a resend, just swallow the + // NACK. + if c.state != syncStateResending { + return + } + + // Else, if we are waiting but this is not the NACK we are waiting for, + // just swallow it. + if seq != c.awaitedNACK { + return + } + + c.log.Tracef("Got awaited NACK") + + c.resetUnsafe() +} + +// proceedAfterTime will wait for the resendTimeout and then complete the sync, +// if we haven't completed the sync yet by receiving the awaitedNACK. +func (c *syncer) proceedAfterTime() { + // We wait for the duration of the resendTimeout before completing the + // sync, as that's the time we'd expect it to take for the other party + // to respond with a NACK, if the last resent packet in the + // queue would lead to a NACK. If we receive the awaitedNACK + // before the timeout, the cancel channel will be sent over, and we can + // stop the execution early. + select { + case <-c.quit: + return + + case <-c.cancel: + c.log.Tracef("sync succeeded or was reset") + + // As we can't be sure that the waitForSync cancel listener was + // triggered before this one, we send over the cancel channel + // again, to make sure that both listeners are triggered. + c.reset() + + return + + case <-time.After(c.timeout): + c.mu.Lock() + defer c.mu.Unlock() + + if c.state != syncStateResending { + return + } + + c.log.Tracef("Completing sync after awaitedACK timeout") + + c.resetUnsafe() + } +}