Skip to content

Commit

Permalink
gbn: make resend timeout dynamic
Browse files Browse the repository at this point in the history
Prior to this commit, the timeout before a client resends the queue
of packets was always a fixed value. This fixed timeout isn't
suitable for all clients as the latency for different clients varies.

With this commit, we instead set the resend timeout based on how long
it took for the other party to respond during the handshake process.
The timeout is set to the time it took for the server to respond
multiplied by the resendMultiplier, unless the duration is shorter than
the default resend timeout. If the the resend timeout has been manually
set, the resend timeout will always be set to that value, and won't be
dynamically set.
  • Loading branch information
ViktorTigerstrom committed Nov 15, 2023
1 parent c21b195 commit 267ba3c
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 3 deletions.
7 changes: 7 additions & 0 deletions gbn/gbn_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ handshake:
return err
}

// Notify the timeout manager that we sent a SYN.
g.timeoutManager.Sent(msg)

for {
// Wait for SYN
log.Debugf("Client waiting for SYN")
Expand Down Expand Up @@ -161,6 +164,10 @@ handshake:
return io.EOF
}

// Notify the timeout manager we've received the SYN response from the
// counterparty.
g.timeoutManager.Received(resp)

// Send SYNACK
log.Debugf("Client sending SYNACK")
synack, err := new(PacketSYNACK).Serialize()
Expand Down
3 changes: 3 additions & 0 deletions gbn/gbn_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message) error {
return fmt.Errorf("error calling sendToStream: %s", err)
}

// Notify the timeout manager that a message has been sent.
g.timeoutManager.Sent(msg)

return nil
}

Expand Down
7 changes: 7 additions & 0 deletions gbn/gbn_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo
return err
}

// Notify the timeout manager that we sent a SYN.
g.timeoutManager.Sent(msg)

