Skip to content

Commit

Permalink
Merge branch 'master' into pipe-pipe-and-tx
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 authored Nov 2, 2023
2 parents 6ecad52 + 5a70626 commit aa43365
Show file tree
Hide file tree
Showing 14 changed files with 221 additions and 21 deletions.
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7379
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node1.pid
logfile /tmp/redis_cluster_node1.log
save ""
Expand All @@ -223,7 +223,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7380
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node2.pid
logfile /tmp/redis_cluster_node2.log
save ""
Expand All @@ -237,7 +237,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7381
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node3.pid
logfile /tmp/redis_cluster_node3.log
save ""
Expand All @@ -251,7 +251,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7382
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node4.pid
logfile /tmp/redis_cluster_node4.log
save ""
Expand All @@ -265,7 +265,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7383
cluster-node-timeout 5000
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node5.pid
logfile /tmp/redis_cluster_node5.log
save ""
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.1.2</version>
<version>3.2.1</version>
<configuration>
<systemPropertyVariables>
<redis-hosts>${redis-hosts}</redis-hosts>
Expand Down Expand Up @@ -321,7 +321,7 @@
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.1.2</version>
<version>3.2.1</version>
<configuration>
<test>**/examples/*Example.java</test>
</configuration>
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/ClusterPipeline.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.time.Duration;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.providers.ClusterConnectionProvider;
Expand All @@ -23,6 +24,13 @@ public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientCo
this.closeable = this.provider;
}

public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod),
createClusterCommandObjects(clientConfig.getRedisProtocol()));
this.closeable = this.provider;
}

public ClusterPipeline(ClusterConnectionProvider provider) {
this(provider, new ClusterCommandObjects());
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -2846,7 +2846,8 @@ public final CommandObject<String> scriptKill(byte[] sampleKey) {
return new CommandObject<>(commandArguments(SCRIPT).add(KILL).processKey(sampleKey), BuilderFactory.STRING);
}

private final CommandObject<String> SLOWLOG_RESET_COMMAND_OBJECT = new CommandObject<>(commandArguments(SLOWLOG).add(RESET), BuilderFactory.STRING);
private final CommandObject<String> SLOWLOG_RESET_COMMAND_OBJECT
= new CommandObject<>(commandArguments(SLOWLOG).add(Keyword.RESET), BuilderFactory.STRING);

public final CommandObject<String> slowlogReset() {
return SLOWLOG_RESET_COMMAND_OBJECT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ private Socket connectToFirstSuccessfulHost(HostAndPort hostAndPort) throws Exce
socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to ensure timely delivery of data
socket.setSoLinger(true, 0); // Control calls close () method, the underlying socket is closed immediately

socket.connect(new InetSocketAddress(host.getHostAddress(), hostAndPort.getPort()), connectionTimeout);
// Passing 'host' directly will avoid another call to InetAddress.getByName() inside the InetSocketAddress constructor.
// For machines with ipv4 and ipv6, but the startNode uses ipv4 to connect, the ipv6 connection may fail.
socket.connect(new InetSocketAddress(host, hostAndPort.getPort()), connectionTimeout);
return socket;
} catch (Exception e) {
jce.addSuppressed(e);
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -9259,6 +9259,12 @@ public String lolwut(LolwutParams lolwutParams) {
return connection.getBulkReply();
}

@Override
public String reset() {
connection.sendCommand(Command.RESET);
return connection.getStatusCodeReply();
}

@Override
public String latencyDoctor() {
checkIsInMultiOrPipeline();
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

public class JedisCluster extends UnifiedJedis {

public static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError";

/**
* Default timeout in milliseconds.
*/
Expand Down Expand Up @@ -195,6 +197,13 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
super(clusterNodes, clientConfig, maxAttempts, maxTotalRetriesDuration);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration);
}

public JedisCluster(ClusterConnectionProvider provider, int maxAttempts,
Duration maxTotalRetriesDuration) {
super(provider, maxAttempts, maxTotalRetriesDuration);
Expand Down
85 changes: 81 additions & 4 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -10,17 +11,26 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;

import static redis.clients.jedis.JedisCluster.INIT_NO_ERROR_PROPERTY;

public class JedisClusterInfoCache {

private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class);

private final Map<String, ConnectionPool> nodes = new HashMap<>();
private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS];
private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS];
Expand All @@ -36,21 +46,75 @@ public class JedisClusterInfoCache {

private static final int MASTER_NODE_INDEX = 2;

/**
* The single thread executor for the topology refresh task.
*/
private ScheduledExecutorService topologyRefreshExecutor = null;

class TopologyRefreshTask implements Runnable {
@Override
public void run() {
logger.debug("Cluster topology refresh run, old nodes: {}", nodes.keySet());
renewClusterSlots(null);
logger.debug("Cluster topology refresh run, new nodes: {}", nodes.keySet());
}
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, null, startNodes);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, poolConfig, startNodes, null);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes,
final Duration topologyRefreshPeriod) {
this.poolConfig = poolConfig;
this.clientConfig = clientConfig;
this.startNodes = startNodes;
if (topologyRefreshPeriod != null) {
logger.info("Cluster topology refresh start, period: {}, startNodes: {}", topologyRefreshPeriod, startNodes);
topologyRefreshExecutor = Executors.newSingleThreadScheduledExecutor();
topologyRefreshExecutor.scheduleWithFixedDelay(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(),
topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS);
}
}

