Skip to content

Commit

Permalink
Instance-level circuit breaking, supporting smooth traffic recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed Jan 5, 2025
1 parent 84a9118 commit 77bde45
Show file tree
Hide file tree
Showing 48 changed files with 1,389 additions and 667 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.core.util;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* Utility class for performing atomic operations on {@link AtomicReference}.
*/
public class AtomicUtils {

/**
* Atomically updates the value of the given {@link AtomicReference} if the current value satisfies the specified predicate.
* If the predicate is satisfied, the new value is set using {@link AtomicReference#compareAndSet(Object, Object)}.
* If the update is successful and a success consumer is provided, the consumer is called with the old and new values.
* If the update is successful, the new value is returned. Otherwise, the old value is returned.
*
* @param reference the {@link AtomicReference} to update
* @param value the new value to set
* @param predicate a predicate that tests the current value; if null, the update is always attempted
* @param success a consumer that is called with the old and new values if the update is successful; if null, no consumer is called
* @return the new value if the update was successful, otherwise the old value
*/
public static <V> V update(AtomicReference<V> reference, V value, Predicate<V> predicate, BiConsumer<V, V> success) {
V old;
while (true) {
old = reference.get();
if (predicate == null || predicate.test(old)) {
if (reference.compareAndSet(old, value)) {
if (success != null) {
success.accept(old, value);
}
return value;
}
} else {
return old;
}
}
}

/**
* Atomically gets or updates the value associated with the specified key in the given map.
* If the key does not exist in the map, a new {@link AtomicReference} is created and associated with the key.
* The method checks if the current value satisfies the specified predicate.
* If the predicate is satisfied, the new value is set using {@link AtomicReference#compareAndSet(Object, Object)}.
* If the update is successful and a success consumer is provided, the consumer is called with the old and new values.
* If the update is successful, the new value is returned. Otherwise, the old value is returned.
*
* @param map the map containing {@link AtomicReference} values
* @param key the key whose value is to be retrieved or updated
* @param supplier a supplier that provides the new value if the predicate is satisfied or the value is null
* @param predicate a predicate that tests the current value; if null, the update is always attempted
* @param success a consumer that is called with the old and new values if the update is successful; if null, no consumer is called
* @return the new value if the update was successful, otherwise the old value
*/
public static <K, V> V getOrUpdate(Map<K, AtomicReference<V>> map, K key, Supplier<V> supplier, Predicate<V> predicate, BiConsumer<V, V> success) {
AtomicReference<V> reference = map.computeIfAbsent(key, k -> new AtomicReference<>());
V old = reference.get();
if (predicate == null && old != null || predicate != null && predicate.test(old)) {
return old;
}
return update(reference, supplier.get(), predicate == null ? null : predicate.negate(), success);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.core.util.time;

import lombok.Getter;

/**
* Represents a time window defined by a start time and a duration.
* The end time is automatically calculated based on the start time and duration.
*/
@Getter
public class TimeWindow {

/**
* The start time of the time window.
*/
protected final long startTime;

/**
* The end time of the time window, calculated as startTime + duration.
*/
protected final long endTime;

/**
* The duration of the time window.
*/
protected final long duration;

/**
* Constructs a new TimeWindow with the specified start time and duration.
*
* @param startTime The start time of the time window.
* @param duration The duration of the time window.
*/
public TimeWindow(long startTime, long duration) {
this.startTime = startTime;
this.endTime = startTime + duration;
this.duration = duration;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.core.util.time;

import java.util.ArrayList;
import java.util.List;

/**
* A class that manages a list of TimeWindow objects.
*/
public class TimeWindowList {

/**
* The first TimeWindow added to the list.
*/
private TimeWindow window;

/**
* A list to store multiple TimeWindow objects.
*/
private List<TimeWindow> windows;

/**
* Constructs a new TimeWindowList.
*/
public TimeWindowList() {
}

/**
* Adds a TimeWindow to the list.
*
* @param window The TimeWindow to add.
*/
public void add(TimeWindow window) {
if (window != null) {
if (this.window == null) {
this.window = window;
} else if (windows == null) {
windows = new ArrayList<>();
windows.add(this.window);
windows.add(window);
} else {
windows.add(window);
}
}
}

/**
* Returns the TimeWindow with the maximum end time from the list.
* If there are multiple TimeWindows, it returns a new TimeWindow
* with the maximum start time and maximum end time from the list.
*
* @return The TimeWindow with the maximum start and end time.
*/
public TimeWindow max() {
if (windows == null) {
return window;
}
long maxStartTime = Long.MIN_VALUE;
long maxEndTime = Long.MIN_VALUE;
for (TimeWindow window : windows) {
if (window.startTime > maxStartTime) {
maxStartTime = window.startTime;
}
if (window.endTime > maxEndTime) {
maxEndTime = window.endTime;
}
}
return new TimeWindow(maxStartTime, (int) (maxEndTime - maxStartTime));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,12 @@
*/
@Getter
@Setter
public class CircuitBreakerConfig {
public class CircuitBreakerConfig extends RecyclerConfig {

/**
* The type of the circuit breaker. Default is "Resilience4j".
*/
private String type = "Resilience4j";

/**
* The interval in milliseconds at which the circuit breaker should clean up expired entries.
* Default is 30,000 milliseconds (30 seconds).
*/
private long cleanInterval = 30000;

/**
* The time in milliseconds after which the circuit breaker entries expire.
* Default is 60,000 milliseconds (60 seconds).
*/
private long expireTime = 60000;

}

Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,12 @@
*/
@Getter
@Setter
public class ConcurrencyLimiterConfig {
public class ConcurrencyLimiterConfig extends RecyclerConfig {

/**
* The type of the concurrency limiter. Default is "Resilience4j".
*/
private String type = "Resilience4j";

/**
* The interval in milliseconds at which the concurrency limiter should clean up expired entries.
* Default is 30,000 milliseconds (30 seconds).
*/
private long cleanInterval = 30000;

/**
* The time in milliseconds after which the concurrency limiter entries expire.
* Default is 60,000 milliseconds (60 seconds).
*/
private long expireTime = 60000;

}

Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,12 @@
*/
@Getter
@Setter
public class RateLimiterConfig {
public class RateLimiterConfig extends RecyclerConfig {

/**
* The type of the rate limiter.
*/
private String type;

/**
* The interval in milliseconds at which the rate limiter should clean up expired entries.
* Default is 30,000 milliseconds (30 seconds).
*/
private long cleanInterval = 30000;

/**
* The time in milliseconds after which the rate limiter entries expire.
* Default is 60,000 milliseconds (60 seconds).
*/
private long expireTime = 60000;

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.governance.config;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public abstract class RecyclerConfig {
/**
* The interval in milliseconds at which the concurrency limiter should clean up expired entries.
* Default is 30,000 milliseconds (30 seconds).
*/
protected long cleanInterval = 30000;

/**
* The time in milliseconds after which the concurrency limiter entries expire.
* Default is 60,000 milliseconds (60 seconds).
*/
protected long expireTime = 60000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public abstract class AbstractEndpoint extends AbstractAttributes implements End
// Endpoint is request level
private Integer weight;

private Long recoverTime;

private Integer recoverDuration;

@Override
public String getLiveSpaceId() {
if (liveSpaceId == null) {
Expand Down Expand Up @@ -95,9 +99,29 @@ public String getLane() {
}

@Override
public Integer getWeight(ServiceRequest request) {
public Long getRecoverTime() {
return recoverTime;
}

@Override
public void setRecoverTime(Long recoverTime) {
this.recoverTime = recoverTime;
}

@Override
public Integer getRecoverDuration() {
return recoverDuration;
}

@Override
public void setRecoverDuration(Integer duration) {
this.recoverDuration = duration;
}

@Override
public Integer reweight(ServiceRequest request) {
if (weight == null) {
weight = Endpoint.super.getWeight(request);
weight = Endpoint.super.reweight(request);
}
return weight;
}
Expand Down
Loading

0 comments on commit 77bde45

Please sign in to comment.