Skip to content

Commit

Permalink
fix: correct token bucket delays when experiencing system time jumps (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ianbotsf authored Feb 23, 2023
1 parent 921dec7 commit 6bab43e
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 15 deletions.
8 changes: 8 additions & 0 deletions .changes/9e04fd2c-136d-490b-9ecf-d6072a6f34c1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "9e04fd2c-136d-490b-9ecf-d6072a6f34c1",
"type": "bugfix",
"description": "Fix a bug where system time jumps could cause unexpected retry behavior",
"issues": [
"awslabs/smithy-kotlin#805"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,31 @@
package aws.smithy.kotlin.runtime.retries.delay

import aws.smithy.kotlin.runtime.retries.policy.RetryErrorType
import aws.smithy.kotlin.runtime.time.Clock
import aws.smithy.kotlin.runtime.time.epochMilliseconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.math.ceil
import kotlin.math.floor
import kotlin.math.min
import kotlin.time.ExperimentalTime
import kotlin.time.TimeSource

private const val MS_PER_S = 1_000

/**
* The standard implementation of a [RetryTokenBucket].
* @param options The configuration to use for this bucket.
* @param clock A clock to use for time calculations.
* @param timeSource A monotonic time source to use for calculating the temporal token fill of the bucket.
*/
public class StandardRetryTokenBucket(
@OptIn(ExperimentalTime::class)
public class StandardRetryTokenBucket constructor(
public val options: StandardRetryTokenBucketOptions = StandardRetryTokenBucketOptions.Default,
private val clock: Clock = Clock.System,
private val timeSource: TimeSource = TimeSource.Monotonic,
) : RetryTokenBucket {
internal var capacity = options.maxCapacity
private set

private var lastTimestamp = now()
private var lastTimeMark = timeSource.markNow()
private val mutex = Mutex()

/**
Expand Down Expand Up @@ -58,13 +59,11 @@ public class StandardRetryTokenBucket(
capacity = 0
}

lastTimestamp = now()
lastTimeMark = timeSource.markNow()
}

private fun now(): Long = clock.now().epochMilliseconds

private fun refillCapacity() {
val refillMs = now() - lastTimestamp
val refillMs = lastTimeMark.elapsedNow().inWholeMilliseconds
val refillSize = floor(options.refillUnitsPerSecond.toDouble() / MS_PER_S * refillMs).toInt()
capacity = min(options.maxCapacity, capacity + refillSize)
}
Expand All @@ -73,7 +72,7 @@ public class StandardRetryTokenBucket(
refillCapacity()

capacity = min(options.maxCapacity, capacity + size)
lastTimestamp = now()
lastTimeMark = timeSource.markNow()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
package aws.smithy.kotlin.runtime.retries.delay

import aws.smithy.kotlin.runtime.retries.policy.RetryErrorType
import aws.smithy.kotlin.runtime.time.ManualClock
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.ExperimentalTime
import kotlin.time.TestTimeSource

class StandardRetryTokenBucketTest {
companion object {
Expand Down Expand Up @@ -88,17 +88,17 @@ class StandardRetryTokenBucketTest {
@OptIn(ExperimentalCoroutinesApi::class, ExperimentalTime::class)
@Test
fun testRefillOverTime() = runTest {
val clock = ManualClock()
val timeSource = TestTimeSource()

// A bucket that costs capacity for an initial try
val bucket = StandardRetryTokenBucket(DefaultOptions.copy(initialTryCost = 5), clock)
val bucket = StandardRetryTokenBucket(DefaultOptions.copy(initialTryCost = 5), timeSource)

assertEquals(10, bucket.capacity)
assertTime(0) { bucket.acquireToken() }
assertEquals(5, bucket.capacity)

// Refill rate is 10/s == 1/100ms so after 250ms we should have 2 more tokens.
clock.advance(250.milliseconds)
timeSource += 250.milliseconds

assertTime(0) { bucket.acquireToken() }
assertEquals(2, bucket.capacity) // We had 5, 2 refilled, and then we decremented 5 more.
Expand Down

0 comments on commit 6bab43e

Please sign in to comment.