Skip to content

Commit

Permalink
Pipeline with failover to multi cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Oct 23, 2023
1 parent d2f6712 commit b573295
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 44 deletions.
140 changes: 140 additions & 0 deletions src/main/java/redis/clients/jedis/MultiClusterFailoverPipeline.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package redis.clients.jedis;

import java.io.Closeable;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

import redis.clients.jedis.activeactive.CircuitBreakerFailoverConnectionProvider;
import redis.clients.jedis.commands.PipelineBinaryCommands;
import redis.clients.jedis.commands.PipelineCommands;
import redis.clients.jedis.commands.RedisModulePipelineCommands;
import redis.clients.jedis.graph.ResultSet;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.util.KeyValue;

/**
* This is high memory dependent solution as all the appending commands will be hold in memory until
* {@link MultiClusterFailoverPipeline#sync() SYNC}
* (or {@link MultiClusterFailoverPipeline#close() CLOSE}) gets called.
*/
public class MultiClusterFailoverPipeline extends PipelineBase
implements PipelineCommands, PipelineBinaryCommands, RedisModulePipelineCommands, Closeable {

private final CircuitBreakerFailoverConnectionProvider provider;
private final Queue<KeyValue<CommandArguments, Response<?>>> commands = new LinkedList<>();

public MultiClusterFailoverPipeline(MultiClusterPooledConnectionProvider provider) {
super(new CommandObjects());
try (Connection connection = provider.getConnection()) { // we don't need a healthy connection now
RedisProtocol proto = connection.getRedisProtocol();
if (proto != null) this.commandObjects.setProtocol(proto);
}

this.provider = new CircuitBreakerFailoverConnectionProvider(provider);
}

@Override
public final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
CommandArguments args = commandObject.getArguments();
Response<T> response = new Response<>(commandObject.getBuilder());
commands.add(KeyValue.of(args, response));
return response;
}

@Override
public void close() {
sync();
}

/**
* Synchronize pipeline by reading all responses. This operation close the pipeline. In order to
* get return values from pipelined commands, capture the different Response&lt;?&gt; of the
* commands you execute.
*/
@Override
public void sync() {
if (!hasPipelinedResponse()) return;

try (Connection connection = provider.getConnection()) {
for (KeyValue<CommandArguments, Response<?>> command : commands) {
connection.sendCommand(command.getKey());
}
// connection.flush(); // following flushes anyway

List<Object> unformatted = connection.getMany(commands.size());
for (Object o : unformatted) {
commands.poll().getValue().set(o);
}
}
}

public final boolean hasPipelinedResponse() {
return commands.size() > 0;
}

public Response<Long> waitReplicas(int replicas, long timeout) {
return appendCommand(commandObjects.waitReplicas(replicas, timeout));
}

public Response<KeyValue<Long, Long>> waitAOF(long numLocal, long numReplicas, long timeout) {
return appendCommand(commandObjects.waitAOF(numLocal, numReplicas, timeout));
}

public Response<List<String>> time() {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.TIME), BuilderFactory.STRING_LIST));
}

// RedisGraph commands
@Override
public Response<ResultSet> graphQuery(String name, String query) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphReadonlyQuery(String name, String query) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphQuery(String name, String query, long timeout) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphReadonlyQuery(String name, String query, long timeout) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphQuery(String name, String query, Map<String, Object> params) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphReadonlyQuery(String name, String query, Map<String, Object> params) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphQuery(String name, String query, Map<String, Object> params, long timeout) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphReadonlyQuery(String name, String query, Map<String, Object> params, long timeout) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<String> graphDelete(String name) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<List<String>> graphProfile(String graphName, String query) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}
// RedisGraph commands
}
5 changes: 4 additions & 1 deletion src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4822,8 +4822,11 @@ public Object tFunctionCallAsync(String library, String function, List<String> k
public PipelineBase pipelined() {
if (provider == null) {
throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass());
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterFailoverPipeline((MultiClusterPooledConnectionProvider) provider);
} else {
return new Pipeline(provider.getConnection(), true);
}
return new Pipeline(provider.getConnection(), true);
}

public Transaction multi() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package redis.clients.jedis.activeactive;

import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;

import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.util.IOUtils;

import java.util.Arrays;
import java.util.List;


