Skip to content

Commit

Permalink
feat: use retry interceptor for streaming calls (#400)
Browse files Browse the repository at this point in the history
  • Loading branch information
rishtigupta authored Nov 13, 2024
1 parent 68ee067 commit 29d38e1
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 31 deletions.
27 changes: 13 additions & 14 deletions momento-sdk/src/main/java/momento/sdk/RetryClientInterceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,27 @@
import javax.annotation.Nullable;
import momento.sdk.retry.RetryEligibilityStrategy;
import momento.sdk.retry.RetryStrategy;
import momento.sdk.retry.RetryingUnaryClientCall;
import momento.sdk.retry.RetryingClientCall;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Interceptor for retrying client calls with gRPC servers. This interceptor is responsible for
* handling retry logic when making unary (single request, single response) gRPC calls.
* handling retry logic when making unary (single request, single response) and streaming gRPC
* calls.
*
* <p>A {@link ClientCall} is essentially an instance of a gRPC invoker. Every gRPC interceptor
* expects us to return such client call(s) that it will execute in order. Each call has a "start"
* method, which is the entry point for the call.
*
* <p>This retry client interceptor returns an instance of a {@link RetryingUnaryClientCall}, which
* is a client call designed to handle retrying unary (single request, single response) operations.
* The interceptor uses a provided {@link RetryStrategy} to determine when and how to retry failed
* calls.
* <p>This retry client interceptor returns an instance of a {@link RetryingClientCall}, which is a
* client call designed to handle retrying unary (single request, single response) and streaming
* call operations. The interceptor uses a provided {@link RetryStrategy} to determine when and how
* to retry failed calls.
*
* <p>When a gRPC call is intercepted, the interceptor checks whether the method is unary (client
* sends one message), and if so, it wraps the original {@link ClientCall} with the {@link
* RetryingUnaryClientCall}. This custom call is responsible for handling the retry logic.
* sends one message) or streaming, and if so, it wraps the original {@link ClientCall} with the
* {@link RetryingClientCall}. This custom call is responsible for handling the retry logic.
*
* <p>When the gRPC call is closed, the {@code onClose} method is called, which is the point where
* we can safely check the status of the initial request that was made and determine if we want to
Expand All @@ -48,8 +49,6 @@
* we should not retry anymore), the interceptor propagates the final result to the original
* listener, effectively completing the call with the last status received.
*
* <p>Note that the interceptor only supports unary operations for retrying.
*
* @see RetryStrategy
* @see RetryEligibilityStrategy
* @param <ReqT> The type of the request message.
Expand All @@ -76,12 +75,12 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method,
final CallOptions callOptions,
final Channel channel) {
// currently the SDK only supports unary operations which we want to retry on

if (!method.getType().clientSendsOneMessage()) {
return channel.newCall(method, callOptions);
}

return new RetryingUnaryClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) {
return new RetryingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) {
private int attemptNumber = 0;
@Nullable private Future<?> future = null;

Expand All @@ -90,7 +89,6 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {

/**
* At this point, the ClientCall has been closed. Any additional calls to the
* ClientCall will not be processed by the server. The server does not send any
Expand Down Expand Up @@ -126,12 +124,13 @@ public void onClose(Status status, Metadata trailers) {
// a delay not present indicates we have exhausted retries or exceeded
// delay or any variable the strategy author wishes to not retry anymore
if (!retryDelay.isPresent()) {
cancelAttempt();
super.onClose(status, trailers);
return;
}

logger.debug(
"Retrying request {} on error code {} with delay {} millisecodns",
"Retrying request {} on error code {} with delay {} milliseconds",
method.getFullMethodName(),
status.getCode().toString(),
retryDelay.get().toMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class DefaultRetryEligibilityStrategy implements RetryEligibilityStrategy
add("cache_client.Scs/ListLength");
// not idempotent: "/cache_client.Scs/ListConcatenateFront",
// not idempotent: "/cache_client.Scs/ListConcatenateBack"
add("cache_client.Scs/GetBatch");
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,20 @@

/**
* A custom implementation of {@link ClientCall} that handles retrying unary (single request, single
* response) operations. This class is used by the RetryClientInterceptor to manage retry logic for
* failed gRPC calls.
* response) and streaming call operations. This class is used by the RetryClientInterceptor to
* manage retry logic for failed gRPC calls.
*
* <p>The {@code RetryingUnaryClientCall} wraps an original {@link ClientCall} and intercepts the
* methods related to starting, sending messages, and handling the response. If the original call
* encounters an error, the interceptor schedules a retry attempt based on the configured retry
* strategy and eligibility rules.
* <p>The {@code RetryingClientCall} wraps an original {@link ClientCall} and intercepts the methods
* related to starting, sending messages, and handling the response. If the original call encounters
* an error, the interceptor schedules a retry attempt based on the configured retry strategy and
* eligibility rules.
*
* <p>Each instance of {@code RetryingUnaryClientCall} maintains its own state, including the
* request message, response listener, headers, and other properties specific to the call. When a
* retry is needed, a new instance of this class is created with the original request details, and
* the retry attempt is initiated.
*
* <p>Note that this implementation assumes that the gRPC call is unary, meaning the client sends
* one message and receives one response. For streaming calls or other call types, a different
* approach or implementation would be required.
* <p>Each instance of {@code RetryingClientCall} maintains its own state, including the request
* message, response listener, headers, and other properties specific to the call. When a retry is
* needed, a new instance of this class is created with the original request details, and the retry
* attempt is initiated.
*/
public class RetryingUnaryClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
public class RetryingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {

private ClientCall<ReqT, RespT> delegate;
private Listener<RespT> responseListener;
Expand All @@ -34,12 +30,12 @@ public class RetryingUnaryClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT
private boolean compressionEnabled;

/**
* Constructs a new instance of {@code RetryingUnaryClientCall} with the provided delegate.
* Constructs a new instance of {@code RetryingClientCall} with the provided delegate.
*
* @param delegate The original {@link ClientCall} to be wrapped and managed by this retrying
* call.
*/
public RetryingUnaryClientCall(final ClientCall<ReqT, RespT> delegate) {
public RetryingClientCall(final ClientCall<ReqT, RespT> delegate) {
this.delegate = delegate;
}

Expand Down

0 comments on commit 29d38e1

Please sign in to comment.