Skip to content

Commit

Permalink
Display Completed Jobs (#109)
Browse files Browse the repository at this point in the history
* class and variables renamed to schedule* from delay*
* display complete jobs in dashboard
* fix message move message count #105
* Fixed #112
* * doc updated
* README updates
* Potential cross slot error in rename
* remove doble tagging
* fixes for url web prefix
* added filter to select number of days/months/weeks to select data
  • Loading branch information
sonus21 authored Oct 10, 2021
1 parent 2fdbbe7 commit d5520ae
Show file tree
Hide file tree
Showing 187 changed files with 2,807 additions and 1,559 deletions.
63 changes: 0 additions & 63 deletions .travis.yml

This file was deleted.

30 changes: 24 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
# [Rqueue] New and Notable Changes

### [2.10.0] - 10-Oct-2021

### Fixes

* Fixes for post processor calls (post processor calls were not made)
* Fixes message move message count (by default 1000 messages are moved)
* Potential issue in rename collection
* More than one (-) sign in the dashboard
* Fixes for server context path. Rqueue end points would be served relative to x-forwarded-prefix/server.servlet.context-path

### Features

* Display completed jobs in the dashboard
* Option to choose number of days in the chart
ReactiveWebViewTest
### [2.9.0] - 30-Jul-2021

### Fixes
* Option to add rqueue web url prefix, the prefix is configured from application.properties file using
`rqueue.web.url.prefix=/my-application/`, now rqueue dashboard would be served
at `/my-application/rquque` instead of `/rqueue`, the configuration has higher priority than the
HTTP request header `x-forwarded-prefix`.


* Option to add rqueue web url prefix, the prefix is configured from application.properties file
using `rqueue.web.url.prefix=/my-application/`, now rqueue dashboard would be served
at `/my-application/rquque` instead of `/rqueue`, the configuration has higher priority than the
HTTP request header `x-forwarded-prefix`.
* Custom message converter is not working
* RedisCommandExecutionException : command arguments must be strings or integers

Expand Down Expand Up @@ -196,7 +212,7 @@ Fixes:
### Fixed

* An issue in the scheduler that's always scheduling job at the delay of 5 seconds. (this leads to
messages are not copied from delayed queue to main queue on high load)
messages are not copied from scheduled queue to main queue on high load)

## [1.2] - 03-Nov-2019