/**
* @author Allen Terleto (aterleto)
* <p>
* Base class for CommandExecutor with built-in retry, circuit-breaker, and failover to another cluster/database
* endpoint. With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active
* cluster(s) by using simple configuration which is passed through from
* Resilience4j - https://resilience4j.readme.io/docs
* <p>
*/
public class CircuitBreakerFailoverBase implements AutoCloseable {

protected final static List<Class<? extends Throwable>> defaultCircuitBreakerFallbackException =
Arrays.asList(CallNotPermittedException.class);

protected final MultiClusterPooledConnectionProvider provider;

public CircuitBreakerFailoverBase(MultiClusterPooledConnectionProvider provider) {
this.provider = provider;
}

@Override
public void close() {
IOUtils.closeQuietly(this.provider);
}

/**
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios
*/
protected synchronized void clusterFailover(CircuitBreaker circuitBreaker) {

// Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent
if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {

// Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing.
// To recover/transition from this forced state the user will need to manually failback
circuitBreaker.transitionToForcedOpenState();

// Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand()
// to use the next cluster's connection pool - according to the configuration's prioritization/order
int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();

// Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging
provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
}

// Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail
else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " +
"provided with an additional cluster/database endpoint according to its prioritized sequence. " +
"If applicable, consider failing back OR restarting with an available cluster/database endpoint");
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package redis.clients.jedis.activeactive;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.decorators.Decorators;
import io.github.resilience4j.decorators.Decorators.DecorateSupplier;

import redis.clients.jedis.Connection;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster;

/**
* ConnectionProvider with built-in retry, circuit-breaker, and failover to another cluster/database endpoint.
* With this executor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s)
* by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs
*/
public class CircuitBreakerFailoverConnectionProvider extends CircuitBreakerFailoverBase {

public CircuitBreakerFailoverConnectionProvider(MultiClusterPooledConnectionProvider provider) {
super(provider);
}

public Connection getConnection() {
Cluster cluster = provider.getCluster(); // Pass this by reference for thread safety

DecorateSupplier<Connection> supplier = Decorators.ofSupplier(() -> this.handleGetConnection(cluster));

supplier.withRetry(cluster.getRetry());
supplier.withCircuitBreaker(cluster.getCircuitBreaker());
supplier.withFallback(defaultCircuitBreakerFallbackException,
e -> this.handleClusterFailover(cluster.getCircuitBreaker()));

return supplier.decorate().get();
}

/**
* Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios
*/
private Connection handleGetConnection(Cluster cluster) {
Connection connection = cluster.getConnection();
connection.ping();
return connection;
}

/**
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios
*/
private Connection handleClusterFailover(CircuitBreaker circuitBreaker) {

clusterFailover(circuitBreaker);

// Recursive call to the initiating method so the operation can be retried on the next cluster connection
return getConnection();
}

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package redis.clients.jedis.executors;

import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.decorators.Decorators;
import io.github.resilience4j.decorators.Decorators.DecorateSupplier;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisConnectionException;

import redis.clients.jedis.CommandObject;
import redis.clients.jedis.Connection;
import redis.clients.jedis.activeactive.CircuitBreakerFailoverBase;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster;
import redis.clients.jedis.util.IOUtils;

import java.util.Arrays;
import java.util.List;


/**
* @author Allen Terleto (aterleto)
Expand All @@ -22,20 +18,10 @@
* by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs
* <p>
*/
public class CircuitBreakerCommandExecutor implements CommandExecutor {

private final static List<Class<? extends Throwable>> circuitBreakerFallbackException =
Arrays.asList(CallNotPermittedException.class);

private final MultiClusterPooledConnectionProvider provider;
public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase implements CommandExecutor {

public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) {
this.provider = provider;
}

@Override
public void close() {
IOUtils.closeQuietly(this.provider);
super(provider);
}

@Override
Expand All @@ -46,7 +32,7 @@ public <T> T executeCommand(CommandObject<T> commandObject) {

supplier.withRetry(cluster.getRetry());
supplier.withCircuitBreaker(cluster.getCircuitBreaker());
supplier.withFallback(circuitBreakerFallbackException,
supplier.withFallback(defaultCircuitBreakerFallbackException,
e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker()));

return supplier.decorate().get();
Expand All @@ -64,29 +50,9 @@ private <T> T handleExecuteCommand(CommandObject<T> commandObject, Cluster clust
/**
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios
*/
private synchronized <T> T handleClusterFailover(CommandObject<T> commandObject, CircuitBreaker circuitBreaker) {

// Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent
if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {

// Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing.
// To recover/transition from this forced state the user will need to manually failback
circuitBreaker.transitionToForcedOpenState();

// Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand()
// to use the next cluster's connection pool - according to the configuration's prioritization/order
int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();
private <T> T handleClusterFailover(CommandObject<T> commandObject, CircuitBreaker circuitBreaker) {

// Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging
provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
}

// Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail
else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " +
"provided with an additional cluster/database endpoint according to its prioritized sequence. " +
"If applicable, consider failing back OR restarting with an available cluster/database endpoint");
}
clusterFailover(circuitBreaker);

// Recursive call to the initiating method so the operation can be retried on the next cluster connection
return executeCommand(commandObject);
Expand Down

0 comments on commit b573295

Please sign in to comment.