Skip to content

Commit

Permalink
Introducing periodic topology mechanism for JedisCluster
Browse files Browse the repository at this point in the history
solves #3595
  • Loading branch information
yangbodong22011 committed Oct 26, 2023
1 parent d2f6712 commit d807a4c
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 2 deletions.
9 changes: 9 additions & 0 deletions src/main/java/redis/clients/jedis/ClusterPipeline.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.time.Duration;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.providers.ClusterConnectionProvider;
Expand All @@ -23,6 +24,14 @@ public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientCo
this.closeable = this.provider;
}

public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig,
topologyRefreshEnabled, topologyRefreshPeriod),
createClusterCommandObjects(clientConfig.getRedisProtocol()));
this.closeable = this.provider;
}

public ClusterPipeline(ClusterConnectionProvider provider) {
this(provider, new ClusterCommandObjects());
}
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
super(clusterNodes, clientConfig, maxAttempts, maxTotalRetriesDuration);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, int maxAttempts, Duration maxTotalRetriesDuration,
boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) {
super(clusterNodes, clientConfig, poolConfig, maxAttempts, maxTotalRetriesDuration, topologyRefreshEnabled,
topologyRefreshPeriod);
}

public JedisCluster(ClusterConnectionProvider provider, int maxAttempts,
Duration maxTotalRetriesDuration) {
super(provider, maxAttempts, maxTotalRetriesDuration);
Expand Down
58 changes: 57 additions & 1 deletion src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -10,16 +11,24 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;

public class JedisClusterInfoCache {
private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class);
private static final boolean DEFAULT_TOPOLOGY_REFRESH_ENABLED = true;
private static final Duration DEFAULT_TOPOLOGY_REFRESH_PERIOD = Duration.ofSeconds(60);

private final Map<String, ConnectionPool> nodes = new HashMap<>();
private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS];
Expand All @@ -36,15 +45,46 @@ public class JedisClusterInfoCache {

private static final int MASTER_NODE_INDEX = 2;

private final boolean topologyRefreshEnabled;
private final Duration topologyRefreshPeriod;

/**
* The single thread executor for the topology refresh task.
*/
private static final ScheduledExecutorService topologyRefreshExecutor =
Executors.newSingleThreadScheduledExecutor();

class TopologyRefreshTask implements Runnable {
@Override
public void run() {
logger.debug("Cluster topology refresh run, old nodes: {}", nodes);
renewClusterSlots(null);
logger.debug("Cluster topology refresh run, new nodes: {}", nodes);
}
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, null, startNodes);
this(clientConfig, null, startNodes, DEFAULT_TOPOLOGY_REFRESH_ENABLED, DEFAULT_TOPOLOGY_REFRESH_PERIOD);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, poolConfig, startNodes, DEFAULT_TOPOLOGY_REFRESH_ENABLED, DEFAULT_TOPOLOGY_REFRESH_PERIOD);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes,
final boolean topologyRefreshEnabled, final Duration topologyRefreshPeriod) {
this.poolConfig = poolConfig;
this.clientConfig = clientConfig;
this.startNodes = startNodes;
this.topologyRefreshEnabled = topologyRefreshEnabled;
this.topologyRefreshPeriod = topologyRefreshPeriod;
if (topologyRefreshEnabled) {
logger.info("Cluster topology refresh start, period: {}", topologyRefreshPeriod.toString());
topologyRefreshExecutor.scheduleAtFixedRate(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(),
topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS);
}
}

public void discoverClusterNodesAndSlots(Connection jedis) {
Expand Down Expand Up @@ -308,6 +348,22 @@ public void reset() {
}
}

public void close() {
reset();
if (topologyRefreshEnabled) {
logger.info("Cluster topology refresh shutdown");
try {
topologyRefreshExecutor.shutdown();
// wait topologyRefreshPeriod for already run task
if (!topologyRefreshExecutor.awaitTermination(topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS)) {
topologyRefreshExecutor.shutdownNow();
}
} catch (Exception e) {
// ignore exception
}
}
}