Expand Down Expand Up @@ -255,3 +271,5 @@ Fixes:
[2.8.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.8.0-RELEASE

[2.9.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.9.0-RELEASE

[2.10.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.10.0-RELEASE
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<div>
<img align="left" src="https://raw.githubusercontent.com/sonus21/rqueue/master/rqueue-core/src/main/resources/public/rqueue/img/android-chrome-192x192.png" alt="Rqueue Logo" width="90">
<h1 style="float:left">Rqueue: Redis Queue, Task Queue, Delayed Queue for Spring and Spring Boot</h1>
<h1 style="float:left">Rqueue: Redis Queue, Task Queue, Scheduled Queue for Spring and Spring Boot</h1>
</div>

[![Build Status](https://circleci.com/gh/sonus21/rqueue/tree/master.svg?style=shield)](https://circleci.com/gh/sonus21/rqueue/tree/master)
Expand Down Expand Up @@ -32,7 +32,7 @@ well, where all services code is in Spring.
* **Automatic message serialization and deserialization**
* **Message Multicasting** : Call multiple message listeners on every message
* **Batch Message Polling** : Fetch multiple messages from Redis at once
* **Metrics** : In flight messages, waiting for consumption and delayed messages
* **Metrics** : In flight messages, waiting for consumption and scheduled messages
* **Competing Consumers** : multiple messages can be consumed in parallel by different
workers/listeners.
* **Concurrency** : Concurrency of any listener can be configured
Expand Down Expand Up @@ -71,14 +71,14 @@ Release Version: [Maven central](https://search.maven.org/search?q=g:com.github.
* Add dependency
* Gradle
```groovy
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.9.0-RELEASE'
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.10.0-RELEASE'
```
* Maven
```xml
<dependency>
<groupId>com.github.sonus21</groupId>
<artifactId>rqueue-spring-boot-starter</artifactId>
<version>2.9.0-RELEASE</version>
<version>2.10.0-RELEASE</version>
</dependency>
```

Expand All @@ -91,14 +91,14 @@ Release Version: [Maven central](https://search.maven.org/search?q=g:com.github.
* Add Dependency
* Gradle
```groovy
implementation 'com.github.sonus21:rqueue-spring:2.9.0-RELEASE'
implementation 'com.github.sonus21:rqueue-spring:2.10.0-RELEASE'
```
* Maven
```xml
<dependency>
<groupId>com.github.sonus21</groupId>
<artifactId>rqueue-spring</artifactId>
<version>2.9.0-RELEASE</version>
<version>2.10.0-RELEASE</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ ext {

subprojects {
group = 'com.github.sonus21'
version = '2.9.0-RELEASE'
version = '2.10.0-RELEASE'

dependencies {
// https://mvnrepository.com/artifact/org.springframework/spring-messaging
Expand Down
2 changes: 1 addition & 1 deletion rqueue-core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ext.projectDescription = 'Asynchronous and delayed task executor'
ext.projectDescription = 'Asynchronous and scheduled task executor'
ext.name = 'Rqueue Core Java'
buildscript {
apply from: "${rootDir}/gradle/code-signing.gradle"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
* public class MessageListener {
* &#64;RqueueListener(
* value="${job.queue}",
* delayedQueue="true",
* numRetries="3",
* deadLetterQueue="#{job.dead.letter.queue}",
* visibilityTimeout="30*60*1000")
Expand All @@ -62,16 +61,6 @@
*/
String[] value() default {};

/**
* All queues are considered delayed for fast recovery, it can not be disabled even by setting
* this to false.
*
* @deprecated since 2.0
* @return whether it's delayed queue or not.
*/
@Deprecated
String delayedQueue() default "true";

/**
* Number of times a message should be retried before it can be discarded or send it to dead
* letter queue in case of consecutive failures. This is a global value for a consumer, each
Expand Down Expand Up @@ -99,8 +88,9 @@
String deadLetterQueue() default "";

/**
* By default messages sent to dead letter queue are not consumable by listener. This flag is used
* to turn on the consumable feature of dead letter queue.
* By default, messages sent to dead letter queue are not consumable by listener. This flag is
* used to turn on the consumable feature of dead letter queue. If this is set to true then
* application should add message listener for the dead letter queue.
*
* @return true/false
*/
Expand Down Expand Up @@ -128,7 +118,7 @@
* listeners, that will violate exactly once processing. On the other hand if provided time is too
* high then the message would be hidden from other consumers for a longer period.
*
* <p><b>NOTE:</b> This time is in milli seconds
* <p><b>NOTE:</b> This time is in milliseconds
*
* @return visibilityTimeout visibility timeout
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ public Set<TypedTuple<V>> zrangeWithScore(String key, long start, long end) {
return redisTemplate.opsForZSet().rangeWithScores(key, start, end);
}

public void zremRangeByScore(String key, long min, long max) {
redisTemplate.opsForZSet().removeRangeByScore(key, min, max);
}

public Set<V> getMembers(String key) {
return redisTemplate.opsForSet().members(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@
@Configuration
public class RqueueConfig {

private static final String brokerId = UUID.randomUUID().toString();
private static final AtomicLong counter = new AtomicLong(1);
private final RedisConnectionFactory connectionFactory;
private final ReactiveRedisConnectionFactory reactiveRedisConnectionFactory;
private final boolean sharedConnection;
private final int dbVersion;
private static final String brokerId = UUID.randomUUID().toString();
private static final AtomicLong counter = new AtomicLong(1);

@Value("${rqueue.reactive.enabled:false}")
private boolean reactiveEnabled;
Expand Down Expand Up @@ -83,11 +83,14 @@ public class RqueueConfig {
@Value("${rqueue.simple.queue.prefix:}")
private String simpleQueuePrefix;

@Value("${rqueue.delayed.queue.prefix:}")
private String delayedQueuePrefix;
@Value("${rqueue.scheduled.queue.prefix:}")
private String scheduledQueuePrefix;

@Value("${rqueue.delayed.queue.channel.prefix:}")
private String delayedQueueChannelPrefix;
@Value("${rqueue.completed.queue.prefix:}")
private String completedQueuePrefix;

@Value("${rqueue.scheduled.queue.channel.prefix:}")
private String scheduledQueueChannelPrefix;

@Value("${rqueue.processing.queue.name.prefix:}")
private String processingQueuePrefix;
Expand Down Expand Up @@ -139,6 +142,21 @@ public class RqueueConfig {
@Value("${rqueue.internal.communication.channel.name.prefix:i-channel}")
private String internalChannelNamePrefix;

@Value("${rqueue.completed.job.cleanup.interval:30000}")
private long completedJobCleanupIntervalInMs;

public static String getBrokerId() {
return brokerId;
}

public boolean messageInTerminalStateShouldBeStored() {
return getMessageDurabilityInTerminalStateInSecond() > 0;
}

public long messageDurabilityInTerminalStateInMillisecond() {
return getMessageDurabilityInTerminalStateInSecond() * Constants.ONE_MILLI;
}

public String getInternalCommChannelName() {
return prefix + internalChannelNamePrefix;
}
Expand All @@ -157,19 +175,26 @@ private String getSimpleQueueSuffix() {
return "queue-v2::";
}

private String getDelayedQueueSuffix() {
if (!StringUtils.isEmpty(delayedQueuePrefix)) {
return delayedQueuePrefix;
private String getScheduledQueueSuffix() {
if (!StringUtils.isEmpty(scheduledQueuePrefix)) {
return scheduledQueuePrefix;
}
if (dbVersion == 2) {
return "d-queue::";
}
return "d-queue-v2::";
}

private String getDelayedQueueChannelSuffix() {
if (!StringUtils.isEmpty(delayedQueueChannelPrefix)) {
return delayedQueueChannelPrefix;
private String getCompletedQueueSuffix() {
if (!StringUtils.isEmpty(completedQueuePrefix)) {
return completedQueuePrefix;
}
return "c-queue::";
}

private String getScheduledQueueChannelSuffix() {
if (!StringUtils.isEmpty(scheduledQueueChannelPrefix)) {
return scheduledQueueChannelPrefix;
}
if (dbVersion == 2) {
return "d-channel::";
Expand Down Expand Up @@ -204,18 +229,22 @@ public String getQueueName(String queueName) {
return prefix + getSimpleQueueSuffix() + getTaggedName(queueName);
}

public String getDelayedQueueName(String queueName) {
public String getCompletedQueueName(String queueName) {
return prefix + getCompletedQueueSuffix() + getTaggedName(queueName);
}

public String getScheduledQueueName(String queueName) {
if (dbVersion == 1) {
return "rqueue-delay::" + queueName;
}
return prefix + getDelayedQueueSuffix() + getTaggedName(queueName);
return prefix + getScheduledQueueSuffix() + getTaggedName(queueName);
}

public String getDelayedQueueChannelName(String queueName) {
public String getScheduledQueueChannelName(String queueName) {
if (dbVersion == 1) {
return "rqueue-channel::" + queueName;
}
return prefix + getDelayedQueueChannelSuffix() + getTaggedName(queueName);
return prefix + getScheduledQueueChannelSuffix() + getTaggedName(queueName);
}

public String getProcessingQueueName(String queueName) {
Expand Down Expand Up @@ -248,6 +277,18 @@ private String getTaggedName(String queueName) {
if (!clusterMode) {
return queueName;
}
boolean left = false;
boolean right = false;
for (Character c : queueName.toCharArray()) {
if (c == '{') {
left = true;
} else if (c == '}') {
right = true;
}
}
if (left && right) {
return queueName;
}
return "{" + queueName + "}";
}

Expand All @@ -259,22 +300,19 @@ public String getJobsKey(String messageId) {
return prefix + jobsCollectionNamePrefix + messageId;
}

public String getDelDataName() {
public String getDelDataName(String queueName) {
return prefix
+ delPrefix
+ brokerId
+ Constants.REDIS_KEY_SEPARATOR
+ getTaggedName(queueName)
+ counter.incrementAndGet();
}

public Duration getJobDurabilityInTerminalState() {
return Duration.ofSeconds(jobDurabilityInTerminalStateInSecond);
}

public static String getBrokerId() {
return brokerId;
}

public String getLibVersion() {
if (StringUtils.isEmpty(version)) {
ClassPathResource resource =
Expand Down
Loading

0 comments on commit d5520ae

Please sign in to comment.