Skip to content

Commit

Permalink
allow ConsumerLock Multiplier to be configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Xin Zheng committed Oct 26, 2024
1 parent 9363a50 commit 88eb524
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void start(Promise<Void> promise) {
consumersPrefix = modConfig.getRedisPrefix() + "consumers:";
locksKey = modConfig.getRedisPrefix() + "locks";
queueCheckLastexecKey = modConfig.getRedisPrefix() + "check:lastexec";
consumerLockTime = 20 * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive
consumerLockTime = modConfig.getConsumerLockMultiplier() * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive
timer = new RedisQuesTimer(vertx);

if (redisProvider == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class RedisquesConfiguration {
private final String metricStorageName;
private final int metricRefreshPeriod;
private final int refreshPeriod;
private final int consumerLockMultiplier;
private final List<String> redisHosts;
private final List<Integer> redisPorts;
private boolean redisEnableTls;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class RedisquesConfiguration {
private static final int DEFAULT_REDIS_RECONNECT_ATTEMPTS = 0;
private static final int DEFAULT_REDIS_RECONNECT_DELAY_SEC = 30;
private static final int DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS = 180_000;
private static final int DEFAULT_CONSUMER_LOCK_MULTIPLIER = 2;

// We want to have more than the default of 24 max waiting requests and therefore
// set the default here to infinity value. See as well:
Expand All @@ -88,6 +90,7 @@ public class RedisquesConfiguration {
public static final String PROP_METRIC_STORAGE_NAME = "metric-storage-name";
public static final String PROP_METRIC_REFRESH_PERIOD = "metric-refresh-period";
public static final String PROP_REFRESH_PERIOD = "refresh-period";
public static final String PROP_CONSUMERLOCKMULTIPLIER = "consumer-lock-multiplier";
public static final String PROP_REDIS_HOST = "redisHost";
public static final String PROP_REDIS_HOST_LIST = "redisHosts";
public static final String PROP_REDIS_PORT = "redisPort";
Expand Down Expand Up @@ -148,7 +151,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
RedisClientType.STANDALONE, redisAuth, null, null, false, checkInterval,
processorTimeout, processorDelayMax, httpRequestHandlerEnabled,
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
Expand All @@ -173,7 +176,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
RedisClientType.STANDALONE, null, redisPassword, redisUser, redisEnableTls, checkInterval,
processorTimeout, processorDelayMax, httpRequestHandlerEnabled,
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
Expand All @@ -198,7 +201,32 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout,
processorDelayMax, httpRequestHandlerEnabled,
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
httpRequestHandlerPassword, httpRequestHandlerPort, httpRequestHandlerUserHeader, queueConfigurations,
enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE,
DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT,
DEFAULT_MEMORY_USAGE_CHECK_INTERVAL_SEC, DEFAULT_REDIS_RECONNECT_ATTEMPTS, DEFAULT_REDIS_RECONNECT_DELAY_SEC,
DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS, DEFAULT_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC,
DEFAULT_REDIS_READY_CHECK_INTERVAL_MS);
}

/**
* Constructor with username and password (Redis ACL)
*/
public RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress,
String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod,
int consumerLockMultiplier, String redisHost, int redisPort, RedisClientType redisClientType, String redisPassword,
String redisUser, boolean redisEnableTls, int checkInterval,
int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled,
boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix,
String httpRequestHandlerUsername, String httpRequestHandlerPassword,
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
metricRefreshPeriod, refreshPeriod, consumerLockMultiplier, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout,
processorDelayMax, httpRequestHandlerEnabled,
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
Expand All @@ -212,7 +240,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress

private RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress,
String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod,
List<String> redisHosts, List<Integer> redisPorts, RedisClientType redisClientType,
int consumerLockMultiplier, List<String> redisHosts, List<Integer> redisPorts, RedisClientType redisClientType,
String redisAuth, String redisPassword, String redisUser, boolean redisEnableTls, int checkInterval,
int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled,
boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix,
Expand All @@ -229,6 +257,7 @@ private RedisquesConfiguration(String address, String configurationUpdatedAddres
this.processorAddress = processorAddress;
this.publishMetricsAddress = publishMetricsAddress;
this.refreshPeriod = refreshPeriod;
this.consumerLockMultiplier = consumerLockMultiplier;
this.redisHosts = redisHosts;
this.redisPorts = redisPorts;
this.redisClientType = redisClientType;
Expand Down Expand Up @@ -326,7 +355,7 @@ public static RedisquesConfigurationBuilder with() {
private RedisquesConfiguration(RedisquesConfigurationBuilder builder) {
this(builder.address, builder.configurationUpdatedAddress, builder.redisPrefix,
builder.processorAddress, builder.publishMetricsAddress, builder.metricStorageName, builder.metricRefreshPeriod,
builder.refreshPeriod, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth,
builder.refreshPeriod, builder.consumerLockMultiplier, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth,
builder.redisPassword, builder.redisUser, builder.redisEnableTls, builder.checkInterval,
builder.processorTimeout, builder.processorDelayMax, builder.httpRequestHandlerEnabled,
builder.httpRequestHandlerAuthenticationEnabled, builder.httpRequestHandlerPrefix,
Expand Down Expand Up @@ -354,6 +383,7 @@ public JsonObject asJsonObject() {
obj.put(PROP_METRIC_STORAGE_NAME, getMetricStorageName());
obj.put(PROP_METRIC_REFRESH_PERIOD, getMetricRefreshPeriod());
obj.put(PROP_REFRESH_PERIOD, getRefreshPeriod());
obj.put(PROP_CONSUMERLOCKMULTIPLIER, getConsumerLockMultiplier());
obj.put(PROP_REDIS_HOST, getRedisHost());
obj.put(PROP_REDIS_HOST_LIST, getRedisHosts());
obj.put(PROP_REDIS_PORT, getRedisPort());
Expand Down Expand Up @@ -415,6 +445,9 @@ public static RedisquesConfiguration fromJsonObject(JsonObject json) {
if (json.containsKey(PROP_REFRESH_PERIOD)) {
builder.refreshPeriod(json.getInteger(PROP_REFRESH_PERIOD));
}
if (json.containsKey(PROP_CONSUMERLOCKMULTIPLIER)) {
builder.consumerLockMultiplier(json.getInteger(PROP_CONSUMERLOCKMULTIPLIER));
}
if (json.containsKey(PROP_REDIS_HOST)) {
builder.redisHost(json.getString(PROP_REDIS_HOST));
}
Expand Down Expand Up @@ -549,6 +582,10 @@ public int getRefreshPeriod() {
return refreshPeriod;
}

public int getConsumerLockMultiplier() {
return consumerLockMultiplier;
}

public String getRedisHost() {
return redisHosts.get(0);
}
Expand Down Expand Up @@ -728,6 +765,7 @@ public static class RedisquesConfigurationBuilder {
private String metricStorageName;
private int metricRefreshPeriod;
private int refreshPeriod;
private int consumerLockMultiplier;
private List<String> redisHosts;
private List<Integer> redisPorts;
private boolean redisEnableTls;
Expand Down Expand Up @@ -767,6 +805,7 @@ public RedisquesConfigurationBuilder() {
this.processorAddress = "redisques-processor";
this.metricRefreshPeriod = 10;
this.refreshPeriod = 10;
this.consumerLockMultiplier = DEFAULT_CONSUMER_LOCK_MULTIPLIER;
this.redisHosts = Collections.singletonList("localhost");
this.redisPorts = Collections.singletonList(6379);
this.redisEnableTls = false;
Expand Down Expand Up @@ -836,6 +875,11 @@ public RedisquesConfigurationBuilder refreshPeriod(int refreshPeriod) {
return this;
}

public RedisquesConfigurationBuilder consumerLockMultiplier(int consumerLockMultiplier) {
this.consumerLockMultiplier = consumerLockMultiplier;
return this;
}

public RedisquesConfigurationBuilder redisHost(String redisHost) {
this.redisHosts = Collections.singletonList(redisHost);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public void testDefaultConfiguration(TestContext testContext) {
testContext.assertEquals(config.getProcessorAddress(), "redisques-processor");
testContext.assertEquals(config.getMetricRefreshPeriod(), 10);
testContext.assertEquals(config.getRefreshPeriod(), 10);
testContext.assertEquals(config.getConsumerLockMultiplier(), 2);
testContext.assertEquals(config.getRedisHost(), "localhost");
testContext.assertEquals(config.getRedisPort(), 6379);
testContext.assertEquals(config.getRedisEnableTls(), false);
Expand Down Expand Up @@ -77,6 +78,7 @@ public void testOverrideConfiguration(TestContext testContext) {
new QueueConfiguration().withPattern("vehicle-.*").withRetryIntervals(10, 20, 30, 60)
))
.queueSpeedIntervalSec(1)
.consumerLockMultiplier(9)
.memoryUsageLimitPercent(80)
.publishMetricsAddress("eventbus-addr-1")
.metricStorageName("queue")
Expand Down Expand Up @@ -110,6 +112,7 @@ public void testOverrideConfiguration(TestContext testContext) {
testContext.assertEquals(config.getMemoryUsageLimitPercent(), 80);
testContext.assertEquals(config.getPublishMetricsAddress(), "eventbus-addr-1");
testContext.assertEquals(config.getMetricStorageName(), "queue");
testContext.assertEquals(config.getConsumerLockMultiplier(), 9);
// queue configurations
testContext.assertEquals(config.getQueueConfigurations().size(), 1);
QueueConfiguration queueConfiguration = config.getQueueConfigurations().get(0);
Expand All @@ -128,6 +131,7 @@ public void testGetDefaultAsJsonObject(TestContext testContext) {
testContext.assertEquals(json.getString(PROP_PROCESSOR_ADDRESS), "redisques-processor");
testContext.assertEquals(json.getInteger(PROP_METRIC_REFRESH_PERIOD), 10);
testContext.assertEquals(json.getInteger(PROP_REFRESH_PERIOD), 10);
testContext.assertEquals(json.getInteger(PROP_CONSUMERLOCKMULTIPLIER), 2);
testContext.assertEquals(json.getString(PROP_REDIS_HOST), "localhost");
testContext.assertEquals(json.getInteger(PROP_REDIS_PORT), 6379);
testContext.assertFalse(json.getBoolean(PROP_REDIS_ENABLE_TLS));
Expand Down Expand Up @@ -178,6 +182,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext) {
.memoryUsageLimitPercent(55)
.dequeueStatisticReportIntervalSec(44)
.redisReadyCheckIntervalMs(5000)
.consumerLockMultiplier(3)
.publishMetricsAddress("eventbus-addr-1")
.metricStorageName("queue")
.build();
Expand Down Expand Up @@ -214,6 +219,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext) {
testContext.assertEquals(json.getInteger(PROP_REDIS_READY_CHECK_INTERVAL_MS), 5000);
testContext.assertEquals(json.getString(PROP_PUBLISH_METRICS_ADDRESS), "eventbus-addr-1");
testContext.assertEquals(json.getString(PROP_METRIC_STORAGE_NAME), "queue");
testContext.assertEquals(json.getInteger(PROP_CONSUMERLOCKMULTIPLIER), 3);
// queue configurations
JsonArray queueConfigurationsJsonArray = json.getJsonArray(PROP_QUEUE_CONFIGURATIONS);
List<JsonObject> queueConfigurationJsonObjects = queueConfigurationsJsonArray.getList();
Expand All @@ -233,6 +239,7 @@ public void testGetDefaultFromJsonObject(TestContext testContext) {
testContext.assertEquals(config.getRedisPrefix(), "redisques:");
testContext.assertEquals(config.getProcessorAddress(), "redisques-processor");
testContext.assertEquals(config.getRefreshPeriod(), 10);
testContext.assertEquals(config.getConsumerLockMultiplier(), 2);
testContext.assertEquals(config.getMetricRefreshPeriod(), 10);
testContext.assertEquals(config.getRedisHost(), "localhost");
testContext.assertEquals(config.getRedisPort(), 6379);
Expand Down Expand Up @@ -267,6 +274,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext) {
json.put(PROP_REDIS_PREFIX, "new_redis-prefix");
json.put(PROP_PROCESSOR_ADDRESS, "new_processor-address");
json.put(PROP_REFRESH_PERIOD, 99);
json.put(PROP_CONSUMERLOCKMULTIPLIER, 91);
json.put(PROP_METRIC_REFRESH_PERIOD, 55);
json.put(PROP_REDIS_HOST, "newredishost");
json.put(PROP_REDIS_PORT, 4321);
Expand Down Expand Up @@ -300,6 +308,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext) {
testContext.assertEquals(config.getRedisPrefix(), "new_redis-prefix");
testContext.assertEquals(config.getProcessorAddress(), "new_processor-address");
testContext.assertEquals(config.getRefreshPeriod(), 99);
testContext.assertEquals(config.getConsumerLockMultiplier(), 91);
testContext.assertEquals(config.getMetricRefreshPeriod(), 55);
testContext.assertEquals(config.getRedisHost(), "newredishost");
testContext.assertEquals(config.getRedisPort(), 4321);
Expand Down

0 comments on commit 88eb524

Please sign in to comment.