Skip to content

Commit

Permalink
Merge pull request #518 from swisspost/develop
Browse files Browse the repository at this point in the history
PR for release
  • Loading branch information
mcweba authored Jun 21, 2023
2 parents 20cd390 + 0f11afb commit b838ee4
Show file tree
Hide file tree
Showing 99 changed files with 905 additions and 636 deletions.
2 changes: 1 addition & 1 deletion gateleen-cache/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
<version>1.3.29-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
</parent>

<artifactId>gateleen-cache</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.swisspush.gateleen.cache.storage;

import io.vertx.core.Promise;
import io.vertx.redis.client.RedisAPI;
import org.slf4j.Logger;
import org.swisspush.gateleen.core.lua.LuaScriptState;
import org.swisspush.gateleen.core.lua.RedisCommand;
import org.swisspush.gateleen.core.redis.RedisProvider;
import org.swisspush.gateleen.core.util.RedisUtils;

import java.util.List;
Expand All @@ -18,23 +18,23 @@ public class CacheRequestRedisCommand implements RedisCommand {
private final List<String> keys;
private final List<String> arguments;
private final Promise<Void> promise;
private final RedisAPI redisAPI;
private final RedisProvider redisProvider;
private final Logger log;

public CacheRequestRedisCommand(LuaScriptState luaScriptState, List<String> keys, List<String> arguments,
RedisAPI redisAPI, Logger log, final Promise<Void> promise) {
RedisProvider redisProvider, Logger log, final Promise<Void> promise) {
this.luaScriptState = luaScriptState;
this.keys = keys;
this.arguments = arguments;
this.redisAPI = redisAPI;
this.redisProvider = redisProvider;
this.log = log;
this.promise = promise;
}

@Override
public void exec(int executionCounter) {
List<String> args= RedisUtils.toPayload(luaScriptState.getSha(), keys.size(), keys, arguments);
redisAPI.evalsha(args, event -> {
List<String> args = RedisUtils.toPayload(luaScriptState.getSha(), keys.size(), keys, arguments);
redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> {
if (event.succeeded()) {
String resultStr = event.result().toString();
if ("OK".equals(resultStr)) {
Expand All @@ -50,13 +50,13 @@ public void exec(int executionCounter) {
if (executionCounter > 10) {
promise.fail("amount the script got loaded is higher than 10, we abort");
} else {
luaScriptState.loadLuaScript(new CacheRequestRedisCommand(luaScriptState, keys, arguments, redisAPI, log, promise), executionCounter);
luaScriptState.loadLuaScript(new CacheRequestRedisCommand(luaScriptState, keys, arguments,
redisProvider, log, promise), executionCounter);
}
} else {
promise.fail("CacheRequestRedisCommand request failed with message: " + message);
}
}
});

})).onFailure(throwable -> promise.fail("Redis: CacheRequestRedisCommand request failed with message: " + throwable.getMessage()));
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.swisspush.gateleen.cache.storage;

import io.vertx.core.Promise;
import io.vertx.redis.client.RedisAPI;
import org.slf4j.Logger;
import org.swisspush.gateleen.core.lua.LuaScriptState;
import org.swisspush.gateleen.core.lua.RedisCommand;
import org.swisspush.gateleen.core.redis.RedisProvider;
import org.swisspush.gateleen.core.util.RedisUtils;

import java.util.List;
Expand All @@ -18,41 +18,41 @@ public class ClearCacheRedisCommand implements RedisCommand {
private final List<String> keys;
private final List<String> arguments;
private final Promise<Long> promise;
private final RedisAPI redisAPI;
private final RedisProvider redisProvider;
private final Logger log;

public ClearCacheRedisCommand(LuaScriptState luaScriptState, List<String> keys, List<String> arguments,
RedisAPI redisAPI, Logger log, final Promise<Long> promise) {
RedisProvider redisProvider, Logger log, final Promise<Long> promise) {
this.luaScriptState = luaScriptState;
this.keys = keys;
this.arguments = arguments;
this.redisAPI = redisAPI;
this.redisProvider = redisProvider;
this.log = log;
this.promise = promise;
}

@Override
public void exec(int executionCounter) {
List<String> args= RedisUtils.toPayload(luaScriptState.getSha(), keys.size(), keys, arguments);
redisAPI.evalsha(args, event -> {
if(event.succeeded()){
List<String> args = RedisUtils.toPayload(luaScriptState.getSha(), keys.size(), keys, arguments);
redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> {
if (event.succeeded()) {
Long clearedItemsCount = event.result().toLong();
promise.complete(clearedItemsCount);
} else {
String message = event.cause().getMessage();
if(message != null && message.startsWith("NOSCRIPT")) {
if (message != null && message.startsWith("NOSCRIPT")) {
log.warn("ClearCacheRedisCommand script couldn't be found, reload it");
log.warn("amount the script got loaded: " + executionCounter);
if(executionCounter > 10) {
if (executionCounter > 10) {
promise.fail("amount the script got loaded is higher than 10, we abort");
} else {
luaScriptState.loadLuaScript(new ClearCacheRedisCommand(luaScriptState, keys, arguments, redisAPI, log, promise), executionCounter);
luaScriptState.loadLuaScript(new ClearCacheRedisCommand(luaScriptState, keys, arguments,
redisProvider, log, promise), executionCounter);
}
} else {
promise.fail("ClearCacheRedisCommand request failed with message: " + message);
}
}
});

})).onFailure(throwable -> promise.fail("Redis: ClearCacheRedisCommand request failed with message: " + throwable.getMessage()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.redis.client.RedisAPI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.lock.Lock;
import org.swisspush.gateleen.core.lua.LuaScriptState;
import org.swisspush.gateleen.core.redis.RedisProvider;
import org.swisspush.gateleen.core.util.Address;

import java.time.Duration;
Expand All @@ -29,19 +29,19 @@ public class RedisCacheStorage implements CacheStorage {
private Logger log = LoggerFactory.getLogger(RedisCacheStorage.class);

private final Lock lock;
private final RedisAPI redisAPI;
private final RedisProvider redisProvider;
private LuaScriptState clearCacheLuaScriptState;
private LuaScriptState cacheRequestLuaScriptState;

public static final String CACHED_REQUESTS = "gateleen.cache-cached-requests";
public static final String CACHE_PREFIX = "gateleen.cache:";
public static final String STORAGE_CLEANUP_TASK_LOCK = "cacheStorageCleanupTask";

public RedisCacheStorage(Vertx vertx, Lock lock, RedisAPI redisAPI, long storageCleanupIntervalMs) {
public RedisCacheStorage(Vertx vertx, Lock lock, RedisProvider redisProvider, long storageCleanupIntervalMs) {
this.lock = lock;
this.redisAPI = redisAPI;
clearCacheLuaScriptState = new LuaScriptState(CacheLuaScripts.CLEAR_CACHE, redisAPI, false);
cacheRequestLuaScriptState = new LuaScriptState(CacheLuaScripts.CACHE_REQUEST, redisAPI, false);
this.redisProvider = redisProvider;
clearCacheLuaScriptState = new LuaScriptState(CacheLuaScripts.CLEAR_CACHE, redisProvider, false);
cacheRequestLuaScriptState = new LuaScriptState(CacheLuaScripts.CACHE_REQUEST, redisProvider, false);

vertx.setPeriodic(storageCleanupIntervalMs, event -> {
String token = token(STORAGE_CLEANUP_TASK_LOCK);
Expand Down Expand Up @@ -80,15 +80,15 @@ public Future<Void> cacheRequest(String cacheIdentifier, Buffer cachedObject, Du
Promise<Void> promise = Promise.promise();
List<String> keys = Collections.singletonList(CACHED_REQUESTS);
List<String> arguments = List.of(CACHE_PREFIX, cacheIdentifier, cachedObject.toString(), String.valueOf(cacheExpiry.toMillis()));
CacheRequestRedisCommand cmd = new CacheRequestRedisCommand(cacheRequestLuaScriptState, keys, arguments, redisAPI, log, promise);
CacheRequestRedisCommand cmd = new CacheRequestRedisCommand(cacheRequestLuaScriptState, keys, arguments, redisProvider, log, promise);
cmd.exec(0);
return promise.future();
}

@Override
public Future<Optional<Buffer>> cachedRequest(String cacheIdentifier) {
Promise<Optional<Buffer>> promise = Promise.promise();
redisAPI.get(CACHE_PREFIX + cacheIdentifier, event -> {
redisProvider.redis().onSuccess(redisAPI -> redisAPI.get(CACHE_PREFIX + cacheIdentifier, event -> {
if (event.failed()) {
String message = "Failed to get cached request '" + cacheIdentifier + "'. Cause: " + logCause(event);
log.error(message);
Expand All @@ -100,6 +100,10 @@ public Future<Optional<Buffer>> cachedRequest(String cacheIdentifier) {
promise.complete(Optional.empty());
}
}
})).onFailure(throwable -> {
String message = "Redis: Failed to get cached request '" + cacheIdentifier + "'. Cause: " + throwable.getMessage();
log.error(message);
promise.fail(message);
});
return promise.future();
}
Expand All @@ -109,22 +113,26 @@ public Future<Long> clearCache() {
Promise<Long> promise = Promise.promise();
List<String> keys = Collections.singletonList(CACHED_REQUESTS);
List<String> arguments = List.of(CACHE_PREFIX, "true");
ClearCacheRedisCommand cmd = new ClearCacheRedisCommand(clearCacheLuaScriptState, keys, arguments, redisAPI, log, promise);
ClearCacheRedisCommand cmd = new ClearCacheRedisCommand(clearCacheLuaScriptState, keys, arguments, redisProvider, log, promise);
cmd.exec(0);
return promise.future();
}

@Override
public Future<Long> cacheEntriesCount() {
Promise<Long> promise = Promise.promise();
redisAPI.scard(CACHED_REQUESTS, reply -> {
redisProvider.redis().onSuccess(redisAPI -> redisAPI.scard(CACHED_REQUESTS, reply -> {
if (reply.failed()) {
String message = "Failed to get count of cached requests. Cause: " + logCause(reply);
log.error(message);
promise.fail(message);
} else {
promise.complete(reply.result().toLong());
}
})).onFailure(throwable -> {
String message = "Redis: Failed to get count of cached requests. Cause: " + throwable.getMessage();
log.error(message);
promise.fail(message);
});

return promise.future();
Expand All @@ -133,7 +141,7 @@ public Future<Long> cacheEntriesCount() {
@Override
public Future<Set<String>> cacheEntries() {
Promise<Set<String>> promise = Promise.promise();
redisAPI.smembers(CACHED_REQUESTS, reply -> {
redisProvider.redis().onSuccess(redisAPI -> redisAPI.smembers(CACHED_REQUESTS, reply -> {
if (reply.failed()) {
String message = "Failed to get cached requests. Cause: " + logCause(reply);
log.error(message);
Expand All @@ -147,6 +155,10 @@ public Future<Set<String>> cacheEntries() {
.collect(Collectors.toSet());
promise.complete(result);
}
})).onFailure(throwable -> {
String message = "Redis: Failed to get cached requests. Cause: " + throwable.getMessage();
log.error(message);
promise.fail(message);
});

return promise.future();
Expand All @@ -156,7 +168,7 @@ private Future<Long> cleanup() {
Promise<Long> promise = Promise.promise();
List<String> keys = Collections.singletonList(CACHED_REQUESTS);
List<String> arguments = List.of(CACHE_PREFIX, "false");
ClearCacheRedisCommand cmd = new ClearCacheRedisCommand(clearCacheLuaScriptState, keys, arguments, redisAPI, log, promise);
ClearCacheRedisCommand cmd = new ClearCacheRedisCommand(clearCacheLuaScriptState, keys, arguments, redisProvider, log, promise);
cmd.exec(0);
return promise.future();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public void testCacheAdminFunctionEntriesEmpty(TestContext context) {
context.assertEquals(CONTENT_TYPE_JSON, response.headers().get(CONTENT_TYPE_HEADER));
}

private class Request extends DummyHttpServerRequest {
private static class Request extends DummyHttpServerRequest {
private MultiMap headers;
private HttpMethod httpMethod;
private String uri;
Expand Down Expand Up @@ -332,7 +332,7 @@ public Request(HttpMethod httpMethod, String uri, MultiMap headers, HttpServerRe
public HttpServerRequest resume() { return this; }
}

private class Response extends DummyHttpServerResponse {
private static class Response extends DummyHttpServerResponse {
private MultiMap headers;

public Response() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public void setUp() {
Mockito.when(lock.acquireLock(anyString(), anyString(), anyLong())).thenReturn(Future.succeededFuture(Boolean.TRUE));
Mockito.when(lock.releaseLock(anyString(), anyString())).thenReturn(Future.succeededFuture(Boolean.TRUE));

redisCacheStorage = new RedisCacheStorage(vertx, lock, RedisAPI.api(new RedisClient(vertx, new RedisOptions())), 2000);
RedisAPI redisAPI = RedisAPI.api(new RedisClient(vertx, new RedisOptions()));

redisCacheStorage = new RedisCacheStorage(vertx, lock, () -> Future.succeededFuture(redisAPI), 2000);
jedis = new Jedis(new HostAndPort("localhost", 6379));
try {
jedis.flushAll();
Expand Down
2 changes: 1 addition & 1 deletion gateleen-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
<version>1.3.29-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
</parent>

<artifactId>gateleen-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,6 @@ public Future<Void> end() {
return doEnd();
}

@Override
public Future<Void> sendFile(String filename, long offset, long length) {
throw new UnsupportedOperationException();
}

@Override
public void close() {
// makes not really sense as we have no underlying TCP connection which can be closed
Expand All @@ -324,31 +319,6 @@ public boolean headWritten() {
return written;
}

@Override
public Future<HttpServerResponse> push(HttpMethod method, String host, String path, MultiMap headers) {
throw new UnsupportedOperationException();
}

@Override
public HttpServerResponse addCookie(Cookie cookie) {
throw new UnsupportedOperationException();
}

@Override
public @Nullable Cookie removeCookie(String name, boolean invalidate) {
throw new UnsupportedOperationException();
}

@Override
public Set<Cookie> removeCookies(String name, boolean invalidate) {
throw new UnsupportedOperationException();
}

@Override
public @Nullable Cookie removeCookie(String name, String domain, String path, boolean invalidate) {
throw new UnsupportedOperationException();
}

@Override
public HttpServerResponse setWriteQueueMaxSize(int maxSize) {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.lock.Lock;
import org.swisspush.gateleen.core.lock.lua.LockLuaScripts;
import org.swisspush.gateleen.core.lock.lua.ReleaseLockRedisCommand;
import org.swisspush.gateleen.core.lua.LuaScriptState;
import org.swisspush.gateleen.core.redis.RedisProvider;
import org.swisspush.gateleen.core.util.FailedAsyncResult;
import org.swisspush.gateleen.core.util.RedisUtils;

import java.util.Collections;
Expand All @@ -31,11 +32,11 @@ public class RedisBasedLock implements Lock {
public static final String STORAGE_PREFIX = "gateleen.core-lock:";

private LuaScriptState releaseLockLuaScriptState;
private RedisAPI redisAPI;
private RedisProvider redisProvider;

public RedisBasedLock(RedisAPI redisAPI) {
this.redisAPI = redisAPI;
this.releaseLockLuaScriptState = new LuaScriptState(LockLuaScripts.LOCK_RELEASE, redisAPI, false);
public RedisBasedLock(RedisProvider redisProvider) {
this.redisProvider = redisProvider;
this.releaseLockLuaScriptState = new LuaScriptState(LockLuaScripts.LOCK_RELEASE, redisProvider, false);
}

private void redisSetWithOptions(String key, String value, boolean nx, long px, Handler<AsyncResult<Response>> handler) {
Expand All @@ -44,7 +45,8 @@ private void redisSetWithOptions(String key, String value, boolean nx, long px,
if (nx) {
options.add("NX");
}
redisAPI.send(Command.SET, RedisUtils.toPayload(key, value, options).toArray(new String[0])).onComplete(handler);
redisProvider.redis().onSuccess(redisAPI -> redisAPI.send(Command.SET, RedisUtils.toPayload(key, value, options).toArray(new String[0]))
.onComplete(handler)).onFailure(throwable -> handler.handle(new FailedAsyncResult<>(throwable)));
}

@Override
Expand All @@ -70,7 +72,7 @@ public Future<Boolean> releaseLock(String lock, String token) {
List<String> keys = Collections.singletonList(buildLockKey(lock));
List<String> arguments = Collections.singletonList(token);
ReleaseLockRedisCommand cmd = new ReleaseLockRedisCommand(releaseLockLuaScriptState,
keys, arguments, redisAPI, log, promise);
keys, arguments, redisProvider, log, promise);
cmd.exec(0);
return promise.future();
}
Expand Down
Loading

0 comments on commit b838ee4

Please sign in to comment.