From 29d38e1dbec0e1ef7abbb3fe48fca554dc879c4c Mon Sep 17 00:00:00 2001 From: rishtigupta <127137312+rishtigupta@users.noreply.github.com> Date: Wed, 13 Nov 2024 11:48:19 -0800 Subject: [PATCH] feat: use retry interceptor for streaming calls (#400) --- .../momento/sdk/RetryClientInterceptor.java | 27 ++++++++--------- .../DefaultRetryEligibilityStrategy.java | 1 + ...lientCall.java => RetryingClientCall.java} | 30 ++++++++----------- 3 files changed, 27 insertions(+), 31 deletions(-) rename momento-sdk/src/main/java/momento/sdk/retry/{RetryingUnaryClientCall.java => RetryingClientCall.java} (67%) diff --git a/momento-sdk/src/main/java/momento/sdk/RetryClientInterceptor.java b/momento-sdk/src/main/java/momento/sdk/RetryClientInterceptor.java index 497c75ad..07a366fb 100644 --- a/momento-sdk/src/main/java/momento/sdk/RetryClientInterceptor.java +++ b/momento-sdk/src/main/java/momento/sdk/RetryClientInterceptor.java @@ -18,26 +18,27 @@ import javax.annotation.Nullable; import momento.sdk.retry.RetryEligibilityStrategy; import momento.sdk.retry.RetryStrategy; -import momento.sdk.retry.RetryingUnaryClientCall; +import momento.sdk.retry.RetryingClientCall; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Interceptor for retrying client calls with gRPC servers. This interceptor is responsible for - * handling retry logic when making unary (single request, single response) gRPC calls. + * handling retry logic when making unary (single request, single response) and streaming gRPC + * calls. * *

A {@link ClientCall} is essentially an instance of a gRPC invoker. Every gRPC interceptor * expects us to return such client call(s) that it will execute in order. Each call has a "start" * method, which is the entry point for the call. * - *

This retry client interceptor returns an instance of a {@link RetryingUnaryClientCall}, which - * is a client call designed to handle retrying unary (single request, single response) operations. - * The interceptor uses a provided {@link RetryStrategy} to determine when and how to retry failed - * calls. + *

This retry client interceptor returns an instance of a {@link RetryingClientCall}, which is a + * client call designed to handle retrying unary (single request, single response) and streaming + * call operations. The interceptor uses a provided {@link RetryStrategy} to determine when and how + * to retry failed calls. * *

When a gRPC call is intercepted, the interceptor checks whether the method is unary (client - * sends one message), and if so, it wraps the original {@link ClientCall} with the {@link - * RetryingUnaryClientCall}. This custom call is responsible for handling the retry logic. + * sends one message) or streaming, and if so, it wraps the original {@link ClientCall} with the + * {@link RetryingClientCall}. This custom call is responsible for handling the retry logic. * *

When the gRPC call is closed, the {@code onClose} method is called, which is the point where * we can safely check the status of the initial request that was made and determine if we want to @@ -48,8 +49,6 @@ * we should not retry anymore), the interceptor propagates the final result to the original * listener, effectively completing the call with the last status received. * - *

