Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix leaky bucket issue. #194

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
*/
public abstract class AbstractRateLimiter implements RateLimiter {

protected static final Long MICROSECOND_OF_ONE_SECOND = 1000 * 1000L;

/**
* The rate limit policy that defines the limits for the rate limiter.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy;
import com.jd.live.agent.governance.policy.service.limit.SlidingWindow;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -29,6 +28,8 @@ public class LeakyBucketLimiter extends TokenBucketLimiter {

private static final String KEY_CAPACITY = "capacity";

private static final String KEY_FLOW_WINDOW_SECONDS = "flowWindowSeconds";

private final long capacity;

private final AtomicLong requests = new AtomicLong(0);
Expand All @@ -39,8 +40,14 @@ public LeakyBucketLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWind
}

@Override
protected double getMaxPermits() {
return TimeUnit.SECONDS.toMicros(1L) / stableIntervalMicros;
protected double getMaxStoredPermits() {
// Keep a constant rate and prevent excessive token accumulation.
return getPermits(option.getPositive(KEY_FLOW_WINDOW_SECONDS, 1));
}

@Override
protected long adjustRequiredPermitsWaitTime(long startTime, long timeoutMicros, long nowMicros, long waitTime) {
return nowMicros + waitTime - startTime > timeoutMicros ? TIMEOUT : waitTime;
}

@Override
Expand All @@ -56,6 +63,5 @@ protected boolean doAcquire(int permits, long nowMicros, long timeoutMicros) {
} finally {
requests.decrementAndGet();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy;
import com.jd.live.agent.governance.policy.service.limit.SlidingWindow;

import java.util.concurrent.TimeUnit;

/**
* SmoothBurstyLimiter
* <p>
Expand All @@ -37,8 +35,7 @@ public SmoothBurstyLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWin
}

@Override
protected double getMaxPermits() {
long maxBurstSeconds = option.getPositive(KEY_MAX_BURST_SECONDS, DEFAULT_MAX_BURST_SECONDS);
return TimeUnit.SECONDS.toMicros(maxBurstSeconds) / stableIntervalMicros;
protected double getMaxStoredPermits() {
return getPermits(option.getPositive(KEY_MAX_BURST_SECONDS, DEFAULT_MAX_BURST_SECONDS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy;
import com.jd.live.agent.governance.policy.service.limit.SlidingWindow;

import java.util.concurrent.TimeUnit;

import static java.lang.Math.min;

/**
Expand Down Expand Up @@ -53,42 +51,42 @@ public SmoothWarmupLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWin

@Override
protected void initialize() {
this.warmupMicros = TimeUnit.SECONDS.toMicros(option.getPositive(KEY_WARMUP_SECONDS, DEFAULT_WARMUP_SECONDS));
this.thresholdPermits = 0.5 * warmupMicros / stableIntervalMicros;
this.coldIntervalMicros = stableIntervalMicros * option.getPositive(KEY_COLD_FACTOR, DEFAULT_COLD_FACTOR);
this.warmupMicros = option.getPositive(KEY_WARMUP_SECONDS, DEFAULT_WARMUP_SECONDS) * MICROSECOND_OF_ONE_SECOND;
this.thresholdPermits = 0.5 * warmupMicros / permitIntervalMicros;
this.coldIntervalMicros = permitIntervalMicros * option.getPositive(KEY_COLD_FACTOR, DEFAULT_COLD_FACTOR);
super.initialize();
this.slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
this.slope = (coldIntervalMicros - permitIntervalMicros) / (maxStoredPermits - thresholdPermits);
}

@Override
protected double getMaxPermits() {
return thresholdPermits + 2.0 * warmupMicros / (stableIntervalMicros + coldIntervalMicros);
protected double getMaxStoredPermits() {
return thresholdPermits + 2.0 * warmupMicros / (permitIntervalMicros + coldIntervalMicros);
}

@Override
protected long waitForStoredPermits(double storedPermits, double takePermits) {
protected long estimateStorePermitsWaitTime(double storedPermits, double targetPermits) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
if (availablePermitsAboveThreshold > 0.0) {
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, takePermits);
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, targetPermits);
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
takePermits -= permitsAboveThresholdToTake;
targetPermits -= permitsAboveThresholdToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
micros += (long) (stableIntervalMicros * takePermits);
micros += (long) (permitIntervalMicros * targetPermits);
return micros;
}

@Override
protected double coolDownIntervalMicros() {
return warmupMicros / maxPermits;
return warmupMicros / maxStoredPermits;
}

private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
return permitIntervalMicros + permits * slope;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,36 +32,35 @@
*/
public abstract class TokenBucketLimiter extends AbstractRateLimiter {

private static final int DEFAULT_SECOND_PERMITS = 1000;
protected static final int TIMEOUT = Integer.MIN_VALUE;

protected final SleepingStopwatch stopwatch;

/**
* The maximum number of stored permits.
*/
protected double maxPermits;
protected double maxStoredPermits;

/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
* The time interval (in microseconds) between each permit
*/
protected double stableIntervalMicros;

protected long nextFreeTicketMicros;

protected final Object mutex = new Object();
protected double permitIntervalMicros;

/**
* The currently stored permits.
*/
private double storedPermits;
protected double storedPermits;

protected long nextPermitMicros;

protected final Object mutex = new Object();

public TokenBucketLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWindow) {
super(limitPolicy, TimeUnit.MILLISECONDS);
this.stopwatch = SleepingStopwatch.createFromSystemTimer();
double secondPermits = slidingWindow.getSecondPermits();
this.stableIntervalMicros = secondPermits <= 0 ? DEFAULT_SECOND_PERMITS : TimeUnit.SECONDS.toMicros(1L) / secondPermits;
this.permitIntervalMicros = slidingWindow.getPermitIntervalMicros();
initialize();
update(stopwatch.readMicros());
refresh(stopwatch.readMicros());
}

@Override
Expand Down Expand Up @@ -90,7 +89,10 @@ protected boolean doAcquire(int permits, long nowMicros, long timeoutMicros) {
if (isTimeout(nowMicros, timeoutMicros)) {
return false;
}
microsToWait = computeWaitFor(permits, nowMicros);
microsToWait = estimateRequiredPermitsWaitTime(permits, nowMicros, timeoutMicros);
}
if (microsToWait == TIMEOUT) {
return false;
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
Expand All @@ -100,15 +102,15 @@ protected boolean doAcquire(int permits, long nowMicros, long timeoutMicros) {
* Initializes the rate limiter by setting the maximum number of permits.
*/
protected void initialize() {
this.maxPermits = getMaxPermits();
this.maxStoredPermits = getMaxStoredPermits();
}

/**
* Calculates and returns the maximum number of permits that can be accumulated.
*
* @return the maximum number of permits
*/
protected abstract double getMaxPermits();
protected abstract double getMaxStoredPermits();

/**
* Checks whether the next permit would only become available after the timeout has elapsed.
Expand All @@ -118,7 +120,7 @@ protected void initialize() {
* @return true if the next permit time is later than the current time plus the timeout (the request would time out), false otherwise
*/
protected boolean isTimeout(long nowMicros, long timeoutMicros) {
return nextFreeTicketMicros > nowMicros + timeoutMicros;
return nextPermitMicros > nowMicros + timeoutMicros;
}

/**
Expand All @@ -131,69 +133,88 @@ protected boolean isFull() {
}

/**
* Waits for the specified number of permits to become available.
* Estimates the wait time required to acquire the specified number of permits.
*
* @param permits the number of permits to wait for
* @param nowMicros the current time in microseconds
* @return the time waited in microseconds, or 0 if no wait was necessary
* @param permits The number of permits to acquire.
* @param startTime The request start time in microseconds.
* @param timeoutMicros The timeout time in microseconds
* @return The estimated wait time in microseconds, 0 if no wait is required, or {@code TIMEOUT} if the adjusted wait time signals a timeout.
*/
protected long computeWaitFor(long permits, long nowMicros) {
long momentAvailable = waitForEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
protected long estimateRequiredPermitsWaitTime(long permits, long startTime, long timeoutMicros) {
// update stored permits according to the current time
long nowMicros = stopwatch.readMicros();
refresh(nowMicros);
// compute wait time
double available = min(permits, storedPermits);
double lack = permits - available;
long waitTime = estimateStorePermitsWaitTime(storedPermits, available) + (long) (lack * permitIntervalMicros);
// adjust wait time to facilitate pre-fetching
long result = adjustRequiredPermitsWaitTime(startTime, timeoutMicros, nowMicros, waitTime);
if (result == TIMEOUT) {
// it's timeout.
return TIMEOUT;
}
// update next token time
nextPermitMicros = saturatedAdd(nextPermitMicros, waitTime);
storedPermits -= available;
return result;
}

/**
* Waits for the specified number of permits to become available, returning the time at which the earliest permit will be available.
* Adjusts the required wait time for acquiring permits based on the current time and the next token time.
*
* @param permits the number of permits to wait for
* @param nowMicros the current time in microseconds
* @return the time at which the earliest permit will be available, in microseconds
* @param startTime The request start time in microseconds.
* @param waitTime The original wait time (in microseconds). Not used by this base implementation; overriding implementations may use it.
* @param nowMicros The current time in microseconds
* @param timeoutMicros The timeout time in microseconds
* @return The adjusted wait time (in microseconds). This base implementation never returns a negative value; an override may return {@code TIMEOUT} to signal that the wait would exceed the timeout.
*/
protected long waitForEarliestAvailable(long permits, long nowMicros) {
update(nowMicros);
long returnValue = nextFreeTicketMicros;

double available = min(permits, storedPermits);
double lack = permits - available;
long waitMicros = waitForStoredPermits(storedPermits, available) + (long) (lack * stableIntervalMicros);

nextFreeTicketMicros = saturatedAdd(nextFreeTicketMicros, waitMicros);
storedPermits -= available;
return returnValue;
protected long adjustRequiredPermitsWaitTime(long startTime, long timeoutMicros, long nowMicros, long waitTime) {
return max(nextPermitMicros - startTime, 0);
}

/**
* Estimates the wait time required to take the specified number of stored permits.
*
* @param storedPermits the current number of stored permits
* @param permitsToTake the number of permits to wait for
* @param targetPermits the number of permits to wait for
* @return the estimated wait time in microseconds
*/
protected long waitForStoredPermits(double storedPermits, double permitsToTake) {
protected long estimateStorePermitsWaitTime(double storedPermits, double targetPermits) {
return 0L;
}

/**
* Returns the number of microseconds during cool down that we have to wait to get a new permit.
*/
protected double coolDownIntervalMicros() {
return stableIntervalMicros;
return permitIntervalMicros;
}

/**
* Updates the internal state of the rate limiter based on the current time.
*
* @param nowMicros the current time in microseconds
* Refresh permits based on the current time.
* @param nowMicros the current time in microseconds
*/
protected void update(long nowMicros) {
if (nowMicros > nextFreeTicketMicros) {
// if nextFreeTicket is in the past.
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
protected void refresh(long nowMicros) {
if (nowMicros > nextPermitMicros) {
// if nextPermitMicros is in the past.
double permits = (nowMicros - nextPermitMicros) / coolDownIntervalMicros();
permits = storedPermits + permits;
storedPermits = maxStoredPermits <= 0 ? permits : min(maxStoredPermits, permits);
nextPermitMicros = nowMicros;
}
}

/**
* Converts the given time duration (in seconds) into the equivalent number of permits.
*
* @param seconds The time duration (in seconds) for which the permits are to be calculated. Must be a positive value.
* @return The number of permits that can be acquired within the specified time duration.
*/
protected double getPermits(long seconds) {
return seconds * MICROSECOND_OF_ONE_SECOND / permitIntervalMicros;
}

/**
* Adds two long values, handling overflow by returning the maximum or minimum value.
*
Expand Down
Loading
Loading