From b573295412274794d85d5eb600299b0edd044e58 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Mon, 23 Oct 2023 16:22:47 +0600 Subject: [PATCH] Pipeline with failover to multi cluster --- .../jedis/MultiClusterFailoverPipeline.java | 140 ++++++++++++++++++ .../redis/clients/jedis/UnifiedJedis.java | 5 +- .../CircuitBreakerFailoverBase.java | 67 +++++++++ ...cuitBreakerFailoverConnectionProvider.java | 55 +++++++ .../CircuitBreakerCommandExecutor.java | 52 ++----- 5 files changed, 275 insertions(+), 44 deletions(-) create mode 100644 src/main/java/redis/clients/jedis/MultiClusterFailoverPipeline.java create mode 100644 src/main/java/redis/clients/jedis/activeactive/CircuitBreakerFailoverBase.java create mode 100644 src/main/java/redis/clients/jedis/activeactive/CircuitBreakerFailoverConnectionProvider.java diff --git a/src/main/java/redis/clients/jedis/MultiClusterFailoverPipeline.java b/src/main/java/redis/clients/jedis/MultiClusterFailoverPipeline.java new file mode 100644 index 0000000000..0fa8e66d3e --- /dev/null +++ b/src/main/java/redis/clients/jedis/MultiClusterFailoverPipeline.java @@ -0,0 +1,140 @@ +package redis.clients.jedis; + +import java.io.Closeable; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; + +import redis.clients.jedis.activeactive.CircuitBreakerFailoverConnectionProvider; +import redis.clients.jedis.commands.PipelineBinaryCommands; +import redis.clients.jedis.commands.PipelineCommands; +import redis.clients.jedis.commands.RedisModulePipelineCommands; +import redis.clients.jedis.graph.ResultSet; +import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; +import redis.clients.jedis.util.KeyValue; + +/** + * This is high memory dependent solution as all the appending commands will be hold in memory until + * {@link MultiClusterFailoverPipeline#sync() SYNC} + * (or {@link MultiClusterFailoverPipeline#close() CLOSE}) gets called. + */ +public class MultiClusterFailoverPipeline extends PipelineBase + implements PipelineCommands, PipelineBinaryCommands, RedisModulePipelineCommands, Closeable { + + private final CircuitBreakerFailoverConnectionProvider provider; + private final Queue>> commands = new LinkedList<>(); + + public MultiClusterFailoverPipeline(MultiClusterPooledConnectionProvider provider) { + super(new CommandObjects()); + try (Connection connection = provider.getConnection()) { // we don't need a healthy connection now + RedisProtocol proto = connection.getRedisProtocol(); + if (proto != null) this.commandObjects.setProtocol(proto); + } + + this.provider = new CircuitBreakerFailoverConnectionProvider(provider); + } + + @Override + public final Response appendCommand(CommandObject commandObject) { + CommandArguments args = commandObject.getArguments(); + Response response = new Response<>(commandObject.getBuilder()); + commands.add(KeyValue.of(args, response)); + return response; + } + + @Override + public void close() { + sync(); + } + + /** + * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to + * get return values from pipelined commands, capture the different Response<?> of the + * commands you execute. + */ + @Override + public void sync() { + if (!hasPipelinedResponse()) return; + + try (Connection connection = provider.getConnection()) { + for (KeyValue> command : commands) { + connection.sendCommand(command.getKey()); + } + // connection.flush(); // following flushes anyway + + List unformatted = connection.getMany(commands.size()); + for (Object o : unformatted) { + commands.poll().getValue().set(o); + } + } + } + + public final boolean hasPipelinedResponse() { + return commands.size() > 0; + } + + public Response waitReplicas(int replicas, long timeout) { + return appendCommand(commandObjects.waitReplicas(replicas, timeout)); + } + + public Response> waitAOF(long numLocal, long numReplicas, long timeout) { + return appendCommand(commandObjects.waitAOF(numLocal, numReplicas, timeout)); + } + + public Response> time() { + return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.TIME), BuilderFactory.STRING_LIST)); + } + + // RedisGraph commands + @Override + public Response graphQuery(String name, String query) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphReadonlyQuery(String name, String query) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphQuery(String name, String query, long timeout) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphReadonlyQuery(String name, String query, long timeout) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphQuery(String name, String query, Map params) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphReadonlyQuery(String name, String query, Map params) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphQuery(String name, String query, Map params, long timeout) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphReadonlyQuery(String name, String query, Map params, long timeout) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphDelete(String name) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response> graphProfile(String graphName, String query) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + // RedisGraph commands +} diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index 2184534ca6..568393161f 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -4822,8 +4822,11 @@ public Object tFunctionCallAsync(String library, String function, List k public PipelineBase pipelined() { if (provider == null) { throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass()); + } else if (provider instanceof MultiClusterPooledConnectionProvider) { + return new MultiClusterFailoverPipeline((MultiClusterPooledConnectionProvider) provider); + } else { + return new Pipeline(provider.getConnection(), true); } - return new Pipeline(provider.getConnection(), true); } public Transaction multi() { diff --git a/src/main/java/redis/clients/jedis/activeactive/CircuitBreakerFailoverBase.java b/src/main/java/redis/clients/jedis/activeactive/CircuitBreakerFailoverBase.java new file mode 100644 index 0000000000..520fcfe9f2 --- /dev/null +++ b/src/main/java/redis/clients/jedis/activeactive/CircuitBreakerFailoverBase.java @@ -0,0 +1,67 @@ +package redis.clients.jedis.activeactive; + +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; + +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; +import redis.clients.jedis.util.IOUtils; + +import java.util.Arrays; +import java.util.List; + + +/** + * @author Allen Terleto (aterleto) + *

+ * Base class for CommandExecutor with built-in retry, circuit-breaker, and failover to another cluster/database + * endpoint. With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active + * cluster(s) by using simple configuration which is passed through from + * Resilience4j - https://resilience4j.readme.io/docs + *

+ */ +public class CircuitBreakerFailoverBase implements AutoCloseable { + + protected final static List> defaultCircuitBreakerFallbackException = + Arrays.asList(CallNotPermittedException.class); + + protected final MultiClusterPooledConnectionProvider provider; + + public CircuitBreakerFailoverBase(MultiClusterPooledConnectionProvider provider) { + this.provider = provider; + } + + @Override + public void close() { + IOUtils.closeQuietly(this.provider); + } + + /** + * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios + */ + protected synchronized void clusterFailover(CircuitBreaker circuitBreaker) { + + // Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent + if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) { + + // Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing. + // To recover/transition from this forced state the user will need to manually failback + circuitBreaker.transitionToForcedOpenState(); + + // Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand() + // to use the next cluster's connection pool - according to the configuration's prioritization/order + int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex(); + + // Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging + provider.runClusterFailoverPostProcessor(activeMultiClusterIndex); + } + + // Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail + else if (provider.isLastClusterCircuitBreakerForcedOpen()) { + throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " + + "provided with an additional cluster/database endpoint according to its prioritized sequence. " + + "If applicable, consider failing back OR restarting with an available cluster/database endpoint"); + } + } + +} \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/activeactive/CircuitBreakerFailoverConnectionProvider.java b/src/main/java/redis/clients/jedis/activeactive/CircuitBreakerFailoverConnectionProvider.java new file mode 100644 index 0000000000..6cc3afd426 --- /dev/null +++ b/src/main/java/redis/clients/jedis/activeactive/CircuitBreakerFailoverConnectionProvider.java @@ -0,0 +1,55 @@ +package redis.clients.jedis.activeactive; + +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.decorators.Decorators; +import io.github.resilience4j.decorators.Decorators.DecorateSupplier; + +import redis.clients.jedis.Connection; +import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; +import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster; + +/** + * ConnectionProvider with built-in retry, circuit-breaker, and failover to another cluster/database endpoint. + * With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s) + * by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs + */ +public class CircuitBreakerFailoverConnectionProvider extends CircuitBreakerFailoverBase { + + public CircuitBreakerFailoverConnectionProvider(MultiClusterPooledConnectionProvider provider) { + super(provider); + } + + public Connection getConnection() { + Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety + + DecorateSupplier supplier = Decorators.ofSupplier(() -> this.handleGetConnection(cluster)); + + supplier.withRetry(cluster.getRetry()); + supplier.withCircuitBreaker(cluster.getCircuitBreaker()); + supplier.withFallback(defaultCircuitBreakerFallbackException, + e -> this.handleClusterFailover(cluster.getCircuitBreaker())); + + return supplier.decorate().get(); + } + + /** + * Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios + */ + private Connection handleGetConnection(Cluster cluster) { + Connection connection = cluster.getConnection(); + connection.ping(); + return connection; + } + + /** + * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios + */ + private Connection handleClusterFailover(CircuitBreaker circuitBreaker) { + + clusterFailover(circuitBreaker); + + // Recursive call to the initiating method so the operation can be retried on the next cluster connection + return getConnection(); + } + +} \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/executors/CircuitBreakerCommandExecutor.java b/src/main/java/redis/clients/jedis/executors/CircuitBreakerCommandExecutor.java index a01a58d98d..ac52adadd1 100644 --- a/src/main/java/redis/clients/jedis/executors/CircuitBreakerCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/executors/CircuitBreakerCommandExecutor.java @@ -1,18 +1,14 @@ package redis.clients.jedis.executors; -import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.decorators.Decorators; import io.github.resilience4j.decorators.Decorators.DecorateSupplier; -import redis.clients.jedis.*; -import redis.clients.jedis.exceptions.JedisConnectionException; + +import redis.clients.jedis.CommandObject; +import redis.clients.jedis.Connection; +import redis.clients.jedis.activeactive.CircuitBreakerFailoverBase; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster; -import redis.clients.jedis.util.IOUtils; - -import java.util.Arrays; -import java.util.List; - /** * @author Allen Terleto (aterleto) @@ -22,20 +18,10 @@ * by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs *

*/ -public class CircuitBreakerCommandExecutor implements CommandExecutor { - - private final static List> circuitBreakerFallbackException = - Arrays.asList(CallNotPermittedException.class); - - private final MultiClusterPooledConnectionProvider provider; +public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase implements CommandExecutor { public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) { - this.provider = provider; - } - - @Override - public void close() { - IOUtils.closeQuietly(this.provider); + super(provider); } @Override @@ -46,7 +32,7 @@ public T executeCommand(CommandObject commandObject) { supplier.withRetry(cluster.getRetry()); supplier.withCircuitBreaker(cluster.getCircuitBreaker()); - supplier.withFallback(circuitBreakerFallbackException, + supplier.withFallback(defaultCircuitBreakerFallbackException, e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker())); return supplier.decorate().get(); @@ -64,29 +50,9 @@ private T handleExecuteCommand(CommandObject commandObject, Cluster clust /** * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios */ - private synchronized T handleClusterFailover(CommandObject commandObject, CircuitBreaker circuitBreaker) { - - // Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent - if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) { - - // Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing. - // To recover/transition from this forced state the user will need to manually failback - circuitBreaker.transitionToForcedOpenState(); - - // Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand() - // to use the next cluster's connection pool - according to the configuration's prioritization/order - int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex(); + private T handleClusterFailover(CommandObject commandObject, CircuitBreaker circuitBreaker) { - // Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging - provider.runClusterFailoverPostProcessor(activeMultiClusterIndex); - } - - // Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail - else if (provider.isLastClusterCircuitBreakerForcedOpen()) { - throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " + - "provided with an additional cluster/database endpoint according to its prioritized sequence. " + - "If applicable, consider failing back OR restarting with an available cluster/database endpoint"); - } + clusterFailover(circuitBreaker); // Recursive call to the initiating method so the operation can be retried on the next cluster connection return executeCommand(commandObject);