Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cover Redis commands for client side caching #3702

Merged
merged 12 commits into from
Feb 15, 2024
22 changes: 22 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,27 @@
<version>2.10.1</version>
</dependency>

<!-- Optional dependencies -->
<!-- Client-side caching -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.16</version>
<optional>true</optional>
</dependency>

<!-- UNIX socket connection support -->
<dependency>
<groupId>com.kohlschutter.junixsocket</groupId>
Expand All @@ -90,6 +111,7 @@
<version>1.19.0</version>
<scope>test</scope>
</dependency>

<!-- test -->
<dependency>
<groupId>junit</groupId>
Expand Down
110 changes: 70 additions & 40 deletions src/main/java/redis/clients/jedis/ClientSideCache.java
Original file line number Diff line number Diff line change
@@ -1,71 +1,101 @@
package redis.clients.jedis;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import redis.clients.jedis.exceptions.JedisException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import redis.clients.jedis.util.SafeEncoder;

public class ClientSideCache {
public abstract class ClientSideCache {

private final Map<ByteBuffer, Object> cache;
private final Map<ByteBuffer, Set<Long>> keyHashes;
private final ReentrantLock writeLock = new ReentrantLock();

public ClientSideCache() {
this.cache = new HashMap<>();
protected ClientSideCache() {
this.keyHashes = new ConcurrentHashMap<>();
}

/**
* For testing purpose only.
* @param map
*/
ClientSideCache(Map<ByteBuffer, Object> map) {
this.cache = map;
}
public abstract void invalidateAll();

public final void clear() {
cache.clear();
}
protected abstract void invalidateAll(Iterable<Long> hashes);

public final void invalidateKeys(List list) {
final void invalidate(List list) {
if (list == null) {
clear();
invalidateAll();
return;
}

list.forEach(this::invalidateKey);
list.forEach(this::invalidate0);
}

private void invalidateKey(Object key) {
if (key instanceof byte[]) {
cache.remove(convertKey((byte[]) key));
} else {
throw new JedisException("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
private void invalidate0(Object key) {
if (!(key instanceof byte[])) {
throw new AssertionError("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
}
}

protected void setKey(Object key, Object value) {
cache.put(getMapKey(key), value);
}
final ByteBuffer mapKey = makeKey0((byte[]) key);

protected <T> T getValue(Object key) {
return (T) getMapValue(key);
Set<Long> hashes = keyHashes.get(mapKey);
if (hashes != null) {
writeLock.lock();
try {
invalidateAll(hashes);
keyHashes.remove(mapKey);
} finally {
writeLock.unlock();
}
}
}

private Object getMapValue(Object key) {
return cache.get(getMapKey(key));
}
protected abstract void put(long hash, Object value);

protected abstract Object get(long hash);

private ByteBuffer getMapKey(Object key) {
if (key instanceof byte[]) {
return convertKey((byte[]) key);
} else {
return convertKey(SafeEncoder.encode(String.valueOf(key)));
final <T> T getValue(Function<CommandObject<T>, T> loader, CommandObject<T> command, Object... keys) {

final long hash = getHash(command);

T value = (T) get(hash);
if (value != null) {
return value;
}

value = loader.apply(command);
if (value != null) {
writeLock.lock();
try {
put(hash, value);
for (Object key : keys) {
ByteBuffer mapKey = makeKey(key);
if (keyHashes.containsKey(mapKey)) {
keyHashes.get(mapKey).add(hash);
} else {
Set<Long> set = new HashSet<>();
set.add(hash);
keyHashes.put(mapKey, set);
}
}
} finally {
writeLock.unlock();
}
}

return value;
}

protected abstract long getHash(CommandObject command);

private ByteBuffer makeKey(Object key) {
if (key instanceof byte[]) return makeKey0((byte[]) key);
else if (key instanceof String) return makeKey0(SafeEncoder.encode((String) key));
else throw new AssertionError("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
}

private static ByteBuffer convertKey(byte[] b) {
private static ByteBuffer makeKey0(byte[] b) {
return ByteBuffer.wrap(b);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private void discoverClusterSlots(Connection jedis) {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
if (clientSideCache != null) {
clientSideCache.clear();
clientSideCache.invalidateAll();
}
Set<String> hostAndPortKeys = new HashSet<>();

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private static void processPush(final RedisInputStream is, ClientSideCache cache
//System.out.println("PUSH: " + SafeEncoder.encodeObject(list));
if (list.size() == 2 && list.get(0) instanceof byte[]
&& Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) {
cache.invalidateKeys((List) list.get(1));
cache.invalidate((List) list.get(1));
}
}

Expand Down
Loading
Loading