Skip to content

Commit

Permalink
Pipeline & Transaction with failover to multi cluster (#3602)
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 authored Nov 15, 2023
1 parent 452ea52 commit 0d77298
Show file tree
Hide file tree
Showing 21 changed files with 5,204 additions and 8,774 deletions.
26 changes: 26 additions & 0 deletions src/main/java/redis/clients/jedis/AbstractPipeline.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package redis.clients.jedis;

import java.io.Closeable;

public abstract class AbstractPipeline extends PipeliningBase implements Closeable {

protected AbstractPipeline(CommandObjects commandObjects) {
super(commandObjects);
}

@Override
public abstract void close();

/**
* Synchronize pipeline by reading all responses.
*/
public abstract void sync();

public Response<Long> publish(String channel, String message) {
return appendCommand(commandObjects.publish(channel, message));
}

public Response<Long> publish(byte[] channel, byte[] message) {
return appendCommand(commandObjects.publish(channel, message));
}
}
35 changes: 35 additions & 0 deletions src/main/java/redis/clients/jedis/AbstractTransaction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package redis.clients.jedis;

import java.io.Closeable;
import java.util.List;

public abstract class AbstractTransaction extends PipeliningBase implements Closeable {

protected AbstractTransaction() {
super(new CommandObjects());
}

public abstract void multi();

/**
* Must be called before {@link AbstractTransaction#multi() MULTI}.
*/
public abstract String watch(final String... keys);

/**
* Must be called before {@link AbstractTransaction#multi() MULTI}.
*/
public abstract String watch(final byte[]... keys);

public abstract String unwatch();

@Override public abstract void close();

public abstract List<Object> exec();

public abstract String discard();

public Response<Long> waitReplicas(int replicas, long timeout) {
return appendCommand(commandObjects.waitReplicas(replicas, timeout));
}
}
4 changes: 3 additions & 1 deletion src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ public class CommandObjects {

private RedisProtocol protocol;

protected void setProtocol(RedisProtocol proto) {
// TODO: restrict?
public final void setProtocol(RedisProtocol proto) {
this.protocol = proto;
}

// TODO: remove?
protected RedisProtocol getProtocol() {
return protocol;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public String toString() {
return "Connection{" + socketFactory + "}";
}

final RedisProtocol getRedisProtocol() {
public final RedisProtocol getRedisProtocol() {
return protocol;
}

Expand Down
7 changes: 1 addition & 6 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package redis.clients.jedis;

import java.io.Closeable;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
Expand All @@ -14,16 +13,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.commands.PipelineBinaryCommands;
import redis.clients.jedis.commands.PipelineCommands;
import redis.clients.jedis.commands.RedisModulePipelineCommands;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.graph.GraphCommandObjects;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.util.IOUtils;

public abstract class MultiNodePipelineBase extends PipelineBase
implements PipelineCommands, PipelineBinaryCommands, RedisModulePipelineCommands, Closeable {
public abstract class MultiNodePipelineBase extends PipelineBase {

private final Logger log = LoggerFactory.getLogger(getClass());

Expand Down
14 changes: 5 additions & 9 deletions src/main/java/redis/clients/jedis/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,12 @@
import java.util.Queue;

import redis.clients.jedis.commands.DatabasePipelineCommands;
import redis.clients.jedis.commands.PipelineBinaryCommands;
import redis.clients.jedis.commands.PipelineCommands;
import redis.clients.jedis.commands.RedisModulePipelineCommands;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.graph.GraphCommandObjects;
import redis.clients.jedis.params.*;
import redis.clients.jedis.util.KeyValue;

public class Pipeline extends PipelineBase implements PipelineCommands, PipelineBinaryCommands,
DatabasePipelineCommands, RedisModulePipelineCommands, Closeable {
public class Pipeline extends PipelineBase implements DatabasePipelineCommands, Closeable {

private final Queue<Response<?>> pipelinedResponses = new LinkedList<>();
protected final Connection connection;
Expand Down Expand Up @@ -66,8 +62,8 @@ public void close() {
public void sync() {
if (!hasPipelinedResponse()) return;
List<Object> unformatted = connection.getMany(pipelinedResponses.size());
for (Object o : unformatted) {
pipelinedResponses.poll().set(o);
for (Object rawReply : unformatted) {
pipelinedResponses.poll().set(rawReply);
}
}

Expand All @@ -81,10 +77,10 @@ public List<Object> syncAndReturnAll() {
if (hasPipelinedResponse()) {
List<Object> unformatted = connection.getMany(pipelinedResponses.size());
List<Object> formatted = new ArrayList<>();
for (Object o : unformatted) {
for (Object rawReply : unformatted) {
try {
Response<?> response = pipelinedResponses.poll();
response.set(o);
response.set(rawReply);
formatted.add(response.get());
} catch (JedisDataException e) {
formatted.add(e);
Expand Down
Loading

0 comments on commit 0d77298

Please sign in to comment.