Skip to content

Commit

Permalink
Fix probable missing (RESP3) protocol processing (#3692)
Browse files Browse the repository at this point in the history
* Fix probable missing (RESP3) protocol processing

* undo commented lines
  • Loading branch information
sazzad16 authored Jan 17, 2024
1 parent 8cad5ac commit cf60d25
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 81 deletions.
43 changes: 28 additions & 15 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,43 +172,56 @@ public JedisCluster(Set<HostAndPort> clusterNodes, int connectionTimeout, int so
maxAttempts, poolConfig);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
int maxAttempts, GenericObjectPoolConfig<Connection> poolConfig) {
this(clusterNodes, clientConfig, maxAttempts,
Duration.ofMillis((long) clientConfig.getSocketTimeoutMillis() * maxAttempts), poolConfig);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
int maxAttempts, Duration maxTotalRetriesDuration,
GenericObjectPoolConfig<Connection> poolConfig) {
super(clusterNodes, clientConfig, poolConfig, maxAttempts, maxTotalRetriesDuration);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig) {
this(clusterNodes, clientConfig, DEFAULT_MAX_ATTEMPTS);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, int maxAttempts) {
super(clusterNodes, clientConfig, maxAttempts);
this(clusterNodes, clientConfig, maxAttempts,
Duration.ofMillis((long) clientConfig.getSocketTimeoutMillis() * maxAttempts));
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
Duration maxTotalRetriesDuration) {
super(clusterNodes, clientConfig, maxAttempts, maxTotalRetriesDuration);
this(new ClusterConnectionProvider(clusterNodes, clientConfig), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol());
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig) {
this(clusterNodes, clientConfig, DEFAULT_MAX_ATTEMPTS, poolConfig);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
GenericObjectPoolConfig<Connection> poolConfig) {
this(clusterNodes, clientConfig, maxAttempts,
Duration.ofMillis((long) clientConfig.getSocketTimeoutMillis() * maxAttempts), poolConfig);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration);
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol());
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol());
}

// Uses a fetched connection to process protocol. Should be avoided if possible.
public JedisCluster(ClusterConnectionProvider provider, int maxAttempts,
Duration maxTotalRetriesDuration) {
super(provider, maxAttempts, maxTotalRetriesDuration);
}

private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
}

public Map<String, ConnectionPool> getClusterNodes() {
return ((ClusterConnectionProvider) provider).getNodes();
}
Expand Down
9 changes: 5 additions & 4 deletions src/main/java/redis/clients/jedis/JedisPooled.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public JedisPooled(final String host, final int port) {
}

public JedisPooled(final HostAndPort hostAndPort) {
this(new PooledConnectionProvider(hostAndPort));
super(hostAndPort);
}

public JedisPooled(final String host, final int port, final boolean ssl) {
Expand All @@ -73,7 +73,7 @@ public JedisPooled(final String host, final int port, final String user, final S
}

public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
this(new PooledConnectionProvider(hostAndPort, clientConfig));
super(hostAndPort, clientConfig);
}

