From 6000041d79730c5b7bccca9a6cbf804dfa9527be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tigerstr=C3=B6m?= Date: Tue, 21 Nov 2023 01:17:55 +0100 Subject: [PATCH 01/12] gbn: add timeout manager The timeout manager is a manager for all timeout values used by the different components of the gbn package. The timeout manager enables the functionality of dynamically changing the resend timeout value based on how long it takes to receive an response from the counterparty. To make the gbn package more modular, we'll move the responsibility of managing the timeout values of the gbn package to the timeout manager in the next commit. --- gbn/config.go | 64 +++++++ gbn/gbn_conn.go | 9 +- gbn/timeout_manager.go | 356 ++++++++++++++++++++++++++++++++++++ gbn/timeout_manager_test.go | 262 ++++++++++++++++++++++++++ 4 files changed, 684 insertions(+), 7 deletions(-) create mode 100644 gbn/timeout_manager.go create mode 100644 gbn/timeout_manager_test.go diff --git a/gbn/config.go b/gbn/config.go index 6a027b4..064c80d 100644 --- a/gbn/config.go +++ b/gbn/config.go @@ -2,6 +2,70 @@ package gbn import "time" +// TimeoutOptions can be used to modify the default timeout values used within +// the TimeoutManager. +type TimeoutOptions func(manager *TimeoutManager) + +// WithStaticResendTimeout is used to set a static resend timeout. This is the +// time to wait for ACKs before resending the queue. +func WithStaticResendTimeout(timeout time.Duration) TimeoutOptions { + return func(manager *TimeoutManager) { + manager.useStaticTimeout = true + manager.resendTimeout = timeout + } +} + +// WithResendMultiplier is used to set the resend multiplier. This is the +// multiplier we use when dynamically setting the resend timeout, based on how +// long it took for other party to respond. +// Note that when setting the resend timeout manually with the +// WithStaticResendTimeout option, this option will have no effect. +// Note that the passed multiplier must be greater than zero or this option will +// have no effect. +func WithResendMultiplier(multiplier int) TimeoutOptions { + return func(manager *TimeoutManager) { + if multiplier > 0 { + manager.resendMultiplier = multiplier + } + } +} + +// WithTimeoutUpdateFrequency is used to set the frequency of how many +// corresponding responses we need to receive until updating the resend timeout. +// Note that when setting the resend timeout manually with the WithTimeout +// option, this option will have no effect. +// Also note that the passed frequency must be greater than zero or this option +// will have no effect. +func WithTimeoutUpdateFrequency(frequency int) TimeoutOptions { + return func(manager *TimeoutManager) { + if frequency > 0 { + manager.timeoutUpdateFrequency = frequency + } + } +} + +// WithTMHandshakeTimeout is used to set the timeout used during the handshake. +// If the timeout is reached without response from the peer then the handshake +// will be aborted and restarted. +func WithTMHandshakeTimeout(timeout time.Duration) TimeoutOptions { + return func(manager *TimeoutManager) { + manager.handshakeTimeout = timeout + } +} + +// WithTMKeepalivePing is used to send a ping packet if no packets have been +// received from the other side for the given duration. This helps keep the +// connection alive and also ensures that the connection is closed if the +// other side does not respond to the ping in a timely manner. After the ping +// the connection will be closed if the other side does not respond within +// time duration. +func WithTMKeepalivePing(ping, pong time.Duration) TimeoutOptions { + return func(manager *TimeoutManager) { + manager.pingTime = ping + manager.pongTime = pong + } +} + // config holds the configuration values for an instance of GoBackNConn. type config struct { // n is the window size. The sender can send a maximum of n packets diff --git a/gbn/gbn_conn.go b/gbn/gbn_conn.go index 50a0cc9..21454d7 100644 --- a/gbn/gbn_conn.go +++ b/gbn/gbn_conn.go @@ -21,12 +21,7 @@ var ( ) const ( - DefaultN = 20 - defaultHandshakeTimeout = 100 * time.Millisecond - defaultResendTimeout = 100 * time.Millisecond - finSendTimeout = 1000 * time.Millisecond - DefaultSendTimeout = math.MaxInt64 - DefaultRecvTimeout = math.MaxInt64 + DefaultN = 20 ) type sendBytesFunc func(ctx context.Context, b []byte) error @@ -322,7 +317,7 @@ func (g *GoBackNConn) Close() error { g.log.Tracef("Try sending FIN") ctxc, cancel := context.WithTimeout( - g.ctx, finSendTimeout, + g.ctx, defaultFinSendTimeout, ) defer cancel() if err := g.sendPacket(ctxc, &PacketFIN{}); err != nil { diff --git a/gbn/timeout_manager.go b/gbn/timeout_manager.go new file mode 100644 index 0000000..e74b7b6 --- /dev/null +++ b/gbn/timeout_manager.go @@ -0,0 +1,356 @@ +package gbn + +import ( + "math" + "sync" + "time" + + "github.com/btcsuite/btclog" +) + +const ( + defaultHandshakeTimeout = 100 * time.Millisecond + defaultResendTimeout = 100 * time.Millisecond + minimumResendTimeout = 100 * time.Millisecond + defaultFinSendTimeout = 1000 * time.Millisecond + defaultResendMultiplier = 5 + defaultTimeoutUpdateFrequency = 100 + DefaultSendTimeout = math.MaxInt64 + DefaultRecvTimeout = math.MaxInt64 +) + +// TimeoutManager manages the different timeouts used by the gbn package. +type TimeoutManager struct { + // useStaticTimeout is used to indicate whether the resendTimeout + // has been manually set, and if so, should not be updated dynamically. + useStaticTimeout bool + + // hasSetDynamicTimeout is used to indicate whether the resendTimeout + // has ever been set dynamically. + hasSetDynamicTimeout bool + + // resendTimeout defines the current resend timeout used by the + // timeout manager. + // The resend timeout is the duration that will be waited before + // resending the packets in the current queue. The timeout is set + // dynamically, and is set to the time it took for the other party to + // respond, multiplied by the resendMultiplier. + resendTimeout time.Duration + + // resendMultiplier defines the multiplier used when multiplying the + // duration it took for the other party to respond when setting the + // resendTimeout. + resendMultiplier int + + // latestSentSYNTime is used to keep track of the time when the latest + // SYN message was sent. This is used to dynamically set the resend + // timeout, based on how long it took for the other party to respond to + // the SYN message. + latestSentSYNTime time.Time + + // latestSentSYNTimeMu should be locked when updating or accessing the + // latestSentSYNTime field. + latestSentSYNTimeMu sync.Mutex + + // handshakeTimeout is the time after which the server or client + // will abort and restart the handshake if the expected response is + // not received from the peer. + handshakeTimeout time.Duration + + // finSendTimeout is the timeout after which the created context for + // sending a FIN message will be time out. + finSendTimeout time.Duration + + // sendTimeout defines the max time we will wait to send a msg before + // timing out. + sendTimeout time.Duration + + // recvTimeout defines the max time we will wait to receive a msg before + // timing out. + recvTimeout time.Duration + + // pingTime represents at which frequency we will send pings to the + // counterparty if we've received no packet. + pingTime time.Duration + + // pongTime represents how long we will wait for the expect a pong + // response after we've sent a ping. If no response is received within + // the time limit, we will close the connection. + pongTime time.Duration + + // responseCounter represents the current number of corresponding + // responses received since last updating the resend timeout. + responseCounter int + + // timeoutUpdateFrequency represents the frequency of how many + // corresponding responses we need to receive until the resend timeout + // will be updated. + timeoutUpdateFrequency int + + log btclog.Logger + + sentTimes map[uint8]time.Time + sentTimesMu sync.Mutex + + // mu should be locked when updating or accessing any of timeout + // manager's timeout fields. It should also be held when accessing any + // of the timeout manager's fields that get updated throughout the + // lifecycle of the timeout manager after initialization, that doesn't + // have a dedicated mutex. + // + // Note that the lock order for this mutex is before any of the other + // mutexes in the timeout manager. + mu sync.RWMutex +} + +// NewTimeOutManager creates a new timeout manager. +func NewTimeOutManager(logger btclog.Logger, + timeoutOpts ...TimeoutOptions) *TimeoutManager { + + if logger == nil { + logger = log + } + + m := &TimeoutManager{ + log: logger, + resendTimeout: defaultResendTimeout, + handshakeTimeout: defaultHandshakeTimeout, + useStaticTimeout: false, + resendMultiplier: defaultResendMultiplier, + finSendTimeout: defaultFinSendTimeout, + recvTimeout: DefaultRecvTimeout, + sendTimeout: DefaultSendTimeout, + sentTimes: make(map[uint8]time.Time), + timeoutUpdateFrequency: defaultTimeoutUpdateFrequency, + } + + for _, opt := range timeoutOpts { + opt(m) + } + + return m +} + +// Sent should be called when a message is sent by the connection. The resent +// parameter should be set to true if the message is a resent message. +func (m *TimeoutManager) Sent(msg Message, resent bool) { + if m.useStaticTimeout { + return + } + + sentAt := time.Now() + + // We will dynamically update the resend timeout throughout the lifetime + // of the connection, to ensure that it reflects the current response + // time. Therefore, we'll keep track of when we sent a package, and when + // we receive the corresponding response. + // If we're resending a message, we can't know if a corresponding + // response is the response to the resent message, or the original + // message. Therefore, we never update the resend timeout after + // resending a message. + switch msg := msg.(type) { + case *PacketSYN: + m.latestSentSYNTimeMu.Lock() + defer m.latestSentSYNTimeMu.Unlock() + + if !resent { + m.latestSentSYNTime = sentAt + + return + } + + // If we've resent the SYN, we'll reset the latestSentSYNTime to + // the zero value, to ensure that we don't update the resend + // timeout based on the corresponding response, as we can't know + // if the response is for the resent SYN or the original SYN. + m.latestSentSYNTime = time.Time{} + + case *PacketData: + m.sentTimesMu.Lock() + defer m.sentTimesMu.Unlock() + + if resent { + // If we're resending a data packet, we'll delete the + // sent time for the sequence, to ensure that we won't + // update the resend timeout when we receive the + // corresponding response. + delete(m.sentTimes, msg.Seq) + + return + } + + m.sentTimes[msg.Seq] = sentAt + } +} + +// Received should be called when a message is received by the connection. +func (m *TimeoutManager) Received(msg Message) { + if m.useStaticTimeout { + return + } + + receivedAt := time.Now() + + // We lock the TimeoutManager's mu as soon as Received is executed, to + // ensure that any GetResendTimeout call we receive concurrently after + // this Received call, will return an updated resend timeout if this + // Received call does update the timeout. + m.mu.Lock() + defer m.mu.Unlock() + + switch msg := msg.(type) { + case *PacketSYN, *PacketSYNACK: + m.latestSentSYNTimeMu.Lock() + + if m.latestSentSYNTime.IsZero() { + m.latestSentSYNTimeMu.Unlock() + + return + } + + responseTime := receivedAt.Sub(m.latestSentSYNTime) + + m.latestSentSYNTime = time.Time{} + + m.latestSentSYNTimeMu.Unlock() + + m.updateResendTimeoutUnsafe(responseTime) + + case *PacketACK: + m.sentTimesMu.Lock() + + sentTime, ok := m.sentTimes[msg.Seq] + if !ok { + m.sentTimesMu.Unlock() + + return + } + + delete(m.sentTimes, msg.Seq) + + m.sentTimesMu.Unlock() + + m.responseCounter++ + + reachedFrequency := m.responseCounter% + m.timeoutUpdateFrequency == 0 + + // In case we never set the resend timeout dynamically in the + // handshake due to needing to resend the SYN, or if we've + // reached received the number of packages matching the + // timeoutUpdateFrequency, we'll update the resend timeout. + if !m.hasSetDynamicTimeout || reachedFrequency { + m.responseCounter = 0 + + m.updateResendTimeoutUnsafe(receivedAt.Sub(sentTime)) + } + } +} + +// updateResendTimeout updates the resend timeout based on the given response +// time. The resend timeout will be only be updated if the given response time +// is greater than the default resend timeout, after being multiplied by the +// resendMultiplier. +// +// NOTE: This function TimeoutManager mu must be held when calling this +// function. +func (m *TimeoutManager) updateResendTimeoutUnsafe(responseTime time.Duration) { + m.hasSetDynamicTimeout = true + + multipliedTimeout := time.Duration(m.resendMultiplier) * responseTime + + if multipliedTimeout < minimumResendTimeout { + m.log.Tracef("Setting resendTimeout to minimumResendTimeout "+ + "%v as the new dynamic timeout %v is not greater than "+ + "the minimum resendTimeout.", + m.resendTimeout, multipliedTimeout) + multipliedTimeout = minimumResendTimeout + } + + m.log.Tracef("Updating resendTimeout to %v", multipliedTimeout) + + m.resendTimeout = multipliedTimeout +} + +// GetResendTimeout returns the current resend timeout. +func (m *TimeoutManager) GetResendTimeout() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.resendTimeout +} + +// GetHandshakeTimeout returns the handshake timeout. +func (m *TimeoutManager) GetHandshakeTimeout() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.handshakeTimeout +} + +// GetFinSendTimeout returns the fin send timeout. +func (m *TimeoutManager) GetFinSendTimeout() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.finSendTimeout +} + +// GetSendTimeout returns the send timeout. +func (m *TimeoutManager) GetSendTimeout() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.sendTimeout +} + +// GetRecvTimeout returns the recv timeout. +func (m *TimeoutManager) GetRecvTimeout() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.recvTimeout +} + +// GetPingTime returns the ping time, representing at which frequency we will +// send pings to the counterparty if we've received no packet. +func (m *TimeoutManager) GetPingTime() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + if m.pingTime == 0 { + return time.Duration(math.MaxInt64) + } + + return m.pingTime +} + +// GetPongTime returns the pong timeout, representing how long we will wait for +// the expect a pong response after we've sent a ping. If no response is +// received within the time limit, we will close the connection. +func (m *TimeoutManager) GetPongTime() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + if m.pongTime == 0 { + return time.Duration(math.MaxInt64) + } + + return m.pongTime +} + +// SetSendTimeout sets the send timeout. +func (m *TimeoutManager) SetSendTimeout(timeout time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + + m.sendTimeout = timeout +} + +// SetRecvTimeout sets the receive timeout. +func (m *TimeoutManager) SetRecvTimeout(timeout time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + + m.recvTimeout = timeout +} diff --git a/gbn/timeout_manager_test.go b/gbn/timeout_manager_test.go new file mode 100644 index 0000000..69876af --- /dev/null +++ b/gbn/timeout_manager_test.go @@ -0,0 +1,262 @@ +package gbn + +import ( + "sync" + "testing" + "time" + + "github.com/lightningnetwork/lnd/lntest/wait" + "github.com/stretchr/testify/require" +) + +// BenchmarkTimeoutMgrSynchronously benchmarks the timeout manager when sending +// and receiving messages synchronously. +func BenchmarkTimeoutMgrSynchronously(b *testing.B) { + // Create a new timeout manager to use for the test. We set the timeout + // update frequency 2, so that the resend timeout is dynamically set + // every other message. + tm := NewTimeOutManager(nil, WithTimeoutUpdateFrequency(2)) + + for n := 0; n < b.N; n++ { + msg := &PacketData{Seq: uint8(n)} + + tm.Sent(msg, false) + tm.Received(msg) + } +} + +// BenchmarkTimeoutMgrConcurrently benchmarks the timeout manager when sending +// and receiving messages concurrently. +func BenchmarkTimeoutMgrConcurrently(b *testing.B) { + // Create a new timeout manager to use for the test. We set the timeout + // update frequency 2, so that the resend timeout is dynamically set + // every other message. + tm := NewTimeOutManager(nil, WithTimeoutUpdateFrequency(2)) + + var wg sync.WaitGroup + for n := 0; n < b.N; n++ { + wg.Add(1) + go func(seq uint8) { + defer wg.Done() + + msg := &PacketData{Seq: seq} + + tm.Sent(msg, false) + tm.Received(msg) + }(uint8(n)) + } + + wg.Wait() +} + +// TestStressTestTimeoutMgr tests that the timeout manager can handle a large +// number of concurrent Sent & Received calls, to ensure that the functions does +// not cause any deadlocks. +func TestStressTestTimeoutMgr(t *testing.T) { + t.Parallel() + + tm := NewTimeOutManager(nil, WithTimeoutUpdateFrequency(2)) + + var wg sync.WaitGroup + for n := 0; n < 100000; n++ { + wg.Add(1) + go func(seq uint8) { + defer wg.Done() + + msg := &PacketData{Seq: seq} + + tm.Sent(msg, false) + tm.Received(msg) + }(uint8(n)) + } + + wg.Wait() +} + +// TestDynamicTimeout ensures that the resend timeout is dynamically set as +// expected in the timeout manager, with the SYN message that's sent with the +// handshake. +func TestSYNDynamicTimeout(t *testing.T) { + t.Parallel() + + // Create a new timeout manager to use for the test. + tm := NewTimeOutManager(nil) + + // First, we'll ensure that the resend timeout doesn't change if we + // don't send and receive messages. + noResendTimeoutChange(t, tm, time.Second) + + // Next, we'll simulate that a SYN message has been sent and received. + // This should change the resend timeout given that the new timeout is + // greater than the minimum allowed timeout. + initialResendTimeout := tm.GetResendTimeout() + + synMsg := &PacketSYN{N: 20} + + sendAndReceive(t, tm, synMsg, synMsg, false) + + // The resend timeout should now have dynamically changed. Since the + // sendAndReceive function waits for one second before simulating the + // response, execution of the function must have more than 1 sec. + // We are then sure that the resend timeout has been dynamically + // set to a value greater default 1 second resend timeout. + resendTimeout := tm.GetResendTimeout() + require.Greater(t, resendTimeout, initialResendTimeout) + + // Let's also test that the resend timeout is dynamically set to the + // expected value, and that the resend multiplier works as expected. If + // we set the resend multiplier to 10, then send and receive a response + // after 1 second, then the resend timeout should be around 10 seconds. + tm.resendMultiplier = 10 + + sendAndReceive(t, tm, synMsg, synMsg, false) + + // As it takes a short amount of time to simulate the send and receive + // of the message, we'll accept a set resend timeout within a range of + // 10-11 seconds as correct. + resendTimeout = tm.GetResendTimeout() + require.InDelta(t, time.Second*10, resendTimeout, float64(time.Second)) + + // We'll also test that the resend timeout isn't dynamically set if + // the new timeout is less than the minimum allowed resend timeout. + tm.resendMultiplier = 1 + + sendAndReceiveWithDuration( + t, tm, minimumResendTimeout/10, synMsg, synMsg, false, + ) + + newTimeout := tm.GetResendTimeout() + require.Equal(t, minimumResendTimeout, newTimeout) + + // Then we'll test that the resend timeout isn't dynamically set if + // when simulating a that the SYN message has been resent. + sendAndReceive(t, tm, synMsg, synMsg, true) + + unchangedResendTimeout := tm.GetResendTimeout() + require.Equal(t, newTimeout, unchangedResendTimeout) +} + +// TestDataPackageDynamicTimeout ensures that the resend timeout is dynamically +// set as expected in the timeout manager, when PacketData messages and their +// corresponding response are exchanged between the counterparties. +func TestDataPackageDynamicTimeout(t *testing.T) { + t.Parallel() + + // Create a new timeout manager to use for the test. We set the timeout + // update frequency to a high value so that we're sure that it's not the + // reason for the first the resend timeout change. + tm := NewTimeOutManager(nil, WithTimeoutUpdateFrequency(1000)) + + // Next, we'll simulate that a data packet has been sent and received. + // This should change the resend timeout despite the timeout update + // frequency being set to a high value, as we never set the resend + // timeout with in the handshake with by a SYN msg + response. + initialResendTimeout := tm.GetResendTimeout() + + msg := &PacketData{Seq: 20} + response := &PacketACK{Seq: 20} + + sendAndReceive(t, tm, msg, response, false) + + // The resend timeout should now have dynamically changed. + resendTimeout := tm.GetResendTimeout() + require.NotEqual(t, initialResendTimeout, resendTimeout) + + // Now let's test that the timeout update frequency works as expected. + // If we set it to 2, we should only update the resend timeout on the + // second data packet send + receive (as the receive counter in the + // timeout manager was just reset above when setting the resend + // timeout). + tm.timeoutUpdateFrequency = 2 + + // We set resend multiplier to a high value, to ensure that the resend + // timeout is guaranteed to be set to a greater value then the previous + // resend timeout. + tm.resendMultiplier = 10 + + // The first send and receive should not change the resend timeout. + sendAndReceive(t, tm, msg, response, false) + + unchangedResendTimeout := tm.GetResendTimeout() + require.Equal(t, resendTimeout, unchangedResendTimeout) + + // The second send and receive should however change the resend timeout. + sendAndReceive(t, tm, msg, response, false) + + newResendTimeout := tm.GetResendTimeout() + require.NotEqual(t, resendTimeout, newResendTimeout) + + // Finally let's test that the resend timeout isn't dynamically set when + // simulating that the data packet has been resent. + tm.timeoutUpdateFrequency = 1 + tm.resendMultiplier = 100 + + sendAndReceive(t, tm, msg, response, true) + + unchangedResendTimeout = tm.GetResendTimeout() + require.Equal(t, newResendTimeout, unchangedResendTimeout) +} + +// TestStaticTimeout ensures that the resend timeout isn't dynamically set if a +// static timeout has been set. +func TestStaticTimeout(t *testing.T) { + t.Parallel() + + // Create a new timeout manager with a set static resend timeout to use + // for the test. + staticTimeout := time.Second * 2 + tm := NewTimeOutManager(nil, WithStaticResendTimeout(staticTimeout)) + + synMsg := &PacketSYN{N: 20} + + // Then ensure that the resend timeout isn't dynamically set if we send + // and receive messages after setting a static timeout. + sendAndReceive(t, tm, synMsg, synMsg, false) + + resendTimeout := tm.GetResendTimeout() + require.Equal(t, staticTimeout, resendTimeout) +} + +// sendAndReceive simulates that a SYN message has been sent for the passed the +// timeout manager, and then waits for one second before a simulating the SYN +// response. While waiting, the function asserts that the resend timeout hasn't +// changed. +func sendAndReceive(t *testing.T, tm *TimeoutManager, msg Message, + response Message, resent bool) { + + t.Helper() + + sendAndReceiveWithDuration(t, tm, time.Second, msg, response, resent) +} + +// sendAndReceive simulates that a SYN message has been sent for the passed the +// timeout manager, and then waits for specified delay before a simulating the +// SYN response. While waiting, the function asserts that the resend timeout +// hasn't changed. +func sendAndReceiveWithDuration(t *testing.T, tm *TimeoutManager, + responseDelay time.Duration, msg Message, response Message, + resent bool) { + + t.Helper() + + tm.Sent(msg, resent) + + noResendTimeoutChange(t, tm, responseDelay) + + tm.Received(response) +} + +// noResendTimeoutChange asserts that the resend timeout hasn't changed for the +// passed timeout manager for the specified duration. +func noResendTimeoutChange(t *testing.T, tm *TimeoutManager, + duration time.Duration) { + + t.Helper() + + resendTimeout := tm.GetResendTimeout() + + err := wait.Invariant(func() bool { + return resendTimeout == tm.GetResendTimeout() + }, duration) + require.NoError(t, err) +} From 50db412df23ec878ad7143f95a965711d611b003 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tigerstr=C3=B6m?= Date: Sat, 16 Dec 2023 15:50:39 +0100 Subject: [PATCH 02/12] gbn: use timeout manager to handle timeouts This commit moves the responsibility of managing the timeout values throughout the gbn package to the `TimeoutManager` struct. Even though the `TimeoutManager` is now used to manage timeouts, the resend timeout is not yet set dynamically for the connections of the gbn package. Support and usage of that functionality will added in the upcoming commits. --- gbn/gbn_client.go | 4 +- gbn/gbn_conn.go | 111 ++++++++++++++++++++------------------------- gbn/gbn_server.go | 2 +- gbn/queue.go | 17 +++---- gbn/queue_test.go | 10 ++-- gbn/syncer.go | 28 ++++++------ gbn/syncer_test.go | 5 +- 7 files changed, 88 insertions(+), 89 deletions(-) diff --git a/gbn/gbn_client.go b/gbn/gbn_client.go index c1032e2..cab73a1 100644 --- a/gbn/gbn_client.go +++ b/gbn/gbn_client.go @@ -128,9 +128,11 @@ handshake: default: } + timeout := g.timeoutManager.GetHandshakeTimeout() + var b []byte select { - case <-time.After(g.cfg.handshakeTimeout): + case <-time.After(timeout): g.log.Debugf("SYN resendTimeout. Resending " + "SYN.") diff --git a/gbn/gbn_conn.go b/gbn/gbn_conn.go index 21454d7..18b5183 100644 --- a/gbn/gbn_conn.go +++ b/gbn/gbn_conn.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "math" "sync" "time" @@ -41,10 +40,6 @@ type GoBackNConn struct { recvDataChan chan *PacketData sendDataChan chan *PacketData - sendTimeout time.Duration - recvTimeout time.Duration - timeoutsMu sync.RWMutex - log btclog.Logger // receivedACKSignal channel is used to signal that the queue size has @@ -65,6 +60,10 @@ type GoBackNConn struct { // remoteClosed is closed if the remote party initiated the FIN sequence. remoteClosed chan struct{} + // timeoutManager is used to manage all the timeouts used by the + // GoBackNConn. + timeoutManager *TimeoutManager + // quit is used to stop the normal operations of the connection. // Once closed, the send and receive streams will still be available // for the FIN sequence. @@ -84,12 +83,12 @@ func newGoBackNConn(ctx context.Context, cfg *config, prefix := fmt.Sprintf("(%s)", loggerPrefix) plog := build.NewPrefixLog(prefix, log) + timeoutManager := NewTimeOutManager(plog) + g := &GoBackNConn{ cfg: cfg, recvDataChan: make(chan *PacketData, cfg.n), sendDataChan: make(chan *PacketData), - recvTimeout: DefaultRecvTimeout, - sendTimeout: DefaultSendTimeout, receivedACKSignal: make(chan struct{}), resendSignal: make(chan struct{}, 1), remoteClosed: make(chan struct{}), @@ -97,50 +96,49 @@ func newGoBackNConn(ctx context.Context, cfg *config, cancel: cancel, log: plog, quit: make(chan struct{}), + timeoutManager: timeoutManager, } - g.sendQueue = newQueue(&queueCfg{ - s: cfg.n + 1, - timeout: cfg.resendTimeout, - log: plog, - sendPkt: func(packet *PacketData) error { - return g.sendPacket(g.ctx, packet) + g.sendQueue = newQueue( + &queueCfg{ + s: cfg.n + 1, + log: plog, + sendPkt: func(packet *PacketData) error { + return g.sendPacket(g.ctx, packet) + }, }, - }) + timeoutManager, + ) return g } -// setN sets the current N to use. This _must_ be set before the handshake is -// completed. -func (g *GoBackNConn) setN(n uint8) { - g.cfg.n = n - g.cfg.s = n + 1 - g.recvDataChan = make(chan *PacketData, n) - g.sendQueue = newQueue(&queueCfg{ - s: n + 1, - timeout: g.cfg.resendTimeout, - log: g.log, - sendPkt: func(packet *PacketData) error { - return g.sendPacket(g.ctx, packet) - }, - }) -} - // SetSendTimeout sets the timeout used in the Send function. func (g *GoBackNConn) SetSendTimeout(timeout time.Duration) { - g.timeoutsMu.Lock() - defer g.timeoutsMu.Unlock() - - g.sendTimeout = timeout + g.timeoutManager.SetSendTimeout(timeout) } // SetRecvTimeout sets the timeout used in the Recv function. func (g *GoBackNConn) SetRecvTimeout(timeout time.Duration) { - g.timeoutsMu.Lock() - defer g.timeoutsMu.Unlock() + g.timeoutManager.SetRecvTimeout(timeout) +} - g.recvTimeout = timeout +// setN sets the current N to use. This _must_ be set before the handshake is +// completed. +func (g *GoBackNConn) setN(n uint8) { + g.cfg.n = n + g.cfg.s = n + 1 + g.recvDataChan = make(chan *PacketData, n) + g.sendQueue = newQueue( + &queueCfg{ + s: n + 1, + log: g.log, + sendPkt: func(packet *PacketData) error { + return g.sendPacket(g.ctx, packet) + }, + }, + g.timeoutManager, + ) } // Send blocks until an ack is received for the packet sent N packets before. @@ -152,9 +150,7 @@ func (g *GoBackNConn) Send(data []byte) error { default: } - g.timeoutsMu.RLock() - ticker := time.NewTimer(g.sendTimeout) - g.timeoutsMu.RUnlock() + ticker := time.NewTimer(g.timeoutManager.GetSendTimeout()) defer ticker.Stop() sendPacket := func(packet *PacketData) error { @@ -216,9 +212,7 @@ func (g *GoBackNConn) Recv() ([]byte, error) { msg *PacketData ) - g.timeoutsMu.RLock() - ticker := time.NewTimer(g.recvTimeout) - g.timeoutsMu.RUnlock() + ticker := time.NewTimer(g.timeoutManager.GetRecvTimeout()) defer ticker.Stop() for { @@ -245,22 +239,16 @@ func (g *GoBackNConn) Recv() ([]byte, error) { func (g *GoBackNConn) start() { g.log.Debugf("Starting") - pingTime := time.Duration(math.MaxInt64) - if g.cfg.pingTime != 0 { - pingTime = g.cfg.pingTime - } - - g.pingTicker = NewIntervalAwareForceTicker(pingTime) + g.pingTicker = NewIntervalAwareForceTicker( + g.timeoutManager.GetPingTime(), + ) g.pingTicker.Resume() - pongTime := time.Duration(math.MaxInt64) - if g.cfg.pongTime != 0 { - pongTime = g.cfg.pongTime - } - - g.pongTicker = NewIntervalAwareForceTicker(pongTime) + g.pongTicker = NewIntervalAwareForceTicker( + g.timeoutManager.GetPongTime(), + ) - g.resendTicker = time.NewTicker(g.cfg.resendTimeout) + g.resendTicker = time.NewTicker(g.timeoutManager.GetResendTimeout()) g.wg.Add(1) go func() { @@ -317,7 +305,7 @@ func (g *GoBackNConn) Close() error { g.log.Tracef("Try sending FIN") ctxc, cancel := context.WithTimeout( - g.ctx, defaultFinSendTimeout, + g.ctx, g.timeoutManager.GetFinSendTimeout(), ) defer cancel() if err := g.sendPacket(ctxc, &PacketFIN{}); err != nil { @@ -382,7 +370,7 @@ func (g *GoBackNConn) sendPacketsForever() error { // execute. That can happen if the function was awaiting the // expected ACK for a long time, or times out while awaiting the // catch up. - g.resendTicker.Reset(g.cfg.resendTimeout) + g.resendTicker.Reset(g.timeoutManager.GetResendTimeout()) // Also drain the resend signal channel, as resendTicker.Reset // doesn't drain the channel if the ticker ticked during the @@ -509,7 +497,7 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo g.pongTicker.Pause() } - g.resendTicker.Reset(g.cfg.resendTimeout) + g.resendTicker.Reset(g.timeoutManager.GetResendTimeout()) switch m := msg.(type) { case *PacketData: @@ -567,8 +555,9 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo // the resend, and therefore won't react to the // NACK we send here in time. sinceSent := time.Since(lastNackTime) - recentlySent := sinceSent < - g.cfg.resendTimeout*2 + + timeout := g.timeoutManager.GetResendTimeout() + recentlySent := sinceSent < timeout*2 if lastNackSeq == g.recvSeq && recentlySent { g.log.Tracef("Recently sent NACK") diff --git a/gbn/gbn_server.go b/gbn/gbn_server.go index 68e3b5b..5737d3b 100644 --- a/gbn/gbn_server.go +++ b/gbn/gbn_server.go @@ -143,7 +143,7 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo } select { - case <-time.After(g.cfg.handshakeTimeout): + case <-time.After(g.timeoutManager.GetHandshakeTimeout()): g.log.Debugf("SYNCACK resendTimeout. Abort and wait " + "for client to re-initiate") continue diff --git a/gbn/queue.go b/gbn/queue.go index 99954ba..0bbe654 100644 --- a/gbn/queue.go +++ b/gbn/queue.go @@ -17,8 +17,6 @@ type queueCfg struct { // no way to tell. s uint8 - timeout time.Duration - log btclog.Logger sendPkt func(packet *PacketData) error @@ -29,6 +27,8 @@ type queueCfg struct { type queue struct { cfg *queueCfg + timeoutManager *TimeoutManager + // content is the current content of the queue. This is always a slice // of length s but can contain nil elements if the queue isn't full. content []*PacketData @@ -59,18 +59,19 @@ type queue struct { } // newQueue creates a new queue. -func newQueue(cfg *queueCfg) *queue { +func newQueue(cfg *queueCfg, timeoutManager *TimeoutManager) *queue { if cfg.log == nil { cfg.log = log } q := &queue{ - cfg: cfg, - content: make([]*PacketData, cfg.s), - quit: make(chan struct{}), + cfg: cfg, + content: make([]*PacketData, cfg.s), + quit: make(chan struct{}), + timeoutManager: timeoutManager, } - q.syncer = newSyncer(cfg.s, cfg.log, cfg.timeout, q.quit) + q.syncer = newSyncer(cfg.s, cfg.log, timeoutManager, q.quit) return q } @@ -108,7 +109,7 @@ func (q *queue) addPacket(packet *PacketData) { // two parties to be seen as synced; this may fail in which case the caller is // expected to call resend again. func (q *queue) resend() error { - if time.Since(q.lastResend) < q.cfg.timeout { + if time.Since(q.lastResend) < q.timeoutManager.GetHandshakeTimeout() { q.cfg.log.Tracef("Resent the queue recently.") return nil diff --git a/gbn/queue_test.go b/gbn/queue_test.go index be8e53f..f70b32d 100644 --- a/gbn/queue_test.go +++ b/gbn/queue_test.go @@ -10,7 +10,7 @@ import ( ) func TestQueueSize(t *testing.T) { - q := newQueue(&queueCfg{s: 4}) + q := newQueue(&queueCfg{s: 4}, NewTimeOutManager(nil)) require.Equal(t, uint8(0), q.size()) @@ -33,16 +33,18 @@ func TestQueueResend(t *testing.T) { resentPackets := make(map[uint8]struct{}) queueTimeout := time.Second * 1 + tm := NewTimeOutManager(nil) + tm.resendTimeout = queueTimeout + cfg := &queueCfg{ - s: 5, - timeout: queueTimeout, + s: 5, sendPkt: func(packet *PacketData) error { resentPackets[packet.Seq] = struct{}{} return nil }, } - q := newQueue(cfg) + q := newQueue(cfg, tm) pkt1 := &PacketData{Seq: 1} pkt2 := &PacketData{Seq: 2} diff --git a/gbn/syncer.go b/gbn/syncer.go index 2289023..2fd9dae 100644 --- a/gbn/syncer.go +++ b/gbn/syncer.go @@ -99,9 +99,9 @@ const ( // When either of the 3 conditions above are met, we will consider both parties // to be in sync. type syncer struct { - s uint8 - log btclog.Logger - timeout time.Duration + s uint8 + log btclog.Logger + timeoutManager *TimeoutManager state syncState @@ -127,20 +127,20 @@ type syncer struct { } // newSyncer creates a new syncer instance. -func newSyncer(s uint8, prefixLogger btclog.Logger, timeout time.Duration, - quit chan struct{}) *syncer { +func newSyncer(s uint8, prefixLogger btclog.Logger, + timeoutManager *TimeoutManager, quit chan struct{}) *syncer { if prefixLogger == nil { prefixLogger = log } return &syncer{ - s: s, - log: prefixLogger, - timeout: timeout, - state: syncStateIdle, - cancel: make(chan struct{}), - quit: quit, + s: s, + log: prefixLogger, + timeoutManager: timeoutManager, + state: syncStateIdle, + cancel: make(chan struct{}), + quit: quit, } } @@ -210,7 +210,9 @@ func (c *syncer) waitForSync() { case <-c.cancel: c.log.Tracef("sync canceled or reset") - case <-time.After(c.timeout * awaitingTimeoutMultiplier): + case <-time.After( + c.timeoutManager.GetResendTimeout() * awaitingTimeoutMultiplier, + ): c.log.Tracef("Timed out while waiting for sync") } @@ -291,7 +293,7 @@ func (c *syncer) proceedAfterTime() { return - case <-time.After(c.timeout): + case <-time.After(c.timeoutManager.GetResendTimeout()): c.mu.Lock() defer c.mu.Unlock() diff --git a/gbn/syncer_test.go b/gbn/syncer_test.go index eaca3ff..bcc3282 100644 --- a/gbn/syncer_test.go +++ b/gbn/syncer_test.go @@ -19,7 +19,10 @@ func TestSyncer(t *testing.T) { syncTimeout := time.Second * 1 expectedNACK := uint8(3) - syncer := newSyncer(5, nil, syncTimeout, make(chan struct{})) + tm := NewTimeOutManager(nil) + tm.resendTimeout = syncTimeout + + syncer := newSyncer(5, nil, tm, make(chan struct{})) // Let's first test the scenario where we don't receive the expected // ACK/NACK after initiating the resend. This should trigger a timeout From 17b836638f1cdde7f3b3c1e3fc2f523ac8d58306 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tigerstr=C3=B6m?= Date: Sat, 16 Dec 2023 17:38:07 +0100 Subject: [PATCH 03/12] gbn + mailbox: modify TimeoutManager through conf This commit changes the gbn `Config` struct to not specifically contain any timeout fields, but instead contain functional options that modifies the `TimeoutManager` struct. These options are then applied to the `TimeoutManager`. --- gbn/config.go | 30 +++++++++--------------------- gbn/gbn_conn.go | 2 +- gbn/options.go | 32 ++++---------------------------- mailbox/client_conn.go | 10 ++++++---- mailbox/server_conn.go | 10 ++++++---- 5 files changed, 26 insertions(+), 58 deletions(-) diff --git a/gbn/config.go b/gbn/config.go index 064c80d..0fc7e59 100644 --- a/gbn/config.go +++ b/gbn/config.go @@ -44,22 +44,22 @@ func WithTimeoutUpdateFrequency(frequency int) TimeoutOptions { } } -// WithTMHandshakeTimeout is used to set the timeout used during the handshake. +// WithHandshakeTimeout is used to set the timeout used during the handshake. // If the timeout is reached without response from the peer then the handshake // will be aborted and restarted. -func WithTMHandshakeTimeout(timeout time.Duration) TimeoutOptions { +func WithHandshakeTimeout(timeout time.Duration) TimeoutOptions { return func(manager *TimeoutManager) { manager.handshakeTimeout = timeout } } -// WithTMKeepalivePing is used to send a ping packet if no packets have been +// WithKeepalivePing is used to send a ping packet if no packets have been // received from the other side for the given duration. This helps keep the // connection alive and also ensures that the connection is closed if the // other side does not respond to the ping in a timely manner. After the ping // the connection will be closed if the other side does not respond within // time duration. -func WithTMKeepalivePing(ping, pong time.Duration) TimeoutOptions { +func WithKeepalivePing(ping, pong time.Duration) TimeoutOptions { return func(manager *TimeoutManager) { manager.pingTime = ping manager.pongTime = pong @@ -90,10 +90,6 @@ type config struct { // between packets. maxChunkSize int - // resendTimeout is the duration that will be waited before resending - // the packets in the current queue. - resendTimeout time.Duration - // recvFromStream is the function that will be used to acquire the next // available packet. recvFromStream recvBytesFunc @@ -106,13 +102,7 @@ type config struct { // been received and processed. onFIN func() - // handshakeTimeout is the time after which the server or client - // will abort and restart the handshake if the expected response is - // not received from the peer. - handshakeTimeout time.Duration - - pingTime time.Duration - pongTime time.Duration + timeoutOptions []TimeoutOptions } // newConfig constructs a new config struct. @@ -120,11 +110,9 @@ func newConfig(sendFunc sendBytesFunc, recvFunc recvBytesFunc, n uint8) *config { return &config{ - n: n, - s: n + 1, - recvFromStream: recvFunc, - sendToStream: sendFunc, - resendTimeout: defaultResendTimeout, - handshakeTimeout: defaultHandshakeTimeout, + n: n, + s: n + 1, + recvFromStream: recvFunc, + sendToStream: sendFunc, } } diff --git a/gbn/gbn_conn.go b/gbn/gbn_conn.go index 18b5183..d0453d0 100644 --- a/gbn/gbn_conn.go +++ b/gbn/gbn_conn.go @@ -83,7 +83,7 @@ func newGoBackNConn(ctx context.Context, cfg *config, prefix := fmt.Sprintf("(%s)", loggerPrefix) plog := build.NewPrefixLog(prefix, log) - timeoutManager := NewTimeOutManager(plog) + timeoutManager := NewTimeOutManager(plog, cfg.timeoutOptions...) g := &GoBackNConn{ cfg: cfg, diff --git a/gbn/options.go b/gbn/options.go index 4adcb3a..ca55b95 100644 --- a/gbn/options.go +++ b/gbn/options.go @@ -1,7 +1,5 @@ package gbn -import "time" - type Option func(conn *config) // WithMaxSendSize is used to set the maximum payload size in bytes per packet. @@ -14,33 +12,11 @@ func WithMaxSendSize(size int) Option { } } -// WithTimeout is used to set the resend timeout. This is the time to wait -// for ACKs before resending the queue. -func WithTimeout(timeout time.Duration) Option { - return func(conn *config) { - conn.resendTimeout = timeout - } -} - -// WithHandshakeTimeout is used to set the timeout used during the handshake. -// If the timeout is reached without response from the peer then the handshake -// will be aborted and restarted. -func WithHandshakeTimeout(timeout time.Duration) Option { - return func(conn *config) { - conn.handshakeTimeout = timeout - } -} - -// WithKeepalivePing is used to send a ping packet if no packets have been -// received from the other side for the given duration. This helps keep the -// connection alive and also ensures that the connection is closed if the -// other side does not respond to the ping in a timely manner. After the ping -// the connection will be closed if the other side does not respond within -// time duration. -func WithKeepalivePing(ping, pong time.Duration) Option { +// WithTimeoutOptions is used to set the different timeout options that will be +// used within gbn package. +func WithTimeoutOptions(opts ...TimeoutOptions) Option { return func(conn *config) { - conn.pingTime = ping - conn.pongTime = pong + conn.timeoutOptions = opts } } diff --git a/mailbox/client_conn.go b/mailbox/client_conn.go index 445daa3..7d25c24 100644 --- a/mailbox/client_conn.go +++ b/mailbox/client_conn.go @@ -166,10 +166,12 @@ func NewClientConn(ctx context.Context, sid [64]byte, serverHost string, } c.gbnOptions = []gbn.Option{ - gbn.WithTimeout(gbnTimeout), - gbn.WithHandshakeTimeout(gbnHandshakeTimeout), - gbn.WithKeepalivePing( - gbnClientPingTimeout, gbnPongTimeout, + gbn.WithTimeoutOptions( + gbn.WithStaticResendTimeout(gbnTimeout), + gbn.WithHandshakeTimeout(gbnHandshakeTimeout), + gbn.WithKeepalivePing( + gbnClientPingTimeout, gbnPongTimeout, + ), ), gbn.WithOnFIN(func() { // We force the connection to set a new status after diff --git a/mailbox/server_conn.go b/mailbox/server_conn.go index ac75055..32e1061 100644 --- a/mailbox/server_conn.go +++ b/mailbox/server_conn.go @@ -80,10 +80,12 @@ func NewServerConn(ctx context.Context, serverHost string, cancel: cancel, quit: make(chan struct{}), gbnOptions: []gbn.Option{ - gbn.WithTimeout(gbnTimeout), - gbn.WithHandshakeTimeout(gbnHandshakeTimeout), - gbn.WithKeepalivePing( - gbnServerPingTimeout, gbnPongTimeout, + gbn.WithTimeoutOptions( + gbn.WithStaticResendTimeout(gbnTimeout), + gbn.WithHandshakeTimeout(gbnHandshakeTimeout), + gbn.WithKeepalivePing( + gbnServerPingTimeout, gbnPongTimeout, + ), ), }, status: ServerStatusNotConnected, From 5c2d3f3577b1b1442f7fbe8e905f9decfb99b6d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tigerstr=C3=B6m?= Date: Sat, 16 Dec 2023 17:30:43 +0100 Subject: [PATCH 04/12] gbn: support dynamic resend timeouts This commit adds support for setting the resend timeout dynamically for a connection. Prior to this commit, the timeout before a client or server resends the queue of packets, was always set to fixed value. A fixed timeout isn't suitable for all connections though, as the latency for different clients varies. With this commit, we instead add support for setting the resend timeout based on: * How long it took for the other party to respond during the handshake process * How long it took for the other party to respond with the connect ACK for a sent data packet. The timeout is set then to the time it took for the server to respond, multiplied by the resendMultiplier, unless the duration is shorter than the default resend timeout. Note though that if a connection's resend timeout is set manually set through the WithStaticResendTimeout, the resend timeout will always be set to that value, and won't be dynamically updated. --- gbn/gbn_client.go | 9 +++++++++ gbn/gbn_conn.go | 26 +++++++++++++++++++------- gbn/gbn_server.go | 12 ++++++++++++ 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/gbn/gbn_client.go b/gbn/gbn_client.go index cab73a1..1c57c85 100644 --- a/gbn/gbn_client.go +++ b/gbn/gbn_client.go @@ -99,6 +99,7 @@ func (g *GoBackNConn) clientHandshake() error { var ( resp Message respSYN *PacketSYN + resent bool ) handshake: for { @@ -115,6 +116,9 @@ handshake: return err } + // Notify the timeout manager that we sent a SYN. + g.timeoutManager.Sent(msg, resent) + for { // Wait for SYN g.log.Debugf("Waiting for SYN") @@ -135,6 +139,7 @@ handshake: case <-time.After(timeout): g.log.Debugf("SYN resendTimeout. Resending " + "SYN.") + resent = true continue handshake case <-g.quit: @@ -173,6 +178,10 @@ handshake: return io.EOF } + // Notify the timeout manager we've received the SYN response from the + // counterparty. + g.timeoutManager.Received(resp) + // Send SYNACK g.log.Debugf("Sending SYNACK") synack, err := new(PacketSYNACK).Serialize() diff --git a/gbn/gbn_conn.go b/gbn/gbn_conn.go index d0453d0..a1c4431 100644 --- a/gbn/gbn_conn.go +++ b/gbn/gbn_conn.go @@ -104,7 +104,7 @@ func newGoBackNConn(ctx context.Context, cfg *config, s: cfg.n + 1, log: plog, sendPkt: func(packet *PacketData) error { - return g.sendPacket(g.ctx, packet) + return g.sendPacket(g.ctx, packet, true) }, }, timeoutManager, @@ -134,7 +134,7 @@ func (g *GoBackNConn) setN(n uint8) { s: n + 1, log: g.log, sendPkt: func(packet *PacketData) error { - return g.sendPacket(g.ctx, packet) + return g.sendPacket(g.ctx, packet, true) }, }, g.timeoutManager, @@ -308,7 +308,9 @@ func (g *GoBackNConn) Close() error { g.ctx, g.timeoutManager.GetFinSendTimeout(), ) defer cancel() - if err := g.sendPacket(ctxc, &PacketFIN{}); err != nil { + + err := g.sendPacket(ctxc, &PacketFIN{}, false) + if err != nil { g.log.Errorf("Error sending FIN: %v", err) } } @@ -336,7 +338,9 @@ func (g *GoBackNConn) Close() error { } // sendPacket serializes a message and writes it to the underlying send stream. -func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message) error { +func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message, + isResend bool) error { + b, err := msg.Serialize() if err != nil { return fmt.Errorf("serialize error: %s", err) @@ -347,6 +351,9 @@ func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message) error { return fmt.Errorf("error calling sendToStream: %s", err) } + // Notify the timeout manager that a message has been sent. + g.timeoutManager.Sent(msg, isResend) + return nil } @@ -428,7 +435,7 @@ func (g *GoBackNConn) sendPacketsForever() error { g.sendQueue.addPacket(packet) g.log.Tracef("Sending data %d", packet.Seq) - if err := g.sendPacket(g.ctx, packet); err != nil { + if err := g.sendPacket(g.ctx, packet, false); err != nil { return err } @@ -490,6 +497,9 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo return fmt.Errorf("deserialize error: %s", err) } + // Notify the timeout manager that a message has been received. + g.timeoutManager.Received(msg) + // Reset the ping & pong timer if any packet is received. // If ping/pong is disabled, this is a no-op. g.pingTicker.Reset() @@ -514,7 +524,8 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo Seq: m.Seq, } - if err = g.sendPacket(g.ctx, ack); err != nil { + err = g.sendPacket(g.ctx, ack, false) + if err != nil { return err } @@ -573,7 +584,8 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo Seq: g.recvSeq, } - if err = g.sendPacket(g.ctx, nack); err != nil { + err = g.sendPacket(g.ctx, nack, false) + if err != nil { return err } diff --git a/gbn/gbn_server.go b/gbn/gbn_server.go index 5737d3b..0f8116d 100644 --- a/gbn/gbn_server.go +++ b/gbn/gbn_server.go @@ -82,6 +82,7 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo }() var n uint8 + var resent bool for { g.log.Debugf("Waiting for client SYN") @@ -131,6 +132,9 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo return err } + // Notify the timeout manager that we sent a SYN. + g.timeoutManager.Sent(msg, resent) + // Wait for SYNACK g.log.Debugf("Waiting for client SYNACK") select { @@ -146,6 +150,8 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo case <-time.After(g.timeoutManager.GetHandshakeTimeout()): g.log.Debugf("SYNCACK resendTimeout. Abort and wait " + "for client to re-initiate") + resent = true + continue case err := <-errChan: return err @@ -163,9 +169,15 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo switch msg.(type) { case *PacketSYNACK: + // Notify the timeout manager we've received the SYNACK + // response from the counterparty. + g.timeoutManager.Received(msg) + break case *PacketSYN: g.log.Debugf("Received SYN. Resend SYN.") + resent = true + goto recvClientSYN default: return io.EOF From 8b21c7640b70e361abc4ec309e6a61d334fd3082 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tigerstr=C3=B6m?= Date: Sat, 16 Dec 2023 17:46:50 +0100 Subject: [PATCH 05/12] mailbox: use dynamic resend timeouts for conns With this commit, the resend timeout will now be set dynamically for the connections of the gbn package. --- mailbox/client_conn.go | 13 ++++++++++++- mailbox/server_conn.go | 5 ++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/mailbox/client_conn.go b/mailbox/client_conn.go index 7d25c24..e5e11f0 100644 --- a/mailbox/client_conn.go +++ b/mailbox/client_conn.go @@ -46,6 +46,14 @@ const ( // to receive ACKS from the peer before resending the queue. gbnTimeout = 1000 * time.Millisecond + // gbnResendMultiplier is the multiplier that we want the gbn + // connection to use when dynamically setting the resend timeout. + gbnResendMultiplier = 5 + + // gbnTimeoutUpdateFrequency is the frequency representing the number of + // packages + responses we want, before we update the resend timeout. + gbnTimeoutUpdateFrequency = 200 + // gbnN is the queue size, N, that the gbn server will use. The gbn // server will send up to N packets before requiring an ACK for the // first packet in the queue. @@ -167,7 +175,10 @@ func NewClientConn(ctx context.Context, sid [64]byte, serverHost string, c.gbnOptions = []gbn.Option{ gbn.WithTimeoutOptions( - gbn.WithStaticResendTimeout(gbnTimeout), + gbn.WithResendMultiplier(gbnResendMultiplier), + gbn.WithTimeoutUpdateFrequency( + gbnTimeoutUpdateFrequency, + ), gbn.WithHandshakeTimeout(gbnHandshakeTimeout), gbn.WithKeepalivePing( gbnClientPingTimeout, gbnPongTimeout, diff --git a/mailbox/server_conn.go b/mailbox/server_conn.go index 32e1061..f472c5b 100644 --- a/mailbox/server_conn.go +++ b/mailbox/server_conn.go @@ -81,7 +81,10 @@ func NewServerConn(ctx context.Context, serverHost string, quit: make(chan struct{}), gbnOptions: []gbn.Option{ gbn.WithTimeoutOptions( - gbn.WithStaticResendTimeout(gbnTimeout), + gbn.WithResendMultiplier(gbnResendMultiplier), + gbn.WithTimeoutUpdateFrequency( + gbnTimeoutUpdateFrequency, + ), gbn.WithHandshakeTimeout(gbnHandshakeTimeout), gbn.WithKeepalivePing( gbnServerPingTimeout, gbnPongTimeout, From d92459605749f1f6fb945d980be0abdce69174a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tigerstr=C3=B6m?= Date: Tue, 31 Oct 2023 01:09:13 +0100 Subject: [PATCH 06/12] gbn: set default resend & handshake timeouts to 1s --- gbn/timeout_manager.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gbn/timeout_manager.go b/gbn/timeout_manager.go index e74b7b6..b5ff6dc 100644 --- a/gbn/timeout_manager.go +++ b/gbn/timeout_manager.go @@ -9,9 +9,9 @@ import ( ) const ( - defaultHandshakeTimeout = 100 * time.Millisecond - defaultResendTimeout = 100 * time.Millisecond - minimumResendTimeout = 100 * time.Millisecond + defaultHandshakeTimeout = 1000 * time.Millisecond + defaultResendTimeout = 1000 * time.Millisecond + minimumResendTimeout = 1000 * time.Millisecond defaultFinSendTimeout = 1000 * time.Millisecond defaultResendMultiplier = 5 defaultTimeoutUpdateFrequency = 100 From d92c61596230786da08c1377d1d976d0e4c3e142 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tigerstr=C3=B6m?= Date: Mon, 4 Dec 2023 01:00:42 +0100 Subject: [PATCH 07/12] gbn: Add TimeoutBooster TimeoutBooster is a type can be used to boost the value of a timeout by a specified percentage. The timeout can be boosted multiple times, and number of boosts are then cumulative. --- gbn/timeout_manager.go | 91 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/gbn/timeout_manager.go b/gbn/timeout_manager.go index b5ff6dc..f5331bd 100644 --- a/gbn/timeout_manager.go +++ b/gbn/timeout_manager.go @@ -19,6 +19,97 @@ const ( DefaultRecvTimeout = math.MaxInt64 ) +// TimeoutBooster is used to boost a timeout by a given percentage value. +// The timeout will be boosted by the percentage value of the boostPercent any +// time the Boost function is called, and is cumulative. +type TimeoutBooster struct { + // boostPercent defines the percentage value the original timeout will + // be boosted any time the Boost function is called. + boostPercent float32 + + // boostCount defines the number of times the timeout has been boosted. + boostCount int + + // originalTimeout defines the base timeout value that is boosted. + originalTimeout time.Duration + + // withBoostFrequencyLimit is used to indicate whether there is a cap to + // how often the timeout can be boosted, which is the duration of the + // original timeout. + withBoostFrequencyLimit bool + + // lastBoost defines the time when the last boost that had any affect + // was applied. + lastBoost time.Time + + mu sync.Mutex +} + +// NewTimeoutBooster creates a new timeout booster. The originalTimeout defines +// the base timeout value that is boosted. The timeout will be boosted by the +// percentage value of the boostPercent any time the Boost function is called. +// Finally if the withBoostFrequencyLimit is set, then there is a cap to how +// often the timeout can be boosted, which is the duration of the original +// timeout. +func NewTimeoutBooster(originalTimeout time.Duration, boostPercent float32, + withBoostFrequencyLimit bool) *TimeoutBooster { + + return &TimeoutBooster{ + boostPercent: boostPercent, + originalTimeout: originalTimeout, + boostCount: 0, + withBoostFrequencyLimit: withBoostFrequencyLimit, + } +} + +// Boost boosts the timeout by the boost percent. If the withBoostFrequencyLimit +// is set, then the boost will only be applied if the duration of the original +// timeout has passed since the last boost that had any affect was applied. +func (b *TimeoutBooster) Boost() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.withBoostFrequencyLimit { + if time.Since(b.lastBoost) < b.originalTimeout { + return + } + } + + b.lastBoost = time.Now() + b.boostCount++ +} + +// Reset removes the current applied boost, and sets the original timeout to the +// passed timeout. It also restarts the frequency limit timeout if the +// withBoostFrequencyLimit was set to true when initializing the TimeoutBooster. +func (b *TimeoutBooster) Reset(newTimeout time.Duration) { + b.mu.Lock() + defer b.mu.Unlock() + + b.boostCount = 0 + b.originalTimeout = newTimeout + + // We'll also restart the frequency timeout, to ensure that any message + // we immediately resend after resetting the booster won't boost the + // timeout. + if b.withBoostFrequencyLimit { + b.lastBoost = time.Now() + } +} + +// GetCurrentTimeout returns the value of the timeout, with the boost applied. +func (b *TimeoutBooster) GetCurrentTimeout() time.Duration { + b.mu.Lock() + defer b.mu.Unlock() + + increase := time.Duration( + float32(b.originalTimeout) * b.boostPercent * + float32(b.boostCount), + ) + + return b.originalTimeout + increase +} + // TimeoutManager manages the different timeouts used by the gbn package. type TimeoutManager struct { // useStaticTimeout is used to indicate whether the resendTimeout From 73b69f0a7c508faf30ca30afa9a800c1a4a4ad80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tigerstr=C3=B6m?= Date: Mon, 4 Dec 2023 01:05:20 +0100 Subject: [PATCH 08/12] gbn: add boosting of resend & handshake timeouts When we need to resend a data packet, or when we need to resend the SYN message during the handshake, due to the other side not responding within given timeout, we will boost the timeout by 50% for each time we need to resend without receiving any response. This ensures that if the original timeouts are set to a too short duration given the current network latency, we will eventually boost the timeouts to a long enough duration that allows the other side to be able to respond within the timeout. --- gbn/config.go | 12 ++++ gbn/timeout_manager.go | 70 ++++++++++++++++++- gbn/timeout_manager_test.go | 135 +++++++++++++++++++++++++++++++++++- mailbox/client_conn.go | 7 ++ mailbox/server_conn.go | 1 + 5 files changed, 221 insertions(+), 4 deletions(-) diff --git a/gbn/config.go b/gbn/config.go index 0fc7e59..1c5b68e 100644 --- a/gbn/config.go +++ b/gbn/config.go @@ -66,6 +66,18 @@ func WithKeepalivePing(ping, pong time.Duration) TimeoutOptions { } } +// WithBoostPercent is used to set the boost percent that the timeout manager +// will use to boost the resend timeout & handshake timeout every time a resend +// is required due to not receiving a response within the current timeout. +func WithBoostPercent(boostPercent float32) TimeoutOptions { + return func(manager *TimeoutManager) { + if boostPercent > 0 { + manager.resendBoostPercent = boostPercent + manager.handshakeBoostPercent = boostPercent + } + } +} + // config holds the configuration values for an instance of GoBackNConn. type config struct { // n is the window size. The sender can send a maximum of n packets diff --git a/gbn/timeout_manager.go b/gbn/timeout_manager.go index f5331bd..121eece 100644 --- a/gbn/timeout_manager.go +++ b/gbn/timeout_manager.go @@ -15,6 +15,7 @@ const ( defaultFinSendTimeout = 1000 * time.Millisecond defaultResendMultiplier = 5 defaultTimeoutUpdateFrequency = 100 + defaultBoostPercent = 0.5 DefaultSendTimeout = math.MaxInt64 DefaultRecvTimeout = math.MaxInt64 ) @@ -143,6 +144,32 @@ type TimeoutManager struct { // latestSentSYNTime field. latestSentSYNTimeMu sync.Mutex + // resendBooster is used to boost the resend timeout when we timeout + // when sending a data packet before receiving a response. The resend + // timeout will remain boosted until it is updated dynamically, as the + // timeout set during the dynamic update most accurately reflects the + // current response time. + resendBooster *TimeoutBooster + + // resendBoostPercent is the percentage value the resend timeout will be + // boosted by, any time the Boost function is called for the + // resendBooster. + resendBoostPercent float32 + + // handshakeBooster is used to boost the handshake timeout if we timeout + // when sending the SYN message before receiving the corresponding + // response. The handshake timeout will remain boosted throughout the + // lifespan of the connection if it's boosted. + // The handshake timeout is the time after which the server or client + // will abort and restart the handshake if the expected response is + // not received from the peer. + handshakeBooster *TimeoutBooster + + // handshakeBoostPercent is the percentage value the handshake timeout + // will be by boosted, any time the Boost function is called for the + // handshakeBooster. + handshakeBoostPercent float32 + // handshakeTimeout is the time after which the server or client // will abort and restart the handshake if the expected response is // not received from the peer. @@ -206,6 +233,8 @@ func NewTimeOutManager(logger btclog.Logger, log: logger, resendTimeout: defaultResendTimeout, handshakeTimeout: defaultHandshakeTimeout, + resendBoostPercent: defaultBoostPercent, + handshakeBoostPercent: defaultBoostPercent, useStaticTimeout: false, resendMultiplier: defaultResendMultiplier, finSendTimeout: defaultFinSendTimeout, @@ -219,6 +248,23 @@ func NewTimeOutManager(logger btclog.Logger, opt(m) } + // When we are resending packets, it's likely that we'll resend a range + // of packets. As we don't want every packet in that range to boost the + // resend timeout, we'll initialize the resend booster with a ticker, + // which will ensure that only the first resent packet in the range will + // boost the resend timeout. + m.resendBooster = NewTimeoutBooster( + m.resendTimeout, + m.resendBoostPercent, + true, + ) + + m.handshakeBooster = NewTimeoutBooster( + m.handshakeTimeout, + m.handshakeBoostPercent, + false, + ) + return m } @@ -256,6 +302,11 @@ func (m *TimeoutManager) Sent(msg Message, resent bool) { // if the response is for the resent SYN or the original SYN. m.latestSentSYNTime = time.Time{} + // We'll also temporarily boost the handshake timeout while + // we're resending the SYN message. This might occur multiple + // times until we receive the corresponding response. + m.handshakeBooster.Boost() + case *PacketData: m.sentTimesMu.Lock() defer m.sentTimesMu.Unlock() @@ -267,6 +318,8 @@ func (m *TimeoutManager) Sent(msg Message, resent bool) { // corresponding response. delete(m.sentTimes, msg.Seq) + m.resendBooster.Boost() + return } @@ -361,6 +414,11 @@ func (m *TimeoutManager) updateResendTimeoutUnsafe(responseTime time.Duration) { m.log.Tracef("Updating resendTimeout to %v", multipliedTimeout) m.resendTimeout = multipliedTimeout + + // Also update and reset the resend booster, as the new dynamic + // resend timeout most accurately reflects the current response + // time. + m.resendBooster.Reset(multipliedTimeout) } // GetResendTimeout returns the current resend timeout. @@ -368,7 +426,11 @@ func (m *TimeoutManager) GetResendTimeout() time.Duration { m.mu.RLock() defer m.mu.RUnlock() - return m.resendTimeout + resendTimeout := m.resendBooster.GetCurrentTimeout() + + m.log.Debugf("Returning resendTimeout %v", resendTimeout) + + return resendTimeout } // GetHandshakeTimeout returns the handshake timeout. @@ -376,7 +438,11 @@ func (m *TimeoutManager) GetHandshakeTimeout() time.Duration { m.mu.RLock() defer m.mu.RUnlock() - return m.handshakeTimeout + handshake := m.handshakeBooster.GetCurrentTimeout() + + m.log.Debugf("Returning handshakeTimeout %v", handshake) + + return handshake } // GetFinSendTimeout returns the fin send timeout. diff --git a/gbn/timeout_manager_test.go b/gbn/timeout_manager_test.go index 69876af..f200dd9 100644 --- a/gbn/timeout_manager_test.go +++ b/gbn/timeout_manager_test.go @@ -129,11 +129,22 @@ func TestSYNDynamicTimeout(t *testing.T) { require.Equal(t, minimumResendTimeout, newTimeout) // Then we'll test that the resend timeout isn't dynamically set if - // when simulating a that the SYN message has been resent. + // when simulating a that the SYN message has been resent, but that the + // handshake timeout is boosted. + tm.handshakeBooster.boostPercent = 0.2 + originalHandshakeTimeout := tm.GetHandshakeTimeout() + sendAndReceive(t, tm, synMsg, synMsg, true) unchangedResendTimeout := tm.GetResendTimeout() require.Equal(t, newTimeout, unchangedResendTimeout) + + newHandshakeTimeout := tm.GetHandshakeTimeout() + require.Equal( + t, + time.Duration(float32(originalHandshakeTimeout)*1.2), + newHandshakeTimeout, + ) } // TestDataPackageDynamicTimeout ensures that the resend timeout is dynamically @@ -187,7 +198,9 @@ func TestDataPackageDynamicTimeout(t *testing.T) { require.NotEqual(t, resendTimeout, newResendTimeout) // Finally let's test that the resend timeout isn't dynamically set when - // simulating that the data packet has been resent. + // simulating that the data packet has been resent. The resend timeout + // shouldn't be boosted either, as the resend timeout is only boosted + // if we resend a packet after the duration of the previous resend time. tm.timeoutUpdateFrequency = 1 tm.resendMultiplier = 100 @@ -197,6 +210,124 @@ func TestDataPackageDynamicTimeout(t *testing.T) { require.Equal(t, newResendTimeout, unchangedResendTimeout) } +// TestResendBooster tests that the resend timeout booster works as expected, +// and that timeout manager's resendTimeout get's boosted when we need to resend +// a packet again due to not receiving a response within the resend timeout. +func TestResendBooster(t *testing.T) { + t.Parallel() + + tm := NewTimeOutManager(nil) + setResendTimeout := time.Millisecond * 1000 + tm.resendTimeout = setResendTimeout + + initialResendTimeout := tm.GetResendTimeout() + msg := &PacketData{Seq: 20} + response := &PacketACK{Seq: 20} + + // As the resend timeout won't be dynamically set when we are resending + // packets, we'll first test that the resend timeout didn't get + // dynamically updated by a resent data packet. This will however + // boost the resend timeout, so let's initially set the boost percent + // to 0 so we can test that the resend timeout wasn't set. + tm.timeoutUpdateFrequency = 1 + tm.resendMultiplier = 1 + + tm.resendBooster.boostPercent = 0 + + sendAndReceiveWithDuration( + t, tm, time.Millisecond, msg, response, true, + ) + + unchangedResendTimeout := tm.GetResendTimeout() + require.Equal(t, initialResendTimeout, unchangedResendTimeout) + + // Now let's change the boost percent to a non-zero value and test that + // the resend timeout was boosted as expected. + tm.resendBooster.boostPercent = 0.1 + + changedResendTimeout := tm.GetResendTimeout() + + require.Equal( + t, + time.Duration(float32(initialResendTimeout)*1.1), + changedResendTimeout, + ) + + // Now let's resend another packet again, which shouldn't boost the + // resend timeout again, as the duration of the previous resend timeout + // hasn't passed. + sendAndReceiveWithDuration( + t, tm, time.Millisecond, msg, response, true, + ) + + unchangedResendTimeout = tm.GetResendTimeout() + + require.Equal( + t, + time.Duration(float32(initialResendTimeout)*1.1), + unchangedResendTimeout, + ) + + // Now let's wait for the duration of the previous resend timeout and + // then resend another packet. This should boost the resend timeout + // once more, as the duration of the previous resend timeout has passed. + err := wait.Invariant(func() bool { + currentResendTimeout := tm.GetResendTimeout() + + return unchangedResendTimeout == currentResendTimeout + }, setResendTimeout) + require.NoError(t, err) + + sendAndReceiveWithDuration( + t, tm, time.Millisecond, msg, response, true, + ) + + changedResendTimeout = tm.GetResendTimeout() + + require.Equal( + t, + time.Duration(float32(initialResendTimeout)*1.2), + changedResendTimeout, + ) + + // Now let's verify that in case the resend timeout is dynamically set, + // the boost of the resend timeout is reset. Note that we're not + // simulating a resend here, as that will dynamically set the resend + // timeout as the timeout update frequency is set to 1. + sendAndReceiveWithDuration( + t, tm, time.Second, msg, response, false, + ) + + newResendTimeout := tm.GetResendTimeout() + + require.NotEqual(t, changedResendTimeout, newResendTimeout) + require.Equal(t, 0, tm.resendBooster.boostCount) + + // Finally let's check that the resend timeout isn't boosted if we + // simulate a resend before the duration of the newly set resend + // timeout hasn't passed. + sendAndReceiveWithDuration( + t, tm, time.Millisecond, msg, response, true, + ) + + require.Equal(t, 0, tm.resendBooster.boostCount) + + // But if we wait for the duration of the newly set resend timeout and + // then simulate a resend, then the resend timeout should be boosted. + err = wait.Invariant(func() bool { + currentResendTimeout := tm.GetResendTimeout() + + return newResendTimeout == currentResendTimeout + }, newResendTimeout) + require.NoError(t, err) + + sendAndReceiveWithDuration( + t, tm, time.Millisecond, msg, response, true, + ) + + require.Equal(t, 1, tm.resendBooster.boostCount) +} + // TestStaticTimeout ensures that the resend timeout isn't dynamically set if a // static timeout has been set. func TestStaticTimeout(t *testing.T) { diff --git a/mailbox/client_conn.go b/mailbox/client_conn.go index e5e11f0..26d8a06 100644 --- a/mailbox/client_conn.go +++ b/mailbox/client_conn.go @@ -82,6 +82,12 @@ const ( // gbnPongTimout is the time after sending the pong message that we will // timeout if we do not receive any message from our peer. gbnPongTimeout = 3 * time.Second + + // gbnBoostPercent is the percentage value that the resend and handshake + // timeout will be boosted any time we need to resend a packet due to + // the corresponding response not being received within the previous + // timeout. + gbnBoostPercent = 0.5 ) // ClientStatus is a description of the connection status of the client. @@ -183,6 +189,7 @@ func NewClientConn(ctx context.Context, sid [64]byte, serverHost string, gbn.WithKeepalivePing( gbnClientPingTimeout, gbnPongTimeout, ), + gbn.WithBoostPercent(gbnBoostPercent), ), gbn.WithOnFIN(func() { // We force the connection to set a new status after diff --git a/mailbox/server_conn.go b/mailbox/server_conn.go index f472c5b..75d90c2 100644 --- a/mailbox/server_conn.go +++ b/mailbox/server_conn.go @@ -89,6 +89,7 @@ func NewServerConn(ctx context.Context, serverHost string, gbn.WithKeepalivePing( gbnServerPingTimeout, gbnPongTimeout, ), + gbn.WithBoostPercent(gbnBoostPercent), ), }, status: ServerStatusNotConnected, From ea84c35da92337bd74a586e587bae20a0e6e6938 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tigerstr=C3=B6m?= Date: Tue, 9 Jan 2024 01:35:42 +0100 Subject: [PATCH 09/12] gbn: Ensure we always handle Pong Tick events If we have awaited a sync after sending the previous ping, both the pingTicker and pongTicker may have ticked when waiting to sync. In that case, we need to ensure that the pongTicker gets prioritized, which this commit ensures. --- gbn/gbn_conn.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/gbn/gbn_conn.go b/gbn/gbn_conn.go index a1c4431..744462c 100644 --- a/gbn/gbn_conn.go +++ b/gbn/gbn_conn.go @@ -413,11 +413,26 @@ func (g *GoBackNConn) sendPacketsForever() error { continue case <-g.pingTicker.Ticks(): + // If we have expected a sync after sending the previous + // ping, both the pingTicker and pongTicker may have + // ticked when waiting to sync. In that case, we can't + // be sure which of the signals we receive over first in + // the above select. We therefore need to check if the + // pong ticker has ticked here to ensure that it get's + // prioritized over the ping ticker. + select { + case <-g.pongTicker.Ticks(): + return errKeepaliveTimeout + default: + } // Start the pong timer. g.pongTicker.Reset() g.pongTicker.Resume() + // Also reset the ping timer. + g.pingTicker.Reset() + g.log.Tracef("Sending a PING packet") packet = &PacketData{ From b46131397c66b3445517aaf2ef8f8e3f53f90d8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tigerstr=C3=B6m?= Date: Tue, 9 Jan 2024 01:47:36 +0100 Subject: [PATCH 10/12] gbn: complete handshake if client has completed it This fixes a bug where the server would not complete the handshake if the server restarted the handshake after sending it's SYN, which the client then received and then completed the handshake. This could previously happen if the server timedout when waiting for the clients SYN response, or if the client's SYNACK was lost due to packet loss. --- gbn/gbn_server.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/gbn/gbn_server.go b/gbn/gbn_server.go index 0f8116d..dc53ca9 100644 --- a/gbn/gbn_server.go +++ b/gbn/gbn_server.go @@ -84,6 +84,7 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo var n uint8 var resent bool +handshakeLoop: for { g.log.Debugf("Waiting for client SYN") select { @@ -111,6 +112,24 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo switch msg.(type) { case *PacketSYN: + + case *PacketSYNACK, *PacketData: + // If we receive a SYNACK or DATA packet after we have + // restarted the handshake, we can be sure that the + // client has received our SYN and has completed the + // handshake. We can therefore complete the handshake + // ourselves. + if resent { + g.log.Tracef("Received %T after restarting "+ + "handshake", msg) + g.timeoutManager.Received(msg) + + break handshakeLoop + } + + g.log.Tracef("Expected SYN, got %T", msg) + + continue default: g.log.Tracef("Expected SYN, got %T", msg) continue @@ -169,6 +188,8 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo switch msg.(type) { case *PacketSYNACK: + g.log.Debugf("Received SYNACK") + // Notify the timeout manager we've received the SYNACK // response from the counterparty. g.timeoutManager.Received(msg) @@ -185,8 +206,6 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo break } - g.log.Debugf("Received SYNACK") - // Set all variables that are dependent on the value of N that we get // from the client g.setN(n) From da8143f8d85d8bea7321da6e2123191cf716099a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tigerstr=C3=B6m?= Date: Tue, 9 Jan 2024 01:57:43 +0100 Subject: [PATCH 11/12] itest: close context after closing conn for client --- itest/client_harness.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/itest/client_harness.go b/itest/client_harness.go index 004b084..b8a19eb 100644 --- a/itest/client_harness.go +++ b/itest/client_harness.go @@ -119,6 +119,10 @@ func (c *clientHarness) start() error { } func (c *clientHarness) cleanup() error { - c.cancel() + // We cancel the context after closing the connection, as it's used + // during the connection closing process. We defer the cancel to make + // sure it's always canceled. + defer c.cancel() + return c.grpcConn.Close() } From 931a7c3d6e7f06a829d66b4aa7806498277cec93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tigerstr=C3=B6m?= Date: Tue, 9 Jan 2024 02:27:54 +0100 Subject: [PATCH 12/12] itest: increase test timeout to 60secs --- itest/connection_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/itest/connection_test.go b/itest/connection_test.go index 4ca048d..bba3657 100644 --- a/itest/connection_test.go +++ b/itest/connection_test.go @@ -15,7 +15,7 @@ var ( defaultMessage = []byte("some default message") ) -const defaultTimeout = 30 * time.Second +const defaultTimeout = 60 * time.Second // testHappyPath ensures that client and server are able to communicate // as expected in the case where no connections are dropped.