Note that the interceptor only supports unary operations for retrying. - * * @see RetryStrategy * @see RetryEligibilityStrategy * @param The type of the request message. @@ -76,12 +75,12 @@ public ClientCall interceptCall( final MethodDescriptor method, final CallOptions callOptions, final Channel channel) { - // currently the SDK only supports unary operations which we want to retry on + if (!method.getType().clientSendsOneMessage()) { return channel.newCall(method, callOptions); } - return new RetryingUnaryClientCall(channel.newCall(method, callOptions)) { + return new RetryingClientCall(channel.newCall(method, callOptions)) { private int attemptNumber = 0; @Nullable private Future future = null; @@ -90,7 +89,6 @@ public void start(Listener responseListener, Metadata headers) { super.start( new ForwardingClientCallListener.SimpleForwardingClientCallListener( responseListener) { - /** * At this point, the ClientCall has been closed. Any additional calls to the * ClientCall will not be processed by the server. The server does not send any @@ -126,12 +124,13 @@ public void onClose(Status status, Metadata trailers) { // a delay not present indicates we have exhausted retries or exceeded // delay or any variable the strategy author wishes to not retry anymore if (!retryDelay.isPresent()) { + cancelAttempt(); super.onClose(status, trailers); return; } logger.debug( - "Retrying request {} on error code {} with delay {} millisecodns", + "Retrying request {} on error code {} with delay {} milliseconds", method.getFullMethodName(), status.getCode().toString(), retryDelay.get().toMillis()); diff --git a/momento-sdk/src/main/java/momento/sdk/retry/DefaultRetryEligibilityStrategy.java b/momento-sdk/src/main/java/momento/sdk/retry/DefaultRetryEligibilityStrategy.java index ec01125f..6b8047f1 100644 --- a/momento-sdk/src/main/java/momento/sdk/retry/DefaultRetryEligibilityStrategy.java +++ b/momento-sdk/src/main/java/momento/sdk/retry/DefaultRetryEligibilityStrategy.java @@ -57,6 +57,7 @@ public class DefaultRetryEligibilityStrategy implements RetryEligibilityStrategy add("cache_client.Scs/ListLength"); // not idempotent: "/cache_client.Scs/ListConcatenateFront", // not idempotent: "/cache_client.Scs/ListConcatenateBack" + add("cache_client.Scs/GetBatch"); } }; diff --git a/momento-sdk/src/main/java/momento/sdk/retry/RetryingUnaryClientCall.java b/momento-sdk/src/main/java/momento/sdk/retry/RetryingClientCall.java similarity index 67% rename from momento-sdk/src/main/java/momento/sdk/retry/RetryingUnaryClientCall.java rename to momento-sdk/src/main/java/momento/sdk/retry/RetryingClientCall.java index 14f4019b..0c2135a0 100644 --- a/momento-sdk/src/main/java/momento/sdk/retry/RetryingUnaryClientCall.java +++ b/momento-sdk/src/main/java/momento/sdk/retry/RetryingClientCall.java @@ -7,24 +7,20 @@ /** * A custom implementation of {@link ClientCall} that handles retrying unary (single request, single - * response) operations. This class is used by the RetryClientInterceptor to manage retry logic for - * failed gRPC calls. + * response) and streaming call operations. This class is used by the RetryClientInterceptor to + * manage retry logic for failed gRPC calls. * - *

The {@code RetryingUnaryClientCall} wraps an original {@link ClientCall} and intercepts the - * methods related to starting, sending messages, and handling the response. If the original call - * encounters an error, the interceptor schedules a retry attempt based on the configured retry - * strategy and eligibility rules. + *

The {@code RetryingClientCall} wraps an original {@link ClientCall} and intercepts the methods + * related to starting, sending messages, and handling the response. If the original call encounters + * an error, the interceptor schedules a retry attempt based on the configured retry strategy and + * eligibility rules. * - *

Each instance of {@code RetryingUnaryClientCall} maintains its own state, including the - * request message, response listener, headers, and other properties specific to the call. When a - * retry is needed, a new instance of this class is created with the original request details, and - * the retry attempt is initiated. - * - *

Note that this implementation assumes that the gRPC call is unary, meaning the client sends - * one message and receives one response. For streaming calls or other call types, a different - * approach or implementation would be required. + *

Each instance of {@code RetryingClientCall} maintains its own state, including the request + * message, response listener, headers, and other properties specific to the call. When a retry is + * needed, a new instance of this class is created with the original request details, and the retry + * attempt is initiated. */ -public class RetryingUnaryClientCall extends ClientCall { +public class RetryingClientCall extends ClientCall { private ClientCall delegate; private Listener responseListener; @@ -34,12 +30,12 @@ public class RetryingUnaryClientCall extends ClientCall delegate) { + public RetryingClientCall(final ClientCall delegate) { this.delegate = delegate; }