Skip to content

Commit

Permalink
put data into QueueStatistic, also read from it
Browse files Browse the repository at this point in the history
  • Loading branch information
Xin Zheng committed Dec 14, 2023
1 parent d3ce672 commit d328600
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 77 deletions.
20 changes: 9 additions & 11 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ private enum QueueState {

private Map<String, DequeueStatistic> dequeueStatistic = new ConcurrentHashMap<>();

public class DequeueStatistic{
public Long lastDequeueAttemptTimestamp;
public Long lastDequeueSuccessTimestamp;
public Long nextDequeueDueTimestamp;
}

public RedisQues() {
}

Expand All @@ -112,10 +106,6 @@ public RedisQues(MemoryUsageProvider memoryUsageProvider, RedisquesConfiguration
this.redisProvider = redisProvider;
}

public Map<String, DequeueStatistic> getDequeueStatistic() {
return dequeueStatistic;
}

public static RedisQuesBuilder builder() {
return new RedisQuesBuilder();
}
Expand Down Expand Up @@ -181,6 +171,14 @@ public void start(Promise<Void> promise) {
RedisquesConfiguration modConfig = configurationProvider.configuration();
log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration());

int dequeueStatisticReportInterval = modConfig.getDequeueStatisticReportIntervalSec();
if (dequeueStatisticReportInterval > 0) {
vertx.setPeriodic(1000L * dequeueStatisticReportInterval, handler -> {
dequeueStatistic.forEach((queueName, dequeueStatistic) ->
queueStatisticsCollector.setDequeueStatistic(queueName, dequeueStatistic));
});
}

queuesKey = modConfig.getRedisPrefix() + "queues";
queuesPrefix = modConfig.getRedisPrefix() + "queues:";
consumersPrefix = modConfig.getRedisPrefix() + "consumers:";
Expand Down Expand Up @@ -208,7 +206,7 @@ private void initialize() {
this.queueStatisticsCollector = new QueueStatisticsCollector(redisProvider,
queuesPrefix, vertx, configuration.getQueueSpeedIntervalSec());

RedisquesHttpRequestHandler.init(vertx, configuration, this);
RedisquesHttpRequestHandler.init(vertx, configuration, queueStatisticsCollector);

// only initialize memoryUsageProvider when not provided in the constructor
if (memoryUsageProvider == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.redisques.util.HandlerUtil;
import org.swisspush.redisques.util.QueueStatisticsCollector;

Expand All @@ -20,6 +23,7 @@
*/
public class GetQueuesStatisticsHandler implements Handler<AsyncResult<Response>> {

private static final Logger log = LoggerFactory.getLogger(GetQueuesStatisticsHandler.class);
private final Message<JsonObject> event;
private final Optional<Pattern> filterPattern;
private final QueueStatisticsCollector queueStatisticsCollector;
Expand All @@ -36,8 +40,13 @@ public GetQueuesStatisticsHandler(Message<JsonObject> event,
public void handle(AsyncResult<Response> handleQueues) {
if (handleQueues.succeeded()) {
List<String> queues = HandlerUtil
.filterByPattern(handleQueues.result(), filterPattern);
queueStatisticsCollector.getQueueStatistics(event, queues);
.filterByPattern(handleQueues.result(), filterPattern);
queueStatisticsCollector.getQueueStatistics(queues)
.onFailure(ex -> {
log.error("", ex);
event.reply(new JsonObject().put(STATUS, ERROR));
})
.onSuccess(event::reply);
} else {
event.reply(new JsonObject().put(STATUS, ERROR));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

import io.netty.util.internal.StringUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.authentication.AuthenticationProvider;
Expand All @@ -18,22 +21,69 @@
import io.vertx.ext.web.handler.BasicAuthHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.redisques.RedisQues;
import org.swisspush.redisques.util.DequeueStatistic;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisquesAPI;
import org.swisspush.redisques.util.RedisquesConfiguration;
import org.swisspush.redisques.util.Result;
import org.swisspush.redisques.util.StatusCode;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.swisspush.redisques.util.HttpServerRequestUtil.*;
import static org.swisspush.redisques.util.RedisquesAPI.*;
import static org.swisspush.redisques.util.HttpServerRequestUtil.decode;
import static org.swisspush.redisques.util.HttpServerRequestUtil.encodePayload;
import static org.swisspush.redisques.util.HttpServerRequestUtil.evaluateUrlParameterToBeEmptyOrTrue;
import static org.swisspush.redisques.util.HttpServerRequestUtil.extractNonEmptyJsonArrayFromBody;
import static org.swisspush.redisques.util.RedisquesAPI.BAD_INPUT;
import static org.swisspush.redisques.util.RedisquesAPI.COUNT;
import static org.swisspush.redisques.util.RedisquesAPI.ERROR_TYPE;
import static org.swisspush.redisques.util.RedisquesAPI.FILTER;
import static org.swisspush.redisques.util.RedisquesAPI.LIMIT;
import static org.swisspush.redisques.util.RedisquesAPI.LOCKS;
import static org.swisspush.redisques.util.RedisquesAPI.MEMORY_FULL;
import static org.swisspush.redisques.util.RedisquesAPI.MESSAGE;
import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME;
import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_SIZE;
import static org.swisspush.redisques.util.RedisquesAPI.NO_SUCH_LOCK;
import static org.swisspush.redisques.util.RedisquesAPI.OK;
import static org.swisspush.redisques.util.RedisquesAPI.QUEUES;
import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_DEQUEUESTATISTIC;
import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT;
import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS;
import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS;
import static org.swisspush.redisques.util.RedisquesAPI.STATUS;
import static org.swisspush.redisques.util.RedisquesAPI.VALUE;
import static org.swisspush.redisques.util.RedisquesAPI.buildAddQueueItemOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildBulkDeleteLocksOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildBulkDeleteQueuesOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildBulkPutLocksOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteAllLocksOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteAllQueueItemsOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteLockOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteQueueItemOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildEnqueueOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetAllLocksOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetConfigurationOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetLockOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemsCountOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemsOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesCountOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesItemsCountOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesSpeedOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesStatisticsOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildLockedEnqueueOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildPutLockOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildReplaceQueueItemOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildSetConfigurationOperation;

/**
* Handler class for HTTP requests providing access to Redisques over HTTP.
Expand Down Expand Up @@ -63,13 +113,13 @@ public class RedisquesHttpRequestHandler implements Handler<HttpServerRequest> {
private final String userHeader;
private final boolean enableQueueNameDecoding;
private final int queueSpeedIntervalSec;
private final RedisQues redisQues;
private final QueueStatisticsCollector queueStatisticsCollector;

public static void init(Vertx vertx, RedisquesConfiguration modConfig, RedisQues redisQues) {
public static void init(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector) {
log.info("Enable http request handler: " + modConfig.getHttpRequestHandlerEnabled());
if (modConfig.getHttpRequestHandlerEnabled()) {
if (modConfig.getHttpRequestHandlerPort() != null && modConfig.getHttpRequestHandlerUserHeader() != null) {
RedisquesHttpRequestHandler handler = new RedisquesHttpRequestHandler(vertx, modConfig, redisQues);
RedisquesHttpRequestHandler handler = new RedisquesHttpRequestHandler(vertx, modConfig, queueStatisticsCollector);
// in Vert.x 2x 100-continues was activated per default, in vert.x 3x it is off per default.
HttpServerOptions options = new HttpServerOptions().setHandle100ContinueAutomatically(true);
vertx.createHttpServer(options).requestHandler(handler).listen(modConfig.getHttpRequestHandlerPort(), result -> {
Expand Down Expand Up @@ -98,14 +148,14 @@ private Result<Boolean, String> checkHttpAuthenticationConfiguration(RedisquesCo
return Result.ok(false);
}

private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfig, RedisQues redisQues) {
private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector) {
this.router = Router.router(vertx);
this.eventBus = vertx.eventBus();
this.redisquesAddress = modConfig.getAddress();
this.userHeader = modConfig.getHttpRequestHandlerUserHeader();
this.enableQueueNameDecoding = modConfig.getEnableQueueNameDecoding();
this.queueSpeedIntervalSec = modConfig.getQueueSpeedIntervalSec();
this.redisQues = redisQues;
this.queueStatisticsCollector = queueStatisticsCollector;

final String prefix = modConfig.getHttpRequestHandlerPrefix();

Expand Down Expand Up @@ -512,12 +562,13 @@ private void getMonitorInformation(RoutingContext ctx) {
if (limit > 0) {
queuesList = limitJsonQueueArray(queuesList, limit);
}
Map<String, RedisQues.DequeueStatistic> dequeueProcessStatistic = redisQues.getDequeueStatistic();
fillStatisticToQueuesList(queuesList, dequeueProcessStatistic);

JsonObject resultObject = new JsonObject();
resultObject.put(QUEUES, queuesList);
jsonResponse(ctx.response(), resultObject);
// this function always Success
fillStatisticToQueuesList(queuesList).onSuccess(updatedQueuesList -> {
JsonObject resultObject = new JsonObject();
resultObject.put(QUEUES, updatedQueuesList);
jsonResponse(ctx.response(), resultObject);
});
} else {
// there was no result, we as well return an empty result
JsonObject resultObject = new JsonObject();
Expand All @@ -532,25 +583,53 @@ private void getMonitorInformation(RoutingContext ctx) {
});
}

private void fillStatisticToQueuesList(List<JsonObject> queuesList, Map<String, RedisQues.DequeueStatistic> dequeueProcessStatistic) {
queuesList.forEach(entries -> {
String queueName = entries.getString(MONITOR_QUEUE_NAME);
entries.put(MONITOR_QUEUE_LAST_DEQUEUE_ATTEMPT, "");
entries.put(MONITOR_QUEUE_LAST_DEQUEUE_SUCCESS, "");
entries.put(MONITOR_QUEUE_NEXT_DEQUEUE_DUE_TS, "");
if (dequeueProcessStatistic.containsKey(queueName)) {
RedisQues.DequeueStatistic dequeueStatistic = dequeueProcessStatistic.get(queueName);
if (dequeueStatistic.lastDequeueAttemptTimestamp != null) {
entries.put(MONITOR_QUEUE_LAST_DEQUEUE_ATTEMPT, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueAttemptTimestamp)));
}
if (dequeueStatistic.lastDequeueSuccessTimestamp != null) {
entries.put(MONITOR_QUEUE_LAST_DEQUEUE_SUCCESS, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueSuccessTimestamp)));
}
if (dequeueStatistic.nextDequeueDueTimestamp != null) {
entries.put(MONITOR_QUEUE_NEXT_DEQUEUE_DUE_TS, DATE_FORMAT.format(new Date(dequeueStatistic.nextDequeueDueTimestamp)));
}
}
});
private Future<List<JsonObject>> fillStatisticToQueuesList(List<JsonObject> queuesList) {
Promise<List<JsonObject>> promise = Promise.promise();
List<String> queueNameList = new ArrayList<>();
for (JsonObject jsonObject : queuesList)
{
queueNameList.add(jsonObject.getString(MONITOR_QUEUE_NAME));
}

queueStatisticsCollector.getQueueStatistics(queueNameList)
.onFailure(ex -> {
log.error("Failed to fetch QueueStatistics for queue", ex);
promise.complete(queuesList);
})
.onSuccess(queueStatisticsJsonObject -> {
if (OK.equals(queueStatisticsJsonObject.getString(STATUS))
&& !queueStatisticsJsonObject.getJsonArray(QUEUES).isEmpty()) {
JsonArray queueStatisticsArray = queueStatisticsJsonObject.getJsonArray(QUEUES);
queuesList.forEach(entries -> {
String queueName = entries.getString(MONITOR_QUEUE_NAME);
entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT, "");
entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS, "");
entries.put(STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS, "");
queueStatisticsJsonObject.getJsonArray(QUEUES);

for (Iterator<Object> it = queueStatisticsArray.stream().iterator(); it.hasNext(); ) {
JsonObject queueStatistic = (JsonObject) it.next();
if (queueName.equals(queueStatistic.getString(MONITOR_QUEUE_NAME))
&& queueStatistic.containsKey(STATISTIC_QUEUE_DEQUEUESTATISTIC)
&& queueStatistic.getJsonObject(STATISTIC_QUEUE_DEQUEUESTATISTIC) != null) {
DequeueStatistic dequeueStatistic = queueStatistic.getJsonObject(STATISTIC_QUEUE_DEQUEUESTATISTIC).mapTo(DequeueStatistic.class);
if (dequeueStatistic.lastDequeueAttemptTimestamp != null) {
entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueAttemptTimestamp)));
}
if (dequeueStatistic.lastDequeueSuccessTimestamp != null) {
entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueSuccessTimestamp)));
}
if (dequeueStatistic.nextDequeueDueTimestamp != null) {
entries.put(STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS, DATE_FORMAT.format(new Date(dequeueStatistic.nextDequeueDueTimestamp)));
}
break;
}
}
});
}
promise.complete(queuesList);
});
return promise.future();
}


Expand Down
14 changes: 14 additions & 0 deletions src/main/java/org/swisspush/redisques/util/DequeueStatistic.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.swisspush.redisques.util;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class DequeueStatistic {
public Long lastDequeueAttemptTimestamp = null;
public Long lastDequeueSuccessTimestamp = null;
public Long nextDequeueDueTimestamp = null;

public boolean isEmpty() {
return lastDequeueAttemptTimestamp == null && lastDequeueSuccessTimestamp == null && nextDequeueDueTimestamp == null;
}
}
Loading

0 comments on commit d328600

Please sign in to comment.