From fd4b535dd69e0ee854a9ca6fdb1d2b8e01c1a703 Mon Sep 17 00:00:00 2001 From: runner Date: Mon, 2 Dec 2024 14:36:32 +0000 Subject: [PATCH 01/10] updating poms for branch'release-4.1.9' with non-snapshot versions --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 96064ba..7634f4b 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.9-SNAPSHOT + 4.1.9 redisques A highly scalable redis-persistent queuing system for vertx From 7814595308e67b2654cff0d8400c2449ae62277f Mon Sep 17 00:00:00 2001 From: runner Date: Mon, 2 Dec 2024 14:36:32 +0000 Subject: [PATCH 02/10] updating poms for 4.1.10-SNAPSHOT development --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 96064ba..fbb9e96 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.9-SNAPSHOT + 4.1.10-SNAPSHOT redisques A highly scalable redis-persistent queuing system for vertx From 7449b94d5cfb67fedee3c04870097ba65e429b48 Mon Sep 17 00:00:00 2001 From: runner Date: Mon, 2 Dec 2024 14:39:04 +0000 Subject: [PATCH 03/10] updating develop poms to master versions to avoid merge conflicts --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index fbb9e96..7634f4b 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.10-SNAPSHOT + 4.1.9 redisques A highly scalable redis-persistent queuing system for vertx From 447c105eac0aadde52fb6a5741bddd78f7c1ea56 Mon Sep 17 00:00:00 2001 From: runner Date: Mon, 2 Dec 2024 14:39:04 +0000 Subject: [PATCH 04/10] Updating develop poms back to pre merge state --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 7634f4b..fbb9e96 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.9 + 4.1.10-SNAPSHOT redisques A highly scalable redis-persistent queuing system for vertx From 7f61e229c0ab2f59ded11bc3df798c586b786c7d Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Tue, 10 Dec 2024 14:03:26 +0700 Subject: [PATCH 05/10] remove consumer if it is not exist --- .../org/swisspush/redisques/RedisQues.java | 72 +++++++++++++++---- 1 file changed, 58 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 0091990..c2fb8f3 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -209,6 +209,8 @@ private enum QueueState { private MessageConsumer consumersMessageConsumer; + private MessageConsumer consumersAliveMessageConsumer; + // Configuration private RedisProvider redisProvider; @@ -234,6 +236,7 @@ private enum QueueState { private Map queueActions = new HashMap<>(); private Map dequeueStatistic = new ConcurrentHashMap<>(); + private Map aliveConsumer = new ConcurrentHashMap<>(); private boolean dequeueStatisticEnabled = false; private final RedisQuesExceptionFactory exceptionFactory; private PeriodicSkipScheduler periodicSkipScheduler; @@ -453,6 +456,8 @@ private void initialize() { // Handles registration requests consumersMessageConsumer = vertx.eventBus().consumer(address + "-consumers", this::handleRegistrationRequest); + consumersAliveMessageConsumer = vertx.eventBus().consumer(address + "-consumer-alive", this::handleConsumerAlive); + // Handles notifications uidMessageConsumer = vertx.eventBus().consumer(uid, event -> { final String queue = event.body(); @@ -468,8 +473,38 @@ private void initialize() { registerQueueCheck(); registerMetricsGathering(configuration); registerNotExpiredQueueCheck(); + registerKeepConsumerAlive(); + } + + private void registerKeepConsumerAlive() { + final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L; + final String address = configurationProvider.configuration().getAddress(); + vertx.setPeriodic(10, periodMs, event -> { + Iterator> iterator = aliveConsumer.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (currentTimeMillis() > entry.getValue()) { + log.info("RedisQues consumer with id {}' has expired", entry.getKey()); + iterator.remove(); + } + } + log.debug("RedisQues consumer {} keep alive published", uid); + vertx.eventBus().publish(address + "-consumer-alive", uid); + }); + } + + private void handleConsumerAlive(Message msg) { + final String consumerId = msg.body(); + final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L; + if (!UUID.fromString(consumerId).toString().equals(consumerId)) { + log.warn("invalid RedisQues consumer id {}", consumerId); + return; + } + log.debug("RedisQues consumer {} keep alive renewed", consumerId); + aliveConsumer.put(consumerId, currentTimeMillis() + (periodMs * 4)); } + private void registerNotExpiredQueueCheck() { vertx.setPeriodic(20 * 1000, event -> { if (!log.isDebugEnabled()) { @@ -784,16 +819,20 @@ public void stop() { private void gracefulStop(final Handler doneHandler) { consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> { if (event.failed()) log.warn("TODO error handling", exceptionFactory.newException( - "unregister(" + event + ") failed", event.cause())); - unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { - if( unregisterEvent.failed() ) { - log.warn("TODO error handling", exceptionFactory.newException( - "unregisterConsumers() failed", unregisterEvent.cause())); - } - stoppedHandler = doneHandler; - if (myQueues.keySet().isEmpty()) { - doneHandler.handle(null); - } + "unregister(" + event + ") failed", event.cause())); + consumersAliveMessageConsumer.unregister(event1 -> { + if (event1.failed()) log.warn("TODO error handling", exceptionFactory.newException( + "unregister(" + event1 + ") failed", event1.cause())); + unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { + if (unregisterEvent.failed()) { + log.warn("TODO error handling", exceptionFactory.newException( + "unregisterConsumers() failed", unregisterEvent.cause())); + } + stoppedHandler = doneHandler; + if (myQueues.keySet().isEmpty()) { + doneHandler.handle(null); + } + }); }); })); } @@ -1166,10 +1205,15 @@ private Future notifyConsumer(final String queueName) { eb.send(configurationProvider.configuration().getAddress() + "-consumers", queueName); promise.complete(); } else { - // Notify the registered consumer - log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName); - eb.send(consumer, queueName); - promise.complete(); + if (!aliveConsumer.containsKey(consumer)) { + log.warn("RedisQues consumer {} of queue {} is not exist.", consumer, queueName); + redisAPI.del(Collections.singletonList(key), event1 -> promise.complete()); + } else { + // Notify the registered consumer + log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName); + eb.send(consumer, queueName); + promise.complete(); + } } })) .onFailure(throwable -> { From 8bcd92cb5ce741d126641f01b6c0212694e4f33a Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Tue, 10 Dec 2024 15:31:41 +0700 Subject: [PATCH 06/10] renamed --- src/main/java/org/swisspush/redisques/RedisQues.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index c2fb8f3..9d76771 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -236,7 +236,7 @@ private enum QueueState { private Map queueActions = new HashMap<>(); private Map dequeueStatistic = new ConcurrentHashMap<>(); - private Map aliveConsumer = new ConcurrentHashMap<>(); + private Map aliveConsumers = new ConcurrentHashMap<>(); private boolean dequeueStatisticEnabled = false; private final RedisQuesExceptionFactory exceptionFactory; private PeriodicSkipScheduler periodicSkipScheduler; @@ -480,7 +480,7 @@ private void registerKeepConsumerAlive() { final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L; final String address = configurationProvider.configuration().getAddress(); vertx.setPeriodic(10, periodMs, event -> { - Iterator> iterator = aliveConsumer.entrySet().iterator(); + Iterator> iterator = aliveConsumers.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); if (currentTimeMillis() > entry.getValue()) { @@ -488,8 +488,8 @@ private void registerKeepConsumerAlive() { iterator.remove(); } } - log.debug("RedisQues consumer {} keep alive published", uid); vertx.eventBus().publish(address + "-consumer-alive", uid); + log.debug("RedisQues consumer {} keep alive published", uid); }); } @@ -501,7 +501,7 @@ private void handleConsumerAlive(Message msg) { return; } log.debug("RedisQues consumer {} keep alive renewed", consumerId); - aliveConsumer.put(consumerId, currentTimeMillis() + (periodMs * 4)); + aliveConsumers.put(consumerId, currentTimeMillis() + (periodMs * 4)); } @@ -1205,8 +1205,8 @@ private Future notifyConsumer(final String queueName) { eb.send(configurationProvider.configuration().getAddress() + "-consumers", queueName); promise.complete(); } else { - if (!aliveConsumer.containsKey(consumer)) { - log.warn("RedisQues consumer {} of queue {} is not exist.", consumer, queueName); + if (!aliveConsumers.containsKey(consumer)) { + log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName); redisAPI.del(Collections.singletonList(key), event1 -> promise.complete()); } else { // Notify the registered consumer From 71aa2ea77822d95b52f558e1d2ede50a869ad478 Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Tue, 10 Dec 2024 16:37:49 +0700 Subject: [PATCH 07/10] removed unused check --- src/main/java/org/swisspush/redisques/RedisQues.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 9d76771..8d7388a 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -496,12 +496,8 @@ private void registerKeepConsumerAlive() { private void handleConsumerAlive(Message msg) { final String consumerId = msg.body(); final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L; - if (!UUID.fromString(consumerId).toString().equals(consumerId)) { - log.warn("invalid RedisQues consumer id {}", consumerId); - return; - } - log.debug("RedisQues consumer {} keep alive renewed", consumerId); aliveConsumers.put(consumerId, currentTimeMillis() + (periodMs * 4)); + log.debug("RedisQues consumer {} keep alive renewed", consumerId); } From 78efaacd297021ca676341c3f0e919ccf4803b93 Mon Sep 17 00:00:00 2001 From: "Andreas Fankhauser hiddenalpha.ch" <23085769+hiddenalpha@users.noreply.github.com> Date: Tue, 10 Dec 2024 12:19:11 +0100 Subject: [PATCH 08/10] SDCISA-18302: Add logging in case unreachable code gets reached. --- .../redisques/performance/UpperBoundParallel.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java index d0a8aa1..5a0b813 100644 --- a/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java +++ b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java @@ -132,7 +132,12 @@ private void resume(Request req) { // this boolean is just for paranoia, in case mentor tries to call back too often. final AtomicBoolean isCalled = new AtomicBoolean(); @Override public void accept(Throwable ex, Void ret) { - if (!isCalled.compareAndSet(false, true)) return; + if (!isCalled.compareAndSet(false, true)) { + boolean d = log.isDebugEnabled(); + log.error("This callback MUST NOT be called multiple times!! Make sure caller only calls it ONCE!{}", + d ? "" : " (enable debug log to see stack)", d ? new Exception("here a stack for you") : (Exception) null); + return; + } onOneDone(req, ex); } }, req.ctx); From fe3b03a45cf55e46745e5fd88041d8c3ecef1c2d Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Wed, 11 Dec 2024 15:09:47 +0700 Subject: [PATCH 09/10] code improved from feedback --- .../org/swisspush/redisques/RedisQues.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 8d7388a..c46be73 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -816,13 +816,17 @@ private void gracefulStop(final Handler doneHandler) { consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> { if (event.failed()) log.warn("TODO error handling", exceptionFactory.newException( "unregister(" + event + ") failed", event.cause())); + if (unregisterEvent.failed()) { + log.warn("TODO error handling", exceptionFactory.newException( + "unregisterConsumers() failed", unregisterEvent.cause())); + } consumersAliveMessageConsumer.unregister(event1 -> { if (event1.failed()) log.warn("TODO error handling", exceptionFactory.newException( "unregister(" + event1 + ") failed", event1.cause())); unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { - if (unregisterEvent.failed()) { + if (unregisterConsumersEvent.failed()) { log.warn("TODO error handling", exceptionFactory.newException( - "unregisterConsumers() failed", unregisterEvent.cause())); + "unregisterConsumers(false) failed", unregisterConsumersEvent.cause())); } stoppedHandler = doneHandler; if (myQueues.keySet().isEmpty()) { @@ -1200,16 +1204,21 @@ private Future notifyConsumer(final String queueName) { log.debug("RedisQues Sending registration request for queue {}", queueName); eb.send(configurationProvider.configuration().getAddress() + "-consumers", queueName); promise.complete(); - } else { - if (!aliveConsumers.containsKey(consumer)) { - log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName); - redisAPI.del(Collections.singletonList(key), event1 -> promise.complete()); - } else { - // Notify the registered consumer - log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName); - eb.send(consumer, queueName); + } else if (!aliveConsumers.containsKey(consumer)) { + log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName); + redisAPI.del(Collections.singletonList(key), result -> { + if (result.failed()) { + log.warn("Failed to removed consumer '{}'", key, exceptionFactory.newException(event.cause())); + } else { + log.debug("{} consumer key removed", result.result().toLong()); + } promise.complete(); - } + }); + } else { + // Notify the registered consumer + log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName); + eb.send(consumer, queueName); + promise.complete(); } })) .onFailure(throwable -> { From 9ae03c8537a118dea2c4e9721749a913655511ab Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Thu, 12 Dec 2024 13:48:54 +0700 Subject: [PATCH 10/10] fixed wrong cause has been logged --- src/main/java/org/swisspush/redisques/RedisQues.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index c46be73..26d75f1 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -1208,7 +1208,7 @@ private Future notifyConsumer(final String queueName) { log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName); redisAPI.del(Collections.singletonList(key), result -> { if (result.failed()) { - log.warn("Failed to removed consumer '{}'", key, exceptionFactory.newException(event.cause())); + log.warn("Failed to removed consumer '{}'", key, exceptionFactory.newException(result.cause())); } else { log.debug("{} consumer key removed", result.result().toLong()); }