diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 5dafc34..cec5d69 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -332,7 +332,7 @@ public void start(Promise promise) { consumersPrefix = modConfig.getRedisPrefix() + "consumers:"; locksKey = modConfig.getRedisPrefix() + "locks"; queueCheckLastexecKey = modConfig.getRedisPrefix() + "check:lastexec"; - consumerLockTime = 20 * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive + consumerLockTime = modConfig.getConsumerLockMultiplier() * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive timer = new RedisQuesTimer(vertx); if (redisProvider == null) { diff --git a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java index 20a11b8..6a4ea3b 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java +++ b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java @@ -26,6 +26,7 @@ public class RedisquesConfiguration { private final String metricStorageName; private final int metricRefreshPeriod; private final int refreshPeriod; + private final int consumerLockMultiplier; private final List redisHosts; private final List redisPorts; private boolean redisEnableTls; @@ -66,6 +67,7 @@ public class RedisquesConfiguration { private static final int DEFAULT_REDIS_RECONNECT_ATTEMPTS = 0; private static final int DEFAULT_REDIS_RECONNECT_DELAY_SEC = 30; private static final int DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS = 180_000; + private static final int DEFAULT_CONSUMER_LOCK_MULTIPLIER = 2; // We want to have more than the default of 24 max waiting requests and therefore // set the default here to infinity value. See as well: @@ -88,6 +90,7 @@ public class RedisquesConfiguration { public static final String PROP_METRIC_STORAGE_NAME = "metric-storage-name"; public static final String PROP_METRIC_REFRESH_PERIOD = "metric-refresh-period"; public static final String PROP_REFRESH_PERIOD = "refresh-period"; + public static final String PROP_CONSUMERLOCKMULTIPLIER = "consumer-lock-multiplier"; public static final String PROP_REDIS_HOST = "redisHost"; public static final String PROP_REDIS_HOST_LIST = "redisHosts"; public static final String PROP_REDIS_PORT = "redisPort"; @@ -148,7 +151,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding) { this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName, - metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort), + metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort), RedisClientType.STANDALONE, redisAuth, null, null, false, checkInterval, processorTimeout, processorDelayMax, httpRequestHandlerEnabled, httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, @@ -173,7 +176,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding) { this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName, - metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort), + metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort), RedisClientType.STANDALONE, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout, processorDelayMax, httpRequestHandlerEnabled, httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, @@ -198,7 +201,32 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding) { this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName, - metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort), + metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort), + redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout, + processorDelayMax, httpRequestHandlerEnabled, + httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, + httpRequestHandlerPassword, httpRequestHandlerPort, httpRequestHandlerUserHeader, queueConfigurations, + enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE, + DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT, + DEFAULT_MEMORY_USAGE_CHECK_INTERVAL_SEC, DEFAULT_REDIS_RECONNECT_ATTEMPTS, DEFAULT_REDIS_RECONNECT_DELAY_SEC, + DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS, DEFAULT_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC, + DEFAULT_REDIS_READY_CHECK_INTERVAL_MS); + } + + /** + * Constructor with username and password (Redis ACL) + */ + public RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress, + String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod, + int consumerLockMultiplier, String redisHost, int redisPort, RedisClientType redisClientType, String redisPassword, + String redisUser, boolean redisEnableTls, int checkInterval, + int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled, + boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix, + String httpRequestHandlerUsername, String httpRequestHandlerPassword, + Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, + List queueConfigurations, boolean enableQueueNameDecoding) { + this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName, + metricRefreshPeriod, refreshPeriod, consumerLockMultiplier, Collections.singletonList(redisHost), Collections.singletonList(redisPort), redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout, processorDelayMax, httpRequestHandlerEnabled, httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, @@ -212,7 +240,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress private RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress, String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod, - List redisHosts, List redisPorts, RedisClientType redisClientType, + int consumerLockMultiplier, List redisHosts, List redisPorts, RedisClientType redisClientType, String redisAuth, String redisPassword, String redisUser, boolean redisEnableTls, int checkInterval, int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled, boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix, @@ -229,6 +257,7 @@ private RedisquesConfiguration(String address, String configurationUpdatedAddres this.processorAddress = processorAddress; this.publishMetricsAddress = publishMetricsAddress; this.refreshPeriod = refreshPeriod; + this.consumerLockMultiplier = consumerLockMultiplier; this.redisHosts = redisHosts; this.redisPorts = redisPorts; this.redisClientType = redisClientType; @@ -326,7 +355,7 @@ public static RedisquesConfigurationBuilder with() { private RedisquesConfiguration(RedisquesConfigurationBuilder builder) { this(builder.address, builder.configurationUpdatedAddress, builder.redisPrefix, builder.processorAddress, builder.publishMetricsAddress, builder.metricStorageName, builder.metricRefreshPeriod, - builder.refreshPeriod, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth, + builder.refreshPeriod, builder.consumerLockMultiplier, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth, builder.redisPassword, builder.redisUser, builder.redisEnableTls, builder.checkInterval, builder.processorTimeout, builder.processorDelayMax, builder.httpRequestHandlerEnabled, builder.httpRequestHandlerAuthenticationEnabled, builder.httpRequestHandlerPrefix, @@ -354,6 +383,7 @@ public JsonObject asJsonObject() { obj.put(PROP_METRIC_STORAGE_NAME, getMetricStorageName()); obj.put(PROP_METRIC_REFRESH_PERIOD, getMetricRefreshPeriod()); obj.put(PROP_REFRESH_PERIOD, getRefreshPeriod()); + obj.put(PROP_CONSUMERLOCKMULTIPLIER, getConsumerLockMultiplier()); obj.put(PROP_REDIS_HOST, getRedisHost()); obj.put(PROP_REDIS_HOST_LIST, getRedisHosts()); obj.put(PROP_REDIS_PORT, getRedisPort()); @@ -415,6 +445,9 @@ public static RedisquesConfiguration fromJsonObject(JsonObject json) { if (json.containsKey(PROP_REFRESH_PERIOD)) { builder.refreshPeriod(json.getInteger(PROP_REFRESH_PERIOD)); } + if (json.containsKey(PROP_CONSUMERLOCKMULTIPLIER)) { + builder.consumerLockMultiplier(json.getInteger(PROP_CONSUMERLOCKMULTIPLIER)); + } if (json.containsKey(PROP_REDIS_HOST)) { builder.redisHost(json.getString(PROP_REDIS_HOST)); } @@ -549,6 +582,10 @@ public int getRefreshPeriod() { return refreshPeriod; } + public int getConsumerLockMultiplier() { + return consumerLockMultiplier; + } + public String getRedisHost() { return redisHosts.get(0); } @@ -728,6 +765,7 @@ public static class RedisquesConfigurationBuilder { private String metricStorageName; private int metricRefreshPeriod; private int refreshPeriod; + private int consumerLockMultiplier; private List redisHosts; private List redisPorts; private boolean redisEnableTls; @@ -767,6 +805,7 @@ public RedisquesConfigurationBuilder() { this.processorAddress = "redisques-processor"; this.metricRefreshPeriod = 10; this.refreshPeriod = 10; + this.consumerLockMultiplier = DEFAULT_CONSUMER_LOCK_MULTIPLIER; this.redisHosts = Collections.singletonList("localhost"); this.redisPorts = Collections.singletonList(6379); this.redisEnableTls = false; @@ -836,6 +875,11 @@ public RedisquesConfigurationBuilder refreshPeriod(int refreshPeriod) { return this; } + public RedisquesConfigurationBuilder consumerLockMultiplier(int consumerLockMultiplier) { + this.consumerLockMultiplier = consumerLockMultiplier; + return this; + } + public RedisquesConfigurationBuilder redisHost(String redisHost) { this.redisHosts = Collections.singletonList(redisHost); return this; diff --git a/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java b/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java index e9790f2..9cd2da9 100644 --- a/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java +++ b/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java @@ -30,6 +30,7 @@ public void testDefaultConfiguration(TestContext testContext) { testContext.assertEquals(config.getProcessorAddress(), "redisques-processor"); testContext.assertEquals(config.getMetricRefreshPeriod(), 10); testContext.assertEquals(config.getRefreshPeriod(), 10); + testContext.assertEquals(config.getConsumerLockMultiplier(), 2); testContext.assertEquals(config.getRedisHost(), "localhost"); testContext.assertEquals(config.getRedisPort(), 6379); testContext.assertEquals(config.getRedisEnableTls(), false); @@ -77,6 +78,7 @@ public void testOverrideConfiguration(TestContext testContext) { new QueueConfiguration().withPattern("vehicle-.*").withRetryIntervals(10, 20, 30, 60) )) .queueSpeedIntervalSec(1) + .consumerLockMultiplier(9) .memoryUsageLimitPercent(80) .publishMetricsAddress("eventbus-addr-1") .metricStorageName("queue") @@ -110,6 +112,7 @@ public void testOverrideConfiguration(TestContext testContext) { testContext.assertEquals(config.getMemoryUsageLimitPercent(), 80); testContext.assertEquals(config.getPublishMetricsAddress(), "eventbus-addr-1"); testContext.assertEquals(config.getMetricStorageName(), "queue"); + testContext.assertEquals(config.getConsumerLockMultiplier(), 9); // queue configurations testContext.assertEquals(config.getQueueConfigurations().size(), 1); QueueConfiguration queueConfiguration = config.getQueueConfigurations().get(0); @@ -128,6 +131,7 @@ public void testGetDefaultAsJsonObject(TestContext testContext) { testContext.assertEquals(json.getString(PROP_PROCESSOR_ADDRESS), "redisques-processor"); testContext.assertEquals(json.getInteger(PROP_METRIC_REFRESH_PERIOD), 10); testContext.assertEquals(json.getInteger(PROP_REFRESH_PERIOD), 10); + testContext.assertEquals(json.getInteger(PROP_CONSUMERLOCKMULTIPLIER), 2); testContext.assertEquals(json.getString(PROP_REDIS_HOST), "localhost"); testContext.assertEquals(json.getInteger(PROP_REDIS_PORT), 6379); testContext.assertFalse(json.getBoolean(PROP_REDIS_ENABLE_TLS)); @@ -178,6 +182,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext) { .memoryUsageLimitPercent(55) .dequeueStatisticReportIntervalSec(44) .redisReadyCheckIntervalMs(5000) + .consumerLockMultiplier(3) .publishMetricsAddress("eventbus-addr-1") .metricStorageName("queue") .build(); @@ -214,6 +219,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext) { testContext.assertEquals(json.getInteger(PROP_REDIS_READY_CHECK_INTERVAL_MS), 5000); testContext.assertEquals(json.getString(PROP_PUBLISH_METRICS_ADDRESS), "eventbus-addr-1"); testContext.assertEquals(json.getString(PROP_METRIC_STORAGE_NAME), "queue"); + testContext.assertEquals(json.getInteger(PROP_CONSUMERLOCKMULTIPLIER), 3); // queue configurations JsonArray queueConfigurationsJsonArray = json.getJsonArray(PROP_QUEUE_CONFIGURATIONS); List queueConfigurationJsonObjects = queueConfigurationsJsonArray.getList(); @@ -233,6 +239,7 @@ public void testGetDefaultFromJsonObject(TestContext testContext) { testContext.assertEquals(config.getRedisPrefix(), "redisques:"); testContext.assertEquals(config.getProcessorAddress(), "redisques-processor"); testContext.assertEquals(config.getRefreshPeriod(), 10); + testContext.assertEquals(config.getConsumerLockMultiplier(), 2); testContext.assertEquals(config.getMetricRefreshPeriod(), 10); testContext.assertEquals(config.getRedisHost(), "localhost"); testContext.assertEquals(config.getRedisPort(), 6379); @@ -267,6 +274,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext) { json.put(PROP_REDIS_PREFIX, "new_redis-prefix"); json.put(PROP_PROCESSOR_ADDRESS, "new_processor-address"); json.put(PROP_REFRESH_PERIOD, 99); + json.put(PROP_CONSUMERLOCKMULTIPLIER, 91); json.put(PROP_METRIC_REFRESH_PERIOD, 55); json.put(PROP_REDIS_HOST, "newredishost"); json.put(PROP_REDIS_PORT, 4321); @@ -300,6 +308,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext) { testContext.assertEquals(config.getRedisPrefix(), "new_redis-prefix"); testContext.assertEquals(config.getProcessorAddress(), "new_processor-address"); testContext.assertEquals(config.getRefreshPeriod(), 99); + testContext.assertEquals(config.getConsumerLockMultiplier(), 91); testContext.assertEquals(config.getMetricRefreshPeriod(), 55); testContext.assertEquals(config.getRedisHost(), "newredishost"); testContext.assertEquals(config.getRedisPort(), 4321);