Skip to content

Commit

Permalink
Merge pull request #232 from swisspost/develop
Browse files Browse the repository at this point in the history
PR-Release
  • Loading branch information
ZhengXinCN authored Dec 12, 2024
2 parents f3d355f + 345b217 commit 621f174
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.swisspush</groupId>
<artifactId>redisques</artifactId>
<version>4.1.9-SNAPSHOT</version>
<version>4.1.10-SNAPSHOT</version>
<name>redisques</name>
<description>
A highly scalable redis-persistent queuing system for vertx
Expand Down
69 changes: 59 additions & 10 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ private enum QueueState {

private MessageConsumer<String> consumersMessageConsumer;

private MessageConsumer<String> consumersAliveMessageConsumer;

// Configuration

private RedisProvider redisProvider;
Expand All @@ -234,6 +236,7 @@ private enum QueueState {
private Map<QueueOperation, QueueAction> queueActions = new HashMap<>();

private Map<String, DequeueStatistic> dequeueStatistic = new ConcurrentHashMap<>();
private Map<String, Long> aliveConsumers = new ConcurrentHashMap<>();
private boolean dequeueStatisticEnabled = false;
private final RedisQuesExceptionFactory exceptionFactory;
private PeriodicSkipScheduler periodicSkipScheduler;
Expand Down Expand Up @@ -453,6 +456,8 @@ private void initialize() {
// Handles registration requests
consumersMessageConsumer = vertx.eventBus().consumer(address + "-consumers", this::handleRegistrationRequest);

consumersAliveMessageConsumer = vertx.eventBus().consumer(address + "-consumer-alive", this::handleConsumerAlive);

// Handles notifications
uidMessageConsumer = vertx.eventBus().consumer(uid, event -> {
final String queue = event.body();
Expand All @@ -468,8 +473,34 @@ private void initialize() {
registerQueueCheck();
registerMetricsGathering(configuration);
registerNotExpiredQueueCheck();
registerKeepConsumerAlive();
}

private void registerKeepConsumerAlive() {
final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L;
final String address = configurationProvider.configuration().getAddress();
vertx.setPeriodic(10, periodMs, event -> {
Iterator<Map.Entry<String, Long>> iterator = aliveConsumers.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Long> entry = iterator.next();
if (currentTimeMillis() > entry.getValue()) {
log.info("RedisQues consumer with id {}' has expired", entry.getKey());
iterator.remove();
}
}
vertx.eventBus().publish(address + "-consumer-alive", uid);
log.debug("RedisQues consumer {} keep alive published", uid);
});
}

private void handleConsumerAlive(Message<String> msg) {
final String consumerId = msg.body();
final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L;
aliveConsumers.put(consumerId, currentTimeMillis() + (periodMs * 4));
log.debug("RedisQues consumer {} keep alive renewed", consumerId);
}


private void registerNotExpiredQueueCheck() {
vertx.setPeriodic(20 * 1000, event -> {
if (!log.isDebugEnabled()) {
Expand Down Expand Up @@ -784,16 +815,24 @@ public void stop() {
private void gracefulStop(final Handler<Void> doneHandler) {
consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> {
if (event.failed()) log.warn("TODO error handling", exceptionFactory.newException(
"unregister(" + event + ") failed", event.cause()));
unregisterConsumers(false).onComplete(unregisterConsumersEvent -> {
if( unregisterEvent.failed() ) {
log.warn("TODO error handling", exceptionFactory.newException(
"unregisterConsumers() failed", unregisterEvent.cause()));
}
stoppedHandler = doneHandler;
if (myQueues.keySet().isEmpty()) {
doneHandler.handle(null);
}
"unregister(" + event + ") failed", event.cause()));
if (unregisterEvent.failed()) {
log.warn("TODO error handling", exceptionFactory.newException(
"unregisterConsumers() failed", unregisterEvent.cause()));
}
consumersAliveMessageConsumer.unregister(event1 -> {
if (event1.failed()) log.warn("TODO error handling", exceptionFactory.newException(
"unregister(" + event1 + ") failed", event1.cause()));
unregisterConsumers(false).onComplete(unregisterConsumersEvent -> {
if (unregisterConsumersEvent.failed()) {
log.warn("TODO error handling", exceptionFactory.newException(
"unregisterConsumers(false) failed", unregisterConsumersEvent.cause()));
}
stoppedHandler = doneHandler;
if (myQueues.keySet().isEmpty()) {
doneHandler.handle(null);
}
});
});
}));
}
Expand Down Expand Up @@ -1165,6 +1204,16 @@ private Future<Void> notifyConsumer(final String queueName) {
log.debug("RedisQues Sending registration request for queue {}", queueName);
eb.send(configurationProvider.configuration().getAddress() + "-consumers", queueName);
promise.complete();
} else if (!aliveConsumers.containsKey(consumer)) {
log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName);
redisAPI.del(Collections.singletonList(key), result -> {
if (result.failed()) {
log.warn("Failed to removed consumer '{}'", key, exceptionFactory.newException(result.cause()));
} else {
log.debug("{} consumer key removed", result.result().toLong());
}
promise.complete();
});
} else {
// Notify the registered consumer
log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,12 @@ private <Ctx> void resume(Request<Ctx> req) {
// this boolean is just for paranoia, in case mentor tries to call back too often.
final AtomicBoolean isCalled = new AtomicBoolean();
@Override public void accept(Throwable ex, Void ret) {
if (!isCalled.compareAndSet(false, true)) return;
if (!isCalled.compareAndSet(false, true)) {
boolean d = log.isDebugEnabled();
log.error("This callback MUST NOT be called multiple times!! Make sure caller only calls it ONCE!{}",
d ? "" : " (enable debug log to see stack)", d ? new Exception("here a stack for you") : (Exception) null);
return;
}
onOneDone(req, ex);
}
}, req.ctx);
Expand Down

0 comments on commit 621f174

Please sign in to comment.