, String> getHandshakeApplicationProtocolSelector() {
+ return actual.getHandshakeApplicationProtocolSelector();
+ }
+}
diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java
index 50d18351d7..db5a52c231 100644
--- a/src/main/java/redis/clients/jedis/UnifiedJedis.java
+++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java
@@ -19,6 +19,10 @@
import redis.clients.jedis.commands.SampleBinaryKeyedCommands;
import redis.clients.jedis.commands.SampleKeyedCommands;
import redis.clients.jedis.commands.RedisModuleCommands;
+import redis.clients.jedis.csc.Cache;
+import redis.clients.jedis.csc.CacheConfig;
+import redis.clients.jedis.csc.CacheConnection;
+import redis.clients.jedis.csc.CacheFactory;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.executors.*;
import redis.clients.jedis.gears.TFunctionListParams;
@@ -50,12 +54,14 @@ public class UnifiedJedis implements JedisCommands, JedisBinaryCommands,
SampleKeyedCommands, SampleBinaryKeyedCommands, RedisModuleCommands,
AutoCloseable {
+ @Deprecated
protected RedisProtocol protocol = null;
protected final ConnectionProvider provider;
protected final CommandExecutor executor;
protected final CommandObjects commandObjects;
private final GraphCommandObjects graphCommandObjects;
private JedisBroadcastAndRoundRobinConfig broadcastAndRoundRobinConfig = null;
+ private final Cache cache;
public UnifiedJedis() {
this(new HostAndPort(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT));
@@ -85,14 +91,23 @@ public UnifiedJedis(final URI uri, JedisClientConfig config) {
.database(JedisURIHelper.getDBIndex(uri)).clientName(config.getClientName())
.protocol(JedisURIHelper.getRedisProtocol(uri))
.ssl(JedisURIHelper.isRedisSSLScheme(uri)).sslSocketFactory(config.getSslSocketFactory())
- .sslParameters(config.getSslParameters()).hostnameVerifier(config.getHostnameVerifier())
- .build());
+ .sslParameters(config.getSslParameters()).hostnameVerifier(config.getHostnameVerifier()).build());
}
public UnifiedJedis(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this(new PooledConnectionProvider(hostAndPort, clientConfig), clientConfig.getRedisProtocol());
}
+ @Experimental
+ public UnifiedJedis(HostAndPort hostAndPort, JedisClientConfig clientConfig, CacheConfig cacheConfig) {
+ this(hostAndPort, clientConfig, CacheFactory.getCache(cacheConfig));
+ }
+
+ @Experimental
+ public UnifiedJedis(HostAndPort hostAndPort, JedisClientConfig clientConfig, Cache cache) {
+ this(new PooledConnectionProvider(hostAndPort, clientConfig, cache), clientConfig.getRedisProtocol(), cache);
+ }
+
public UnifiedJedis(ConnectionProvider provider) {
this(new DefaultCommandExecutor(provider), provider);
}
@@ -101,6 +116,11 @@ protected UnifiedJedis(ConnectionProvider provider, RedisProtocol protocol) {
this(new DefaultCommandExecutor(provider), provider, new CommandObjects(), protocol);
}
+ @Experimental
+ protected UnifiedJedis(ConnectionProvider provider, RedisProtocol protocol, Cache cache) {
+ this(new DefaultCommandExecutor(provider), provider, new CommandObjects(), protocol, cache);
+ }
+
/**
* The constructor to directly use a custom {@link JedisSocketFactory}.
*
@@ -132,13 +152,21 @@ public UnifiedJedis(Connection connection) {
this.executor = new SimpleCommandExecutor(connection);
this.commandObjects = new CommandObjects();
RedisProtocol proto = connection.getRedisProtocol();
- if (proto != null) this.commandObjects.setProtocol(proto);
+ if (proto != null) {
+ this.commandObjects.setProtocol(proto);
+ }
this.graphCommandObjects = new GraphCommandObjects(this);
+ if (connection instanceof CacheConnection) {
+ this.cache = ((CacheConnection) connection).getCache();
+ } else {
+ this.cache = null;
+ }
}
@Deprecated
public UnifiedJedis(Set jedisClusterNodes, JedisClientConfig clientConfig, int maxAttempts) {
- this(jedisClusterNodes, clientConfig, maxAttempts, Duration.ofMillis(maxAttempts * clientConfig.getSocketTimeoutMillis()));
+ this(jedisClusterNodes, clientConfig, maxAttempts,
+ Duration.ofMillis(maxAttempts * clientConfig.getSocketTimeoutMillis()));
}
@Deprecated
@@ -167,6 +195,13 @@ protected UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Dura
new ClusterCommandObjects(), protocol);
}
+ @Experimental
+ protected UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
+ RedisProtocol protocol, Cache cache) {
+ this(new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider,
+ new ClusterCommandObjects(), protocol, cache);
+ }
+
/**
* @deprecated Sharding/Sharded feature will be removed in next major release.
*/
@@ -180,7 +215,8 @@ public UnifiedJedis(ShardedConnectionProvider provider) {
*/
@Deprecated
public UnifiedJedis(ShardedConnectionProvider provider, Pattern tagPattern) {
- this(new DefaultCommandExecutor(provider), provider, new ShardedCommandObjects(provider.getHashingAlgo(), tagPattern));
+ this(new DefaultCommandExecutor(provider), provider,
+ new ShardedCommandObjects(provider.getHashingAlgo(), tagPattern));
}
public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) {
@@ -216,19 +252,34 @@ private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider) {
// Uses a fetched connection to process protocol. Should be avoided if possible.
@VisibleForTesting
public UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects) {
- this(executor, provider, commandObjects, null);
+ this(executor, provider, commandObjects, null, null);
if (this.provider != null) {
try (Connection conn = this.provider.getConnection()) {
if (conn != null) {
RedisProtocol proto = conn.getRedisProtocol();
- if (proto != null) this.commandObjects.setProtocol(proto);
+ if (proto != null) {
+ this.commandObjects.setProtocol(proto);
+ }
}
- } catch (JedisException je) { }
+ } catch (JedisException je) {
+ }
}
}
+ @Experimental
private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects,
RedisProtocol protocol) {
+ this(executor, provider, commandObjects, protocol, (Cache) null);
+ }
+
+ @Experimental
+ private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects,
+ RedisProtocol protocol, Cache cache) {
+
+ if (cache != null && protocol != RedisProtocol.RESP3) {
+ throw new IllegalArgumentException("Client-side caching is only supported with RESP3.");
+ }
+
this.provider = provider;
this.executor = executor;
@@ -239,6 +290,7 @@ private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, Comm
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
+ this.cache = cache;
}
@Override
@@ -265,7 +317,8 @@ private T checkAndBroadcastCommand(CommandObject commandObject) {
if (broadcastAndRoundRobinConfig == null) {
} else if (commandObject.getArguments().getCommand() instanceof SearchProtocol.SearchCommand
- && broadcastAndRoundRobinConfig.getRediSearchModeInCluster() == JedisBroadcastAndRoundRobinConfig.RediSearchMode.LIGHT) {
+ && broadcastAndRoundRobinConfig
+ .getRediSearchModeInCluster() == JedisBroadcastAndRoundRobinConfig.RediSearchMode.LIGHT) {
broadcast = false;
}
@@ -277,6 +330,10 @@ public void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig co
this.commandObjects.setBroadcastAndRoundRobinConfig(this.broadcastAndRoundRobinConfig);
}
+ public Cache getCache() {
+ return cache;
+ }
+
public String ping() {
return checkAndBroadcastCommand(commandObjects.ping());
}
@@ -3204,14 +3261,12 @@ public Map> xreadAsMap(XReadParams xReadParams, Map>> xreadGroup(String groupName, String consumer,
- XReadGroupParams xReadGroupParams, Map streams) {
+ public List>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams) {
return executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}
@Override
- public Map> xreadGroupAsMap(String groupName, String consumer,
- XReadGroupParams xReadGroupParams, Map streams) {
+ public Map> xreadGroupAsMap(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams) {
return executeCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams));
}
@@ -3677,7 +3732,7 @@ public List scriptExists(List sha1s) {
@Override
public Boolean scriptExists(String sha1, String sampleKey) {
- return scriptExists(sampleKey, new String[]{sha1}).get(0);
+ return scriptExists(sampleKey, new String[] { sha1 }).get(0);
}
@Override
@@ -3687,7 +3742,7 @@ public List scriptExists(String sampleKey, String... sha1s) {
@Override
public Boolean scriptExists(byte[] sha1, byte[] sampleKey) {
- return scriptExists(sampleKey, new byte[][]{sha1}).get(0);
+ return scriptExists(sampleKey, new byte[][] { sha1 }).get(0);
}
@Override
@@ -3852,6 +3907,7 @@ public SearchResult ftSearch(String indexName, String query, FTSearchParams para
/**
* {@link FTSearchParams#limit(int, int)} will be ignored.
+ *
* @param batchSize batch size
* @param indexName index name
* @param query query
@@ -3983,7 +4039,8 @@ public Map> ftSpellCheck(String index, String query)
}
@Override
- public Map> ftSpellCheck(String index, String query, FTSpellCheckParams spellCheckParams) {
+ public Map> ftSpellCheck(String index, String query,
+ FTSpellCheckParams spellCheckParams) {
return executeCommand(commandObjects.ftSpellCheck(index, query, spellCheckParams));
}
@@ -4575,7 +4632,8 @@ public String tsCreateRule(String sourceKey, String destKey, AggregationType agg
@Override
public String tsCreateRule(String sourceKey, String destKey, AggregationType aggregationType, long bucketDuration, long alignTimestamp) {
- return executeCommand(commandObjects.tsCreateRule(sourceKey, destKey, aggregationType, bucketDuration, alignTimestamp));
+ return executeCommand(
+ commandObjects.tsCreateRule(sourceKey, destKey, aggregationType, bucketDuration, alignTimestamp));
}
@Override
@@ -4590,7 +4648,7 @@ public List tsQueryIndex(String... filters) {
@Override
public TSInfo tsInfo(String key) {
- return executor.executeCommand(commandObjects.tsInfo(key));
+ return executeCommand(commandObjects.tsInfo(key));
}
@Override
@@ -5074,7 +5132,8 @@ public Object sendCommand(byte[] sampleKey, ProtocolCommand cmd, byte[]... args)
}
public Object sendBlockingCommand(byte[] sampleKey, ProtocolCommand cmd, byte[]... args) {
- return executeCommand(commandObjects.commandArguments(cmd).addObjects((Object[]) args).blocking().processKey(sampleKey));
+ return executeCommand(
+ commandObjects.commandArguments(cmd).addObjects((Object[]) args).blocking().processKey(sampleKey));
}
public Object sendCommand(String sampleKey, ProtocolCommand cmd, String... args) {
@@ -5082,7 +5141,8 @@ public Object sendCommand(String sampleKey, ProtocolCommand cmd, String... args)
}
public Object sendBlockingCommand(String sampleKey, ProtocolCommand cmd, String... args) {
- return executeCommand(commandObjects.commandArguments(cmd).addObjects((Object[]) args).blocking().processKey(sampleKey));
+ return executeCommand(
+ commandObjects.commandArguments(cmd).addObjects((Object[]) args).blocking().processKey(sampleKey));
}
public Object executeCommand(CommandArguments args) {
diff --git a/src/main/java/redis/clients/jedis/annots/Experimental.java b/src/main/java/redis/clients/jedis/annots/Experimental.java
index e0c642e630..0d17084085 100644
--- a/src/main/java/redis/clients/jedis/annots/Experimental.java
+++ b/src/main/java/redis/clients/jedis/annots/Experimental.java
@@ -13,5 +13,5 @@
* If a type is marked with this annotation, all its members are considered experimental.
*/
@Documented
-@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
+@Target({ElementType.PACKAGE, ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
public @interface Experimental { }
diff --git a/src/main/java/redis/clients/jedis/args/Rawable.java b/src/main/java/redis/clients/jedis/args/Rawable.java
index 7515386861..be266f58aa 100644
--- a/src/main/java/redis/clients/jedis/args/Rawable.java
+++ b/src/main/java/redis/clients/jedis/args/Rawable.java
@@ -10,4 +10,10 @@ public interface Rawable {
* @return binary
*/
byte[] getRaw();
+
+ @Override
+ int hashCode();
+
+ @Override
+ boolean equals(Object o);
}
diff --git a/src/main/java/redis/clients/jedis/args/RawableFactory.java b/src/main/java/redis/clients/jedis/args/RawableFactory.java
index 813ddd021b..4a2ec782a7 100644
--- a/src/main/java/redis/clients/jedis/args/RawableFactory.java
+++ b/src/main/java/redis/clients/jedis/args/RawableFactory.java
@@ -96,17 +96,12 @@ public int hashCode() {
/**
* A {@link Rawable} wrapping a {@link String}.
*/
- public static class RawString implements Rawable {
+ public static class RawString extends Raw {
- private final byte[] raw;
+ // TODO: private final String str; ^ implements Rawable
public RawString(String str) {
- this.raw = SafeEncoder.encode(str);
- }
-
- @Override
- public byte[] getRaw() {
- return raw;
+ super(SafeEncoder.encode(str));
}
}
diff --git a/src/main/java/redis/clients/jedis/csc/AbstractCache.java b/src/main/java/redis/clients/jedis/csc/AbstractCache.java
new file mode 100644
index 0000000000..84b4d2ef81
--- /dev/null
+++ b/src/main/java/redis/clients/jedis/csc/AbstractCache.java
@@ -0,0 +1,232 @@
+package redis.clients.jedis.csc;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import redis.clients.jedis.annots.Experimental;
+import redis.clients.jedis.util.SafeEncoder;
+
+/**
+ * The class to manage the client-side caching. User can provide an of implementation of this class
+ * to the client object.
+ */
+@Experimental
+public abstract class AbstractCache implements Cache {
+
+ private Cacheable cacheable;
+ private final Map>> redisKeysToCacheKeys = new ConcurrentHashMap<>();
+ private final int maximumSize;
+ private ReentrantLock lock = new ReentrantLock();
+ private volatile CacheStats stats = new CacheStats();
+
+ protected AbstractCache(int maximumSize) {
+ this(maximumSize, DefaultCacheable.INSTANCE);
+ }
+
+ protected AbstractCache(int maximumSize, Cacheable cacheable) {
+ this.maximumSize = maximumSize;
+ this.cacheable = cacheable;
+ }
+
+ // Cache interface methods
+
+ @Override
+ public int getMaxSize() {
+ return maximumSize;
+ }
+
+ @Override
+ public abstract int getSize();
+
+ @Override
+ public abstract Collection getCacheEntries();
+
+ @Override
+ public CacheEntry get(CacheKey cacheKey) {
+ CacheEntry entry = getFromStore(cacheKey);
+ if (entry != null) {
+ getEvictionPolicy().touch(cacheKey);
+ }
+ return entry;
+ }
+
+ @Override
+ public CacheEntry set(CacheKey cacheKey, CacheEntry entry) {
+ lock.lock();
+ try {
+ entry = putIntoStore(cacheKey, entry);
+ EvictionPolicy policy = getEvictionPolicy();
+ policy.touch(cacheKey);
+ CacheKey evictedKey = policy.evictNext();
+ if (evictedKey != null) {
+ delete(evictedKey);
+ stats.evict();
+ }
+ for (Object redisKey : cacheKey.getRedisKeys()) {
+ ByteBuffer mapKey = makeKeyForRedisKeysToCacheKeys(redisKey);
+ if (redisKeysToCacheKeys.containsKey(mapKey)) {
+ redisKeysToCacheKeys.get(mapKey).add(cacheKey);
+ } else {
+ Set> set = ConcurrentHashMap.newKeySet();
+ set.add(cacheKey);
+ redisKeysToCacheKeys.put(mapKey, set);
+ }
+ }
+ stats.load();
+ return entry;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean delete(CacheKey cacheKey) {
+ lock.lock();
+ try {
+ boolean removed = removeFromStore(cacheKey);
+ getEvictionPolicy().reset(cacheKey);
+
+ // removing it from redisKeysToCacheKeys as well
+ // TODO: considering not doing it, what is the impact of not doing it ??
+ for (Object redisKey : cacheKey.getRedisKeys()) {
+ ByteBuffer mapKey = makeKeyForRedisKeysToCacheKeys(redisKey);
+ Set> cacheKeysRelatedtoRedisKey = redisKeysToCacheKeys.get(mapKey);
+ if (cacheKeysRelatedtoRedisKey != null) {
+ cacheKeysRelatedtoRedisKey.remove(cacheKey);
+ }
+ }
+ return removed;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public List delete(List cacheKeys) {
+ lock.lock();
+ try {
+ return cacheKeys.stream().map(this::delete).collect(Collectors.toList());
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public List deleteByRedisKey(Object key) {
+ lock.lock();
+ try {
+ final ByteBuffer mapKey = makeKeyForRedisKeysToCacheKeys(key);
+
+ Set> commands = redisKeysToCacheKeys.get(mapKey);
+ List cacheKeys = new ArrayList<>();
+ if (commands != null) {
+ cacheKeys.addAll(commands.stream().filter(this::removeFromStore).collect(Collectors.toList()));
+ stats.invalidationByServer(cacheKeys.size());
+ redisKeysToCacheKeys.remove(mapKey);
+ }
+ stats.invalidationMessages();
+ return cacheKeys;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public List deleteByRedisKeys(List keys) {
+ if (keys == null) {
+ flush();
+ return null;
+ }
+ lock.lock();
+ try {
+ return ((List