Skip to content

Commit

Permalink
WIP: experimenting with using the vertx-redis client library.
Browse files Browse the repository at this point in the history
  • Loading branch information
StFS committed Aug 29, 2023
1 parent a202675 commit f68e63d
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 71 deletions.
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,11 @@ quarkus.vertx.max-event-loop-execute-time=${max.event-loop.execute-time:20000}
<artifactId>vertx-mongo-client</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-redis-client</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-kafka-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,14 @@ public Future<V> get(final K key) {
public Future<Map<K, V>> getAll(final Set<? extends K> keys) {
Objects.requireNonNull(keys);

return withCache(aCache -> aCache.getAllAsync(keys));
final var result = withCache(aCache -> aCache.getAllAsync(keys));
result.onSuccess(r -> {
LOG.info("BasicCache: getAll() ({})", keys.size());
final var resultKeys = r.keySet();
r.forEach((k, v) -> LOG.info("BasicCache#getAll() result: {}:{}", k, v));
keys.forEach((k) -> LOG.info("BasicCache#getAll() keys exist in result: {} / {}", k, resultKeys.contains(k)));
});
return result;
}

/**
Expand Down
41 changes: 38 additions & 3 deletions client-device-connection-redis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,44 @@
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.3.1</version>
<groupId>io.vertx</groupId>
<artifactId>vertx-redis-client</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core</artifactId>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager-embedded</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-development-mode-spi</artifactId>
</exclusion>
<exclusion>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-bootstrap-runner</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
</exclusion>
<exclusion>
<groupId>org.graalvm.sdk</groupId>
<artifactId>graal-sdk</artifactId>
</exclusion>
<exclusion>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-fs-util</artifactId>
</exclusion>
</exclusions>
</dependency>

<!--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,27 @@

package org.eclipse.hono.deviceconnection.redis.client;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.hono.deviceconnection.common.Cache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.RedisConnection;

/**
* TODO.
Expand All @@ -35,110 +44,232 @@ public class RedisCache<K, V> implements Cache<K, V> {

private static final Logger LOG = LoggerFactory.getLogger(RedisCache.class);

private JedisPool pool;
private static final int MAX_RECONNECT_RETRIES = 16;

private final Vertx vertx;
private final RedisRemoteConfigurationProperties properties;

private Redis redis;
private RedisConnection client;
private final AtomicBoolean CONNECTING = new AtomicBoolean();

/**
* TODO.
*
* @param vertx TODO.
* @param properties TODO.
*/
public RedisCache() {
public RedisCache(final Vertx vertx, final RedisRemoteConfigurationProperties properties) {

Objects.requireNonNull(vertx);
Objects.requireNonNull(properties);

this.vertx = vertx;
this.properties = properties;

LOG.info("Initializing REDIS cache!");
try {
pool = new JedisPool("redis", 6379);
try (Jedis jedis = pool.getResource()) {
final var response = jedis.ping();
LOG.info("Got {} from redis server", response);
}
} catch (Exception e) {
LOG.error("something went wrong", e);
createRedisClient()
.onSuccess( c -> LOG.info("Connected to Redis"))
.onFailure( t -> LOG.error("Could not connect to Redis", t));
}

/**
* TODO.
*
* @param vertx TODO.
* @param properties TODO.
*
* @return TODO.
*/
public static RedisCache<String, String> from(
final Vertx vertx, final RedisRemoteConfigurationProperties properties) {

Objects.requireNonNull(vertx);
Objects.requireNonNull(properties);

return new RedisCache<>(vertx, properties);
}

/**
* Will create a redis client and set up a reconnect handler when there is an exception in the connection.
*/
private Future<RedisConnection> createRedisClient() {
final Promise<RedisConnection> promise = Promise.promise();

// make sure to invalidate old connection if present
if (redis != null) {
redis.close();;
}

if (CONNECTING.compareAndSet(false, true)) {
redis = Redis.createClient(vertx, properties);
redis
.connect()
.onSuccess(conn -> {
client = conn;

// make sure the client is reconnected on error
// eg, the underlying TCP connection is closed but the client side doesn't know it yet
// the client tries to use the staled connection to talk to server. An exceptions will be raised
conn.exceptionHandler(e -> attemptReconnect(0));

// make sure the client is reconnected on connection close
// eg, the underlying TCP connection is closed with normal 4-Way-Handshake
// this handler will be notified instantly
conn.endHandler(placeHolder -> attemptReconnect(0));

// allow further processing
promise.complete(conn);
CONNECTING.set(false);
}).onFailure(t -> {
promise.fail(t);
CONNECTING.set(false);
});
} else {
promise.complete();
}

return promise.future();
}

private void attemptReconnect(final int retry) {
if (retry > MAX_RECONNECT_RETRIES) {
// we should stop now, as there's nothing we can do.
CONNECTING.set(false);
} else {
// retry with backoff up to 10240 ms
final long backoff = (long) (Math.pow(2, Math.min(retry, 10)) * 10);

vertx.setTimer(backoff, timer -> createRedisClient().onFailure(t -> attemptReconnect(retry + 1)));
}
}

@Override
public Future<JsonObject> checkForCacheAvailability() {
LOG.info("REDIS: checking for cache availability");
try (Jedis jedis = pool.getResource()) {
final var response = jedis.ping();
LOG.info("Got {} from redis server", response);
return Future.succeededFuture(new JsonObject());
} catch (Exception e) {
return Future.failedFuture(e);
}

Objects.requireNonNull(client);

final Promise<JsonObject> promise = Promise.promise();
final RedisAPI redis = RedisAPI.api(client);
redis.ping(List.of())
.onSuccess(v -> promise.complete(new JsonObject()))
.onFailure(promise::fail);

return promise.future();
}

@Override
public Future<Void> put(final K key, final V value) {
LOG.info("REDIS: put {}={}", key, value);
try (Jedis jedis = pool.getResource()) {
jedis.set(key.toString(), value.toString());
}
return Future.succeededFuture();
final Promise<Void> promise = Promise.promise();
final RedisAPI redis = RedisAPI.api(client);
redis.set(List.of(key.toString(), value.toString()))
.onSuccess(v -> promise.complete())
.onFailure(promise::fail);
return promise.future();
}

@Override
public Future<Void> put(final K key, final V value, final long lifespan, final TimeUnit lifespanUnit) {
LOG.info("REDIS: put {}={} ({} {})", key, value, lifespan, lifespanUnit);
try (Jedis jedis = pool.getResource()) {
jedis.psetex(key.toString(), lifespanUnit.toMillis(lifespan), value.toString());
return Future.succeededFuture();
} catch (Exception e) {
return Future.failedFuture(e);
final long millis = lifespanUnit.toMillis(lifespan);
final Promise<Void> promise = Promise.promise();
final RedisAPI redis = RedisAPI.api(client);
final List<String> params = new ArrayList<>(List.of(key.toString(), value.toString()));
if (millis > 0) {
params.addAll(List.of("PX", String.valueOf(millis)));
}
redis.set(params)
.onSuccess(v -> promise.complete())
.onFailure(promise::fail);
return promise.future();
}

@Override
public Future<Void> putAll(final Map<? extends K, ? extends V> data) {
LOG.info("REDIS: putAll ({})", data.size());
try (Jedis jedis = pool.getResource()) {
for (K k : data.keySet()) {
jedis.set(k.toString(), data.get(k).toString());
}
return Future.succeededFuture();
} catch (Exception e) {
return Future.failedFuture(e);
}
final Promise<Void> promise = Promise.promise();
final RedisAPI redis = RedisAPI.api(client);
final List<String> keyValues = new ArrayList<>(data.size() * 2);
data.forEach((k, v) -> {
keyValues.add(k.toString());
keyValues.add(v.toString()); });
redis.mset(keyValues)
.onSuccess(v -> promise.complete())
.onFailure(promise::fail);
return promise.future();
}

@Override
public Future<Void> putAll(final Map<? extends K, ? extends V> data, final long lifespan, final TimeUnit lifespanUnit) {
LOG.info("REDIS: putAll ({}) ({} {})", data.size(), lifespan, lifespanUnit);
try (Jedis jedis = pool.getResource()) {
for (K k : data.keySet()) {
jedis.psetex(k.toString(), lifespanUnit.toMillis(lifespan), data.get(k).toString());
final Promise<Void> promise = Promise.promise();
final long millis = lifespanUnit.toMillis(lifespan);
final RedisAPI redis = RedisAPI.api(client);
redis.multi();
data.forEach((k, v) -> {
final List<String> params = new ArrayList<>(List.of(k.toString(), v.toString()));
if (millis > 0) {
params.addAll(List.of("PX", String.valueOf(millis)));
}
return Future.succeededFuture();
} catch (Exception e) {
return Future.failedFuture(e);
}
redis.set(params);
});
redis.exec()
.onSuccess(v -> promise.complete())
.onFailure(promise::fail);
return promise.future();
}

@Override
public Future<V> get(final K key) {
LOG.info("REDIS: get {}", key);
try (Jedis jedis = pool.getResource()) {
return Future.succeededFuture((V) jedis.get(key.toString()));
} catch (Exception e) {
return Future.failedFuture(e);
}
final Promise<V> promise = Promise.promise();
final RedisAPI redis = RedisAPI.api(client);
redis.get(key.toString())
.onSuccess(v -> promise.complete((V) v) )
.onFailure(promise::fail);
return promise.future();
}

@Override
public Future<Boolean> remove(final K key, final V value) {
LOG.info("REDIS: remove {}={}", key, value);
try (Jedis jedis = pool.getResource()) {
jedis.del(key.toString());
return Future.succeededFuture(true);
} catch (Exception e) {
return Future.failedFuture(e);
}
//TODO: why is the value being passed here? Do we need to use that?
final Promise<Boolean> promise = Promise.promise();
final RedisAPI redis = RedisAPI.api(client);
redis.del(List.of(key.toString()))
.onSuccess(v -> promise.complete(true))
.onFailure(promise::fail);
return promise.future();
}

@Override
public Future<Map<K, V>> getAll(final Set<? extends K> keys) {
LOG.warn("getAll() ({}) called but that has not been implemented!!!", keys.size());
try (Jedis jedis = pool.getResource()) {
return Future.succeededFuture(null);
} catch (Exception e) {
return Future.failedFuture(e);
}
LOG.info("REDIS: getAll {}", keys.size());
final Promise<Map<K, V>> promise = Promise.promise();

final RedisAPI redis = RedisAPI.api(client);
// Make sure the keys are in order and we can pop off the front
final LinkedList<String> keyList = new LinkedList<>(keys.stream().map(String::valueOf).toList());
keyList.forEach(i -> LOG.info("REDIS: Item: {}", i));
final Map<K, V> result = new HashMap<>(keyList.size());
redis.mget(keyList)
.onComplete(v -> {
LOG.info("REDIS: Got {} items back...", v.result().stream().toList().size());
v.result().forEach(i -> {
LOG.info("Iterating through result list: {}", i);
try {
if (i != null) { // TODO: this is kinda strange but some results are null and the BasicCache does not include those in the returned result. Ask about/investigate.
result.put((K) keyList.removeFirst(), i == null ? null : (V) i.toString());
}
} catch (Exception e) {
LOG.info(" - got exception {}", e.getMessage());
}
});
promise.complete(result);
})
.onFailure(promise::fail);
return promise.future();
}
}
Loading

0 comments on commit f68e63d

Please sign in to comment.