public static String getNodeKey(HostAndPort hnp) {
//return hnp.getHost() + ":" + hnp.getPort();
return hnp.toString();
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ public UnifiedJedis(Set<HostAndPort> jedisClusterNodes, JedisClientConfig client
if (proto != null) commandObjects.setProtocol(proto);
}

public UnifiedJedis(Set<HostAndPort> jedisClusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, int maxAttempts, Duration maxTotalRetriesDuration,
boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) {
this(new ClusterConnectionProvider(jedisClusterNodes, clientConfig, poolConfig, topologyRefreshEnabled,
topologyRefreshPeriod), maxAttempts, maxTotalRetriesDuration);
RedisProtocol proto = clientConfig.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
}

public UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) {
this.provider = provider;
this.executor = new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis.providers;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -34,6 +35,13 @@ public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfi
initializeSlotsCache(clusterNodes, clientConfig);
}

public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) {
this.cache = new JedisClusterInfoCache(clientConfig, poolConfig, clusterNodes, topologyRefreshEnabled,
topologyRefreshPeriod);
initializeSlotsCache(clusterNodes, clientConfig);
}

private void initializeSlotsCache(Set<HostAndPort> startNodes, JedisClientConfig clientConfig) {
if (startNodes.isEmpty()) {
throw new JedisClusterOperationException("No nodes to initialize cluster slots cache.");
Expand Down Expand Up @@ -66,7 +74,7 @@ private void initializeSlotsCache(Set<HostAndPort> startNodes, JedisClientConfig

@Override
public void close() {
cache.reset();
cache.close();
}

public void renewSlotCache() {
Expand Down
60 changes: 60 additions & 0 deletions src/test/java/redis/clients/jedis/JedisClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,66 @@ public void clusterRefreshNodes() throws Exception {
}
}

@Test
public void clusterPeriodTopologyRefreshTest() throws Exception {
Set<HostAndPort> jedisClusterNode = new HashSet<>();
jedisClusterNode.add(nodeInfo1);
jedisClusterNode.add(nodeInfo2);
jedisClusterNode.add(nodeInfo3);

// we set topologyRefreshPeriod is 5s
boolean topologyRefreshEnabled = true;
Duration topologyRefreshPeriod = Duration.ofSeconds(3);
try (JedisCluster cluster = new JedisCluster(jedisClusterNode, DEFAULT_CLIENT_CONFIG, DEFAULT_POOL_CONFIG,
DEFAULT_REDIRECTIONS, Duration.ofSeconds(1000), topologyRefreshEnabled, topologyRefreshPeriod)) {
assertEquals(3, cluster.getClusterNodes().size());
cleanUp(); // cleanup and add node4

// at first, join node4 to cluster
node1.clusterMeet(LOCAL_IP, nodeInfo2.getPort());
node1.clusterMeet(LOCAL_IP, nodeInfo3.getPort());
node1.clusterMeet(LOCAL_IP, nodeInfo4.getPort());
// split available slots across the three nodes
int slotsPerNode = CLUSTER_HASHSLOTS / 4;
int[] node1Slots = new int[slotsPerNode];
int[] node2Slots = new int[slotsPerNode];
int[] node3Slots = new int[slotsPerNode];
int[] node4Slots = new int[slotsPerNode];
for (int i = 0, slot1 = 0, slot2 = 0, slot3 = 0, slot4 = 0; i < CLUSTER_HASHSLOTS; i++) {
if (i < slotsPerNode) {
node1Slots[slot1++] = i;
} else if (i >= slotsPerNode && i < slotsPerNode*2) {
node2Slots[slot2++] = i;
} else if (i >= slotsPerNode*2 && i < slotsPerNode*3) {
node3Slots[slot3++] = i;
} else {
node4Slots[slot4++] = i;
}
}

node1.clusterAddSlots(node1Slots);
node2.clusterAddSlots(node2Slots);
node3.clusterAddSlots(node3Slots);
node4.clusterAddSlots(node4Slots);
JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, node4);

// Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (3 -> 4)
Thread.sleep(topologyRefreshPeriod.toMillis() * 3);

assertEquals(4, cluster.getClusterNodes().size());
String nodeKey4 = LOCAL_IP + ":" + nodeInfo4.getPort();
assertTrue(cluster.getClusterNodes().keySet().contains(nodeKey4));

// make 4 nodes to 3 nodes
cleanUp();
setUp();

// Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (4 -> 3)
Thread.sleep(topologyRefreshPeriod.toMillis() * 3);
assertEquals(3, cluster.getClusterNodes().size());
}
}

private static String getNodeServingSlotRange(String infoOutput) {
// f4f3dc4befda352a4e0beccf29f5e8828438705d 127.0.0.1:7380 master - 0
// 1394372400827 0 connected 5461-10922
Expand Down

0 comments on commit d807a4c

Please sign in to comment.