// Wait for SYNACK
log.Debugf("Waiting for client SYNACK")
select {
Expand Down Expand Up @@ -160,6 +163,10 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo

switch msg.(type) {
case *PacketSYNACK:
// Notify the timeout manager we've received the SYNACK
// response from the counterparty.
g.timeoutManager.Received(msg)

break
case *PacketSYN:
log.Debugf("Received SYN. Resend SYN.")
Expand Down
13 changes: 12 additions & 1 deletion gbn/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,18 @@ func WithMaxSendSize(size int) Option {
// for ACKs before resending the queue.
func WithTimeout(timeout time.Duration) Option {
return func(conn *GoBackNConn) {
conn.timeoutManager.SetResendTimeout(timeout)
conn.timeoutManager.SetStaticResendTimeout(timeout)
}
}

// WithResendMultiplier is used to set the resend multiplier. This is the
// multiplier we use when dynamically setting the resend timeout, based on how
// long it took for other party to respond during the handshake.
// Note that when setting the resend timeout manually with the WithTimeout
// option, this option will have no effect.
func WithResendMultiplier(multiplier int) Option {
return func(conn *GoBackNConn) {
conn.timeoutManager.SetResendMultiplier(multiplier)
}
}

Expand Down
88 changes: 86 additions & 2 deletions gbn/timeout_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
defaultHandshakeTimeout = 100 * time.Millisecond
defaultResendTimeout = 100 * time.Millisecond
finSendTimeout = 1000 * time.Millisecond
defaultResendMultiplier = 5
DefaultSendTimeout = math.MaxInt64
DefaultRecvTimeout = math.MaxInt64
)
Expand All @@ -19,6 +20,10 @@ type TimeoutManager struct {
// by the server or the client.
isServer bool

// useStaticTimeout is used to indicate whether the resendTimeout
// has been manually set, and if so, should not be updated dynamically.
useStaticTimeout bool

// resendTimeout defines the current resend timeout used by the
// timeout manager.
// The resend timeout is the duration that will be waited before
Expand All @@ -29,6 +34,12 @@ type TimeoutManager struct {
resendTimeout time.Duration
resendTimeoutMu sync.RWMutex

// resendMultiplier defines the multiplier used when multiplying the
// duration it took for the other party to respond when setting the
// resendTimeout dynamically during the handshake.
resendMultiplier int
resendMultiplierMu sync.RWMutex

// latestSentSYNTime is used to keep track of the time when the latest
// SYN message was sent. This is used to dynamically set the resend
// timeout, based on how long it took for the other party to respond to
Expand Down Expand Up @@ -62,11 +73,74 @@ func NewTimeOutManager(isServer bool) *TimeoutManager {
isServer: isServer,
resendTimeout: defaultResendTimeout,
handshakeTimeout: defaultHandshakeTimeout,
useStaticTimeout: false,
resendMultiplier: defaultResendMultiplier,
recvTimeout: DefaultRecvTimeout,
sendTimeout: DefaultSendTimeout,
}
}

// Sent is should be called when a message is sent by the connection.
func (m *TimeoutManager) Sent(msg Message) {
// TODO: In the future, we may want to use this to keep track of the
// time it took for the other party to respond to other types of
// messages than the handshake, and dynamically keep updating the resend
// timeout to ensure that it reflects the current response time.
switch msg.(type) { //nolint:gocritic
case *PacketSYN:
// Note that we may send multiple SYN messages before receiving
// a response. Therefore, this field might be updated multiple
// times.
m.latestSentSYNTime = time.Now()
}
}

// Received is should be called when a message is received by the connection.
func (m *TimeoutManager) Received(msg Message) {
// TODO: In the future, we may want to use this to keep track of the
// time it took for the other party to respond to other types of
// messages than the handshake, and dynamically keep updating the resend
// timeout to ensure that it reflects the current response time.
switch msg.(type) {
case *PacketSYN:
if !m.isServer {
m.updateResendTimeout(time.Since(m.latestSentSYNTime))
}

case *PacketSYNACK:
if m.isServer {
m.updateResendTimeout(time.Since(m.latestSentSYNTime))
}
}
}

// updateResendTimeout updates the resend timeout based on the given response
// time. The resend timeout will be only be updated if the given response time
// is greater than the default resend timeout, after being multiplied by the
// resendMultiplier.
// If a static timeout has been manually set, then this function will do be a
// no-op.
func (m *TimeoutManager) updateResendTimeout(responseTime time.Duration) {
if m.useStaticTimeout {
log.Tracef("Not increasing resendTimeout as it has been set " +
"manually")

return
}

multipliedTimeout := time.Duration(m.resendMultiplier) * responseTime

if multipliedTimeout > defaultResendTimeout {
log.Tracef("Updating resendTimeout to %v", multipliedTimeout)

m.resendTimeout = multipliedTimeout
} else {
log.Tracef("Not updating resendTimeout to %v as it is not "+
"greater than the minimum resendTimeout which is %v",
multipliedTimeout, m.resendTimeout)
}
}

// GetResendTimeout returns the current resend timeout.
func (m *TimeoutManager) GetResendTimeout() time.Duration {
m.resendTimeoutMu.RLock()
Expand Down Expand Up @@ -107,14 +181,24 @@ func (m *TimeoutManager) GetRecvTimeout() time.Duration {
return m.recvTimeout
}

// SetResendTimeout sets the resend timeout to the given value, and also
// SetStaticResendTimeout sets the resend timeout to the given value, and also
// marks the timeout manager as using a static resend timeout, which will mean
// that the resend timeout will not be updated dynamically.
func (m *TimeoutManager) SetResendTimeout(resendTimeout time.Duration) {
func (m *TimeoutManager) SetStaticResendTimeout(resendTimeout time.Duration) {
m.resendTimeoutMu.Lock()
defer m.resendTimeoutMu.Unlock()

m.resendTimeout = resendTimeout
m.useStaticTimeout = true
}

// SetResendMultiplier sets the resend multiplier used when dynamically
// setting the resend timeout.
func (m *TimeoutManager) SetResendMultiplier(resendMultiplier int) {
m.resendMultiplierMu.Lock()
defer m.resendMultiplierMu.Unlock()

m.resendMultiplier = resendMultiplier
}

// SetHandshakeTimeout sets the handshake timeout.
Expand Down

0 comments on commit 267ba3c

Please sign in to comment.