public JedisPooled(PooledObjectFactory<Connection> factory) {
Expand Down Expand Up @@ -373,12 +373,13 @@ public JedisPooled(final GenericObjectPoolConfig<Connection> poolConfig, final H

public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig) {
this(new PooledConnectionProvider(hostAndPort, clientConfig, poolConfig));
super(new PooledConnectionProvider(hostAndPort, clientConfig, poolConfig), clientConfig.getRedisProtocol());
}

public JedisPooled(final GenericObjectPoolConfig<Connection> poolConfig,
final JedisSocketFactory jedisSocketFactory, final JedisClientConfig clientConfig) {
this(new ConnectionFactory(jedisSocketFactory, clientConfig), poolConfig);
super(new PooledConnectionProvider(new ConnectionFactory(jedisSocketFactory, clientConfig), poolConfig),
clientConfig.getRedisProtocol());
}

public JedisPooled(GenericObjectPoolConfig<Connection> poolConfig, PooledObjectFactory<Connection> factory) {
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/redis/clients/jedis/JedisSentineled.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ public class JedisSentineled extends UnifiedJedis {

public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
this(new SentineledConnectionProvider(masterName, masterClientConfig, sentinels, sentinelClientConfig));
super(new SentineledConnectionProvider(masterName, masterClientConfig, sentinels, sentinelClientConfig),
masterClientConfig.getRedisProtocol());
}

public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig,
final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
this(new SentineledConnectionProvider(masterName, masterClientConfig, poolConfig, sentinels, sentinelClientConfig));
super(new SentineledConnectionProvider(masterName, masterClientConfig, poolConfig, sentinels, sentinelClientConfig),
masterClientConfig.getRedisProtocol());
}

public JedisSentineled(SentineledConnectionProvider sentineledConnectionProvider) {
Expand Down
122 changes: 62 additions & 60 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ public UnifiedJedis() {
}

public UnifiedJedis(HostAndPort hostAndPort) {
// this(new Connection(hostAndPort));
this(new PooledConnectionProvider(hostAndPort));
this(new PooledConnectionProvider(hostAndPort), (RedisProtocol) null);
}

public UnifiedJedis(final String url) {
Expand Down Expand Up @@ -89,27 +88,15 @@ public UnifiedJedis(final URI uri, JedisClientConfig config) {
}

public UnifiedJedis(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
// this(new Connection(hostAndPort, clientConfig));
this(new PooledConnectionProvider(hostAndPort, clientConfig));
RedisProtocol proto = clientConfig.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
this(new PooledConnectionProvider(hostAndPort, clientConfig), clientConfig.getRedisProtocol());
}

public UnifiedJedis(ConnectionProvider provider) {
this.provider = provider;
this.executor = new DefaultCommandExecutor(provider);
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
try (Connection conn = this.provider.getConnection()) {
if (conn != null) {
RedisProtocol proto = conn.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
}
//} catch (JedisAccessControlException ace) {
} catch (JedisException je) { // TODO: use specific exception(s)
// use default protocol
}
this(new DefaultCommandExecutor(provider), provider);
}

protected UnifiedJedis(ConnectionProvider provider, RedisProtocol protocol) {
this(new DefaultCommandExecutor(provider), provider, new CommandObjects(), protocol);
}

/**
Expand Down Expand Up @@ -142,69 +129,60 @@ public UnifiedJedis(Connection connection) {
this.provider = null;
this.executor = new SimpleCommandExecutor(connection);
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this);
RedisProtocol proto = connection.getRedisProtocol();
if (proto == RedisProtocol.RESP3) this.commandObjects.setProtocol(proto);
if (proto != null) this.commandObjects.setProtocol(proto);
this.graphCommandObjects = new GraphCommandObjects(this);
}

@Deprecated
public UnifiedJedis(Set<HostAndPort> jedisClusterNodes, JedisClientConfig clientConfig, int maxAttempts) {
this(new ClusterConnectionProvider(jedisClusterNodes, clientConfig), maxAttempts,
Duration.ofMillis(maxAttempts * clientConfig.getSocketTimeoutMillis()));
RedisProtocol proto = clientConfig.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
this(jedisClusterNodes, clientConfig, maxAttempts, Duration.ofMillis(maxAttempts * clientConfig.getSocketTimeoutMillis()));
}

public UnifiedJedis(Set<HostAndPort> jedisClusterNodes, JedisClientConfig clientConfig, int maxAttempts, Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(jedisClusterNodes, clientConfig), maxAttempts, maxTotalRetriesDuration);
RedisProtocol proto = clientConfig.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
@Deprecated
public UnifiedJedis(Set<HostAndPort> jedisClusterNodes, JedisClientConfig clientConfig, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(jedisClusterNodes, clientConfig), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol());
}

@Deprecated
public UnifiedJedis(Set<HostAndPort> jedisClusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, int maxAttempts, Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(jedisClusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration);
RedisProtocol proto = clientConfig.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
this(new ClusterConnectionProvider(jedisClusterNodes, clientConfig, poolConfig), maxAttempts,
maxTotalRetriesDuration, clientConfig.getRedisProtocol());
}

// Uses a fetched connection to process protocol. Should be avoided if possible.
public UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) {
this.provider = provider;
this.executor = new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration);
this.commandObjects = new ClusterCommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
this(new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider,
new ClusterCommandObjects());
}

protected UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol) {
this(new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider,
new ClusterCommandObjects(), protocol);
}

/**
* @deprecated Sharding/Sharded feature will be removed in next major release.
*/
@Deprecated
public UnifiedJedis(ShardedConnectionProvider provider) {
this.provider = provider;
this.executor = new DefaultCommandExecutor(provider);
this.commandObjects = new ShardedCommandObjects(provider.getHashingAlgo());
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
this(new DefaultCommandExecutor(provider), provider, new ShardedCommandObjects(provider.getHashingAlgo()));
}

/**
* @deprecated Sharding/Sharded feature will be removed in next major release.
*/
@Deprecated
public UnifiedJedis(ShardedConnectionProvider provider, Pattern tagPattern) {
this.provider = provider;
this.executor = new DefaultCommandExecutor(provider);
this.commandObjects = new ShardedCommandObjects(provider.getHashingAlgo(), tagPattern);
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
this(new DefaultCommandExecutor(provider), provider, new ShardedCommandObjects(provider.getHashingAlgo(), tagPattern));
}

public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) {
this.provider = provider;
this.executor = new RetryableCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration);
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
this(new RetryableCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider);
}

/**
Expand All @@ -215,11 +193,7 @@ public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTo
* <p>
*/
public UnifiedJedis(MultiClusterPooledConnectionProvider provider) {
this.provider = provider;
this.executor = new CircuitBreakerCommandExecutor(provider);
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
this(new CircuitBreakerCommandExecutor(provider), provider);
}

/**
Expand All @@ -229,9 +203,36 @@ public UnifiedJedis(MultiClusterPooledConnectionProvider provider) {
* {@link UnifiedJedis#provider} is accessed.
*/
public UnifiedJedis(CommandExecutor executor) {
this.provider = null;
this(executor, (ConnectionProvider) null);
}

private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider) {
this(executor, provider, new CommandObjects());
}

// Uses a fetched connection to process protocol. Should be avoided if possible.
private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects) {
this(executor, provider, commandObjects, null);
if (this.provider != null) {
try (Connection conn = this.provider.getConnection()) {
if (conn != null) {
RedisProtocol proto = conn.getRedisProtocol();
if (proto != null) this.commandObjects.setProtocol(proto);
}
} catch (JedisException je) { }
}
}

private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects,
RedisProtocol protocol) {
this.provider = provider;
this.executor = executor;
this.commandObjects = new CommandObjects();

this.commandObjects = commandObjects;
if (protocol != null) {
this.commandObjects.setProtocol(protocol);
}

this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
}
Expand All @@ -241,6 +242,7 @@ public void close() {
IOUtils.closeQuietly(this.executor);
}

@Deprecated
protected final void setProtocol(RedisProtocol protocol) {
this.protocol = protocol;
this.commandObjects.setProtocol(this.protocol);
Expand Down

0 comments on commit cf60d25

Please sign in to comment.