Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NATS JetStream support #2257

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ executed repeatedly. Moreover, the locks are time-based and ShedLock assumes tha
- [In-Memory](#in-memory)
- [Memcached](#memcached-using-spymemcached)
- [Datastore](#datastore)
- [NATS JetStream](#jetstream)
+ [Multi-tenancy](#multi-tenancy)
+ [Customization](#customization)
+ [Duration specification](#duration-specification)
Expand Down Expand Up @@ -885,6 +886,49 @@ public LockProvider lockProvider(DatabaseClient databaseClient) {
}
```

#### JetStream
NatsJetStreamLockProvider has some limitations due to how NATS have implemented TTL, but its still useful for most usecases.

100ms is the smallest lock timing NATS support (currently NatsJetStreamLockProvider will silently increase the values below this to 100ms). Reaper timings of TTL expired locks cannot be configured in NATS currently, so timing is best effort.

TTL is currently defined on 'bucket' level, meaning a lockname is fixed to a TTL when first encountered. So avoid using the same lockname with different timing settings.

Dont do:
@SchedulerLock(name = "scheduledTaskName", lockAtMostFor = "5m")
.. (somewhere else)
@SchedulerLock(name = "scheduledTaskName", lockAtMostFor = "10m")

But instead you can do:
@SchedulerLock(name = "scheduledTaskName-5m", lockAtMostFor = "5m")
.. (somewhere else)
@SchedulerLock(name = "scheduledTaskName-10m", lockAtMostFor = "10m")

Buckets are auto created with fixed TTL, but never deleted. So any timing changes will require manual deletion of the bucket. NatsJetStreamLockProvider will trigger log warning if this mismatch is detected.

NATS 2.11 should make it possible to define TTL on key level when released... so fingers crossed :D

Import the project

```xml
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-provider-jetstream</artifactId>
<version>5.16.0</version>
</dependency>
```

Configure:

```java
import net.javacrumbs.shedlock.provider.nats.jetstream.NatsJetStreamLockProvider;

...

@Bean
public NatsJetStreamLockProvider lockProvider(io.nats.client.Connection connection) {
return new NatsJetStreamLockProvider(connection);
}
```

## Multi-tenancy
If you have multi-tenancy use-case you can use a lock provider similar to this one
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
<module>providers/datastore/shedlock-provider-datastore</module>
<module>providers/spanner/shedlock-provider-spanner</module>
<module>providers/neo4j/shedlock-provider-neo4j</module>
<module>providers/jetstream/shedlock-provider-jetstream</module>
</modules>

<properties>
Expand Down
78 changes: 78 additions & 0 deletions providers/jetstream/shedlock-provider-jetstream/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>shedlock-parent</artifactId>
<groupId>net.javacrumbs.shedlock</groupId>
<version>5.16.1-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>shedlock-provider-jetstream</artifactId>
<version>5.16.1-SNAPSHOT</version>

<properties>
<nats.version>2.20.2</nats.version>
</properties>

<dependencies>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-test-support</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>${nats.version}</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${test-containers.ver}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${test-containers.ver}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.ver}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Automatic-Module-Name>
net.javacrumbs.shedlock.provider.nats.jetstream
</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package net.javacrumbs.shedlock.provider.nats.jetstream;

import net.javacrumbs.shedlock.core.AbstractSimpleLock;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.support.annotation.NonNull;

public final class NatsJetStreamLock extends AbstractSimpleLock {

private final NatsJetStreamLockProvider natsJetStreamLockProvider;

protected NatsJetStreamLock(
@NonNull LockConfiguration lockConfiguration,
@NonNull NatsJetStreamLockProvider natsJetStreamLockProvider) {
super(lockConfiguration);
this.natsJetStreamLockProvider = natsJetStreamLockProvider;
}

@Override
protected void doUnlock() {
natsJetStreamLockProvider.unlock(lockConfiguration);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package net.javacrumbs.shedlock.provider.nats.jetstream;

import static net.javacrumbs.shedlock.core.ClockProvider.now;

import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.api.KeyValueConfiguration;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import net.javacrumbs.shedlock.support.LockException;
import net.javacrumbs.shedlock.support.annotation.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Lock Provider for NATS JetStream
*
* @see <a href="https://docs.nats.io/nats-concepts/jetstream">JetStream</a>
*/
public class NatsJetStreamLockProvider implements LockProvider, AutoCloseable {

private final Logger log = LoggerFactory.getLogger(NatsJetStreamLockProvider.class);

private final ScheduledExecutorService unlockScheduler = Executors.newSingleThreadScheduledExecutor();

private final Connection connection;

/**
* Create NatsJetStreamLockProvider
*
* @param connection
* io.nats.client.Connection
*/
public NatsJetStreamLockProvider(@NonNull Connection connection) {
this.connection = connection;
}

@Override
@NonNull
public Optional<SimpleLock> lock(@NonNull LockConfiguration lockConfiguration) {
var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName());
log.debug("Attempting lock for bucketName: {}", bucketName);
try {
var lockTime = lockConfiguration.getLockAtMostFor();

// nats cannot accept below 100ms
if (lockTime.toMillis() < 100L) {
log.debug(
"NATS must be above 100ms for smallest locktime, correcting {}ms to 100ms!",
lockTime.toMillis());
lockTime = Duration.ofMillis(100L);
}

connection
.keyValueManagement()
.create(KeyValueConfiguration.builder()
.name(bucketName)
.ttl(lockTime)
.build());
connection.keyValue(bucketName).create("LOCKED", "ShedLock internal value. Do not touch.".getBytes());

log.debug("Acquired lock for bucketName: {}", bucketName);

return Optional.of(new NatsJetStreamLock(lockConfiguration, this));
} catch (JetStreamApiException e) {
if (e.getApiErrorCode() == 10071) {
log.debug("Rejected lock for bucketName: {}, message: {}", bucketName, e.getMessage());
return Optional.empty();
} else if (e.getApiErrorCode() == 10058) {
log.warn(
"Settings on the bucket TTL does not match configuration. Manually delete the bucket on NATS server, or revert lock settings!");
return Optional.empty();
}
log.warn("Rejected lock for bucketName: {}", bucketName);
throw new IllegalStateException(e);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}

void unlock(LockConfiguration lockConfiguration) {
var bucketName = String.format("SHEDLOCK-%s", lockConfiguration.getName());
log.debug("Unlocking for bucketName: {}", bucketName);
var additionalSessionTtl = Duration.between(now(), lockConfiguration.getLockAtLeastUntil());
if (!additionalSessionTtl.isNegative() && !additionalSessionTtl.isZero()) {
log.debug("Lock will still be held for {}", additionalSessionTtl);
scheduleUnlock(bucketName, additionalSessionTtl);
} else {
destroy(bucketName);
}
}

private void scheduleUnlock(String bucketName, Duration unlockTime) {
unlockScheduler.schedule(
catchExceptions(() -> destroy(bucketName)), unlockTime.toMillis(), TimeUnit.MILLISECONDS);
}

private void destroy(String bucketName) {
log.debug("Destroying key in bucketName: {}", bucketName);
try {
connection.keyValue(bucketName).delete("LOCKED");
} catch (Exception e) {
throw new LockException("Can not remove key. " + e.getMessage());
}
}

private Runnable catchExceptions(Runnable runnable) {
return () -> {
try {
runnable.run();
} catch (Throwable t) {
log.warn("Exception while execution", t);
}
};
}

@Override
public void close() {
unlockScheduler.shutdown();
try {
if (!unlockScheduler.awaitTermination(Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS)) {
unlockScheduler.shutdownNow();
}
} catch (InterruptedException ignored) {
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package net.javacrumbs.shedlock.provider.nats.jetstream;

import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.utility.DockerImageName;

public class NatsJetStreamContainer extends GenericContainer<NatsJetStreamContainer> {

private static final Logger LOGGER = LoggerFactory.getLogger(NatsJetStreamContainer.class);

public static final DockerImageName NATS_IMAGE = DockerImageName.parse("nats:2.10-alpine");

private static final Integer NATS_PORT = 4222;
private static final Integer NATS_HTTP_PORT = 8222;

public NatsJetStreamContainer() {
super(NATS_IMAGE.asCanonicalNameString());
this.withExposedPorts(NATS_PORT, NATS_HTTP_PORT)
.withNetworkAliases("nats")
.withLogConsumer(frame -> LOGGER.info(frame.getUtf8String().replace("\n", "")))
.withCommand("--jetstream", "--http_port", NATS_HTTP_PORT.toString())
.waitingFor(new LogMessageWaitStrategy().withRegEx(".*Server is ready.*"))
.withStartupTimeout(Duration.ofSeconds(180L));
}
}
Loading
Loading