/**
* Check whether the number and order of slots in the cluster topology are equal to CLUSTER_HASHSLOTS
* @param slotsInfo the cluster topology
* @return if slots is ok, return true, elese return false.
*/
private boolean checkClusterSlotSequence(List<Object> slotsInfo) {
List<Integer> slots = new ArrayList<>();
for (Object slotInfoObj : slotsInfo) {
List<Object> slotInfo = (List<Object>)slotInfoObj;
slots.addAll(getAssignedSlotArray(slotInfo));
}
Collections.sort(slots);
if (slots.size() != Protocol.CLUSTER_HASHSLOTS) {
return false;
}
for (int i = 0; i < Protocol.CLUSTER_HASHSLOTS; ++i) {
if (i != slots.get(i)) {
return false;
}
}
return true;
}

public void discoverClusterNodesAndSlots(Connection jedis) {
List<Object> slotsInfo = executeClusterSlots(jedis);
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
if (System.getProperty(INIT_NO_ERROR_PROPERTY) == null) {
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
}
if (!checkClusterSlotSequence(slotsInfo)) {
throw new JedisClusterOperationException("Cluster slots have holes.");
}
}
w.lock();
try {
Expand Down Expand Up @@ -133,8 +197,13 @@ public void renewClusterSlots(Connection jedis) {

private void discoverClusterSlots(Connection jedis) {
List<Object> slotsInfo = executeClusterSlots(jedis);
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
if (System.getProperty(INIT_NO_ERROR_PROPERTY) == null) {
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
}
if (!checkClusterSlotSequence(slotsInfo)) {
throw new JedisClusterOperationException("Cluster slots have holes.");
}
}
w.lock();
try {
Expand Down Expand Up @@ -308,6 +377,14 @@ public void reset() {
}
}

public void close() {
reset();
if (topologyRefreshExecutor != null) {
logger.info("Cluster topology refresh shutdown, startNodes: {}", startNodes);
topologyRefreshExecutor.shutdownNow();
}
}

public static String getNodeKey(HostAndPort hnp) {
//return hnp.getHost() + ":" + hnp.getPort();
return hnp.toString();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public static enum Command implements ProtocolCommand {
SSUBSCRIBE, SUNSUBSCRIBE, SPUBLISH, // <-- pub sub
SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, PERSIST, ROLE, FAILOVER, SLOWLOG, OBJECT, CLIENT, TIME,
SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING, READONLY, READWRITE, SLAVEOF, REPLICAOF, COPY,
SENTINEL, MODULE, ACL, TOUCH, MEMORY, LOLWUT, COMMAND, LATENCY, WAITAOF;
SENTINEL, MODULE, ACL, TOUCH, MEMORY, LOLWUT, COMMAND, RESET, LATENCY, WAITAOF;

private final byte[] raw;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ default void shutdown(SaveMode saveMode) throws JedisException {

String lolwut(LolwutParams lolwutParams);

String reset();

/**
* The LATENCY DOCTOR command reports about different latency-related issues and advises about
* possible remedies.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis.providers;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -17,9 +18,9 @@
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisException;

public class ClusterConnectionProvider implements ConnectionProvider {
import static redis.clients.jedis.JedisCluster.INIT_NO_ERROR_PROPERTY;

private static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError";
public class ClusterConnectionProvider implements ConnectionProvider {

protected final JedisClusterInfoCache cache;

Expand All @@ -34,6 +35,12 @@ public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfi
initializeSlotsCache(clusterNodes, clientConfig);
}

public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod) {
this.cache = new JedisClusterInfoCache(clientConfig, poolConfig, clusterNodes, topologyRefreshPeriod);
initializeSlotsCache(clusterNodes, clientConfig);
}

private void initializeSlotsCache(Set<HostAndPort> startNodes, JedisClientConfig clientConfig) {
if (startNodes.isEmpty()) {
throw new JedisClusterOperationException("No nodes to initialize cluster slots cache.");
Expand Down Expand Up @@ -66,7 +73,7 @@ private void initializeSlotsCache(Set<HostAndPort> startNodes, JedisClientConfig

@Override
public void close() {
cache.reset();
cache.close();
}

public void renewSlotCache() {
Expand Down
Loading

0 comments on commit aa43365

Please sign in to comment.