Skip to content

Commit

Permalink
dynamic backoff (PR #258)
Browse files Browse the repository at this point in the history
Add option for dynamic backoff
  • Loading branch information
MattBrittan authored Aug 11, 2024
2 parents ef148e4 + 29212b8 commit f1fe38b
Show file tree
Hide file tree
Showing 11 changed files with 398 additions and 56 deletions.
19 changes: 14 additions & 5 deletions autopaho/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ type ClientConfig struct {
CleanStartOnInitialConnection bool // Clean Start flag, if true, existing session information will be cleared on the first connection (it will be false for subsequent connections)
SessionExpiryInterval uint32 // Session Expiry Interval in seconds (if 0 the Session ends when the Network Connection is closed)

ConnectRetryDelay time.Duration // How long to wait between connection attempts (defaults to 10s)
ConnectTimeout time.Duration // How long to wait for the connection process to complete (defaults to 10s)
WebSocketCfg *WebSocketConfig // Enables customisation of the websocket connection
// Deprecated: ConnectRetryDelay is deprecated and its functionality is replaced by ReconnectBackoff.
ConnectRetryDelay time.Duration // How long to wait between connection attempts (defaults to 10s)
ReconnectBackoff func(int) time.Duration // How long to wait after failed connection attempt N (defaults to 10s)
ConnectTimeout time.Duration // How long to wait for the connection process to complete (defaults to 10s)
WebSocketCfg *WebSocketConfig // Enables customisation of the websocket connection

Queue queue.Queue // Used to queue up publish messages (if nil an error will be returned if publish could not be transmitted)

Expand Down Expand Up @@ -242,8 +244,15 @@ func NewConnection(ctx context.Context, cfg ClientConfig) (*ConnectionManager, e
if cfg.Errors == nil {
cfg.Errors = log.NOOPLogger{}
}
if cfg.ConnectRetryDelay == 0 {
cfg.ConnectRetryDelay = 10 * time.Second
if cfg.ReconnectBackoff == nil {
// for backwards compatibility we check for ConnectRetryDelay first
// before using the default constant backoff strategy (which behaves
// identically to the previous behaviour)
if cfg.ConnectRetryDelay == 0 {
cfg.ReconnectBackoff = NewConstantBackoff(10 * time.Second)
} else {
cfg.ReconnectBackoff = NewConstantBackoff(cfg.ConnectRetryDelay)
}
}
if cfg.ConnectTimeout == 0 {
cfg.ConnectTimeout = 10 * time.Second
Expand Down
40 changes: 20 additions & 20 deletions autopaho/auto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ func TestDisconnect(t *testing.T) {

errCh := make(chan error, 2)
config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ConnectRetryDelay: time.Millisecond, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ReconnectBackoff: NewConstantBackoff(time.Millisecond), // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
ctx, cancel := context.WithCancel(ctx)
conn, done, err := ts.Connect(ctx)
Expand Down Expand Up @@ -186,10 +186,10 @@ func TestReconnect(t *testing.T) {
pinger.SetDebug(paholog.NewTestLogger(t, "pinger:"))

config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ConnectRetryDelay: time.Millisecond, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ReconnectBackoff: NewConstantBackoff(time.Millisecond), // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
atCount += 1
if atCount == 2 { // fail on the initial reconnection attempt to exercise retry functionality
Expand Down Expand Up @@ -299,10 +299,10 @@ func TestBasicPubSub(t *testing.T) {
atCount := 0

config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ConnectRetryDelay: time.Millisecond, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ReconnectBackoff: NewConstantBackoff(time.Millisecond), // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
atCount += 1
if atCount > 1 { // force failure if a reconnection is attempted (the connection should not drop in this test)
Expand Down Expand Up @@ -444,10 +444,10 @@ func TestAuthenticate(t *testing.T) {
atCount := 0

config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ConnectRetryDelay: time.Millisecond, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ReconnectBackoff: NewConstantBackoff(time.Millisecond), // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
atCount += 1
if atCount == 2 { // fail on the initial reconnection attempt to exercise retry functionality
Expand Down Expand Up @@ -542,7 +542,7 @@ func TestClientConfig_buildConnectPacket(t *testing.T) {
config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 5,
ConnectRetryDelay: 5 * time.Second,
ReconnectBackoff: NewConstantBackoff(5 * time.Second),
ConnectTimeout: 5 * time.Second,
CleanStartOnInitialConnection: true, // Should set Clean Start flag on first connection attempt
// extends the lower-level paho.ClientConfig
Expand Down Expand Up @@ -627,9 +627,9 @@ func TestClientConfig_buildConnectPacket(t *testing.T) {
func ExampleClientConfig_ConnectPacketBuilder() {
serverURL, _ := url.Parse("mqtt://mqtt_user:[email protected]:1883")
config := ClientConfig{
ServerUrls: []*url.URL{serverURL},
ConnectRetryDelay: 5 * time.Second,
ConnectTimeout: 5 * time.Second,
ServerUrls: []*url.URL{serverURL},
ReconnectBackoff: NewConstantBackoff(5 * time.Second),
ConnectTimeout: 5 * time.Second,
ClientConfig: paho.ClientConfig{
ClientID: "test",
},
Expand Down
141 changes: 141 additions & 0 deletions autopaho/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/

package autopaho

import (
"math/rand"
"time"
)

// Backoff function to compute backoff duration for the Nth attempt
// attempt starts at "0" indicating the delay BEFORE the first attempt
type Backoff func(attempt int) time.Duration

////////////////////////////////////////////////////////////////////////////////
// implementation for constant backoff
////////////////////////////////////////////////////////////////////////////////

// Creates a new backoff with constant delay (for attempt > 0, otherwise the backoff is 0).
func NewConstantBackoff(delay time.Duration) Backoff {
return func(attempt int) time.Duration {
if attempt <= 0 {
return 0
}
return delay
}
}

////////////////////////////////////////////////////////////////////////////////
// implementation for an exponential backoff
////////////////////////////////////////////////////////////////////////////////

// NewExponentialBackoff provides a random duration within a range starting
// from a fixed min value up to a "moving" max value that increases
// exponentially for each attempt up to the specified max value.
//
// The "moving" max is computed by multiplying the initial max value with the
// factor for each attemt up the specified max value.
//
// Configuration parameters:
// - minDelay - lower bound for computed backoff
// - maxDelay - upper bound for computed backoff
// - initialMaxDelay - initial max value which wiil incerease exponentially up to the max delay
// - factor - factor for the exponential increase of initial max delay
func NewExponentialBackoff(
minDelay time.Duration, // lower bound for computed backoff
maxDelay time.Duration, // upper bound for computed backoff
initialMaxDelay time.Duration, // initial max value which wiil incerease exponentially up to the max delay
factor float32, // factor for the exponential increase of initial max delay
) Backoff {
if minDelay <= 0 {
panic("min delay must NOT be less than or equal to: 0")
}
if maxDelay <= minDelay {
panic("max delay must NOT be less than or equal to: min delay")
}
if initialMaxDelay < minDelay || maxDelay < initialMaxDelay {
panic("initial max delay must be in range of: (min, max) delay")
}
if factor <= 1 {
panic("factor must NOT be less than or equal to: 1")
}

// for simplicity using numbers instead of duration internally
minDelayMillis := minDelay.Milliseconds()
maxDelayMillis := maxDelay.Milliseconds()
initialMaxDelayMillis := initialMaxDelay.Milliseconds()

// computes the "moving" max value based on the given attempt by multiplying
// it with the factor and ensures it does not exceed the specified max value
computeMaxDelayForAttempt := func(attempt int) int64 {

// only "moving part",
// will be multiplied by "factor" up to the max value for each attempt
movingMaxMillis := initialMaxDelayMillis

// computaion is based on 1 as 0 is the backoff for the first attempt
for i := 1; i < attempt; i++ {
movingMaxMillis = int64(float32(movingMaxMillis) * factor)
// ensure we stay in range
// check for range overflow / numerical overflow
if maxDelayMillis < movingMaxMillis || movingMaxMillis < minDelayMillis {
movingMaxMillis = maxDelayMillis
// stop as we reached max value already
break
}
}

return movingMaxMillis
}

return func(attempt int) time.Duration {
if attempt <= 0 {
return 0
}

maxDelayForAttemptMillis := computeMaxDelayForAttempt(attempt)
randomMillisInRange := randRange(minDelayMillis, maxDelayForAttemptMillis)

return time.Duration(randomMillisInRange) * time.Millisecond
}
}

// DefaultExponentialBackoff returns an exponential backoff with default values.
//
// The default values are:
// - min delay: 5 seconds
// - max delay: 10 minutes
// - initial max delay: 10 seconds
// - factor: 1.5
func DefaultExponentialBackoff() Backoff {
return NewExponentialBackoff(
05*time.Second, // minDelay
10*time.Minute, // maxDelay
10*time.Second, // initialMaxDelay
1.5, // factor
)
}

////////////////////////////////////////////////////////////////////////////////
// util functions
////////////////////////////////////////////////////////////////////////////////

// Returns a random number in the range of [start, end] (inclusive)
func randRange(start int64, end int64) int64 {
normalizedRange := end - start + 1

return rand.Int63n(normalizedRange) + start
}
137 changes: 137 additions & 0 deletions autopaho/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/

// build +unittest

package autopaho

import (
"math/rand"
"testing"
"time"
)

func TestConstantBackoffNoDelay(t *testing.T) {
expected := 0 * time.Second

noDelay := NewConstantBackoff(expected)

for i := 0; i < 100; i++ {
actual := noDelay(i)
if i == 0 {
if actual != 0 {
t.Fatalf("First attempt should not have any delay")
} else {
continue
}
}
if actual != expected {
t.Fatalf("expected value: `%s`, actual `%s`", expected, actual)
}
}
}

func TestConstantBackoffRandomValue(t *testing.T) {
for j := 0; j < 10; j++ {
nonZero := rand.Intn(100) + 1
expected := time.Duration(nonZero) * time.Second

nonZeroDelay := NewConstantBackoff(expected)

for i := 0; i < 100; i++ {
actual := nonZeroDelay(i)
if i == 0 {
if actual != 0 {
t.Fatalf("First attempt should not have any delay")
} else {
continue
}
}
if actual != expected {
t.Fatalf("expected value: `%s`, actual `%s`", expected, actual)
}
}
}
}

// tests for the exponential backoff strategy implementation

func TestRandomExponentialBackoff(t *testing.T) {
for i := 0; i < 20; i++ {
doSetupAndTestRandomExponentialBackoff(t)
}
}

func doSetupAndTestRandomExponentialBackoff(t *testing.T) {
minDelayInMillisLowerBound := int64(500) // 500ms
minDelayInMillisUpperBound := minDelayInMillisLowerBound + 5*1_000 // +5s
minDelayInMillis := randRange(
minDelayInMillisLowerBound,
minDelayInMillisUpperBound,
)

minDelay := time.Duration(minDelayInMillis) * time.Millisecond

// set up a partially random initial max backoff time
initialMaxDelayInMillisLowerBound := minDelayInMillis + 500 // +500ms
initialMaxDelayInMillisUpperBound := initialMaxDelayInMillisLowerBound + 30*1_000 // +30s
initialMaxDelayInMillis := randRange(
initialMaxDelayInMillisLowerBound,
initialMaxDelayInMillisUpperBound,
)

initialMaxDelay := time.Duration(initialMaxDelayInMillis) * time.Millisecond

// set up a partially random max backoff time
maxDelayMillisLowerBound := minDelayInMillis + 30*60*1_000 // +30min
maxDelayInMillisUpperBound := maxDelayMillisLowerBound + 60*60*1_000 // +60min
maxDelayInMillis := randRange(
maxDelayMillisLowerBound,
maxDelayInMillisUpperBound,
)

maxDelay := time.Duration(maxDelayInMillis) * time.Millisecond

// set up factor for the next variation
const factor = 1.6

exponentialBackoff := NewExponentialBackoff(
minDelay,
maxDelay,
initialMaxDelay,
factor,
)

// create many backoffs and test they are within constraints
for i := 0; i < 50; i++ {
actual := exponentialBackoff(i)
if i == 0 {
if actual != 0 {
t.Fatalf("First attempt should not have any delay")
} else {
continue
}
}
if i == 0 && initialMaxDelay < actual {
t.Fatalf("Actual backoff value: `%s` was higher than configured initial maximum: `%s`", actual, initialMaxDelay)
}
if actual < minDelay {
t.Fatalf("Actual backoff value: `%s` was less than configured minimum: `%s`", actual, minDelay)
}
if maxDelay < actual {
t.Fatalf("Actual backoff value: `%s` was higher than configured maximum: `%s`", actual, maxDelay)
}
}
}
Loading

0 comments on commit f1fe38b

Please sign in to comment.