Skip to content

Commit

Permalink
Check if value is expired periodically
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Jul 26, 2024
1 parent d24b2be commit ef22430
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class TableView<T> {

// Remove the cached reader and snapshots if there is no refresh request in 1 minute
private static final long CACHE_EXPIRE_TIMEOUT_MS = 60 * 1000L;
private static final long CACHE_EXPIRE_CHECK_FREQUENCY = 3000L;
@VisibleForTesting
protected final Function<TopicName, CompletableFuture<Reader<T>>> readerCreator;
private final Map<String, T> snapshots = new ConcurrentHashMap<>();
Expand All @@ -53,7 +54,7 @@ public TableView(Function<TopicName, CompletableFuture<Reader<T>>> readerCreator
ScheduledExecutorService executor) {
this.readerCreator = readerCreator;
this.clientOperationTimeoutMs = clientOperationTimeoutMs;
this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS);
this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS, CACHE_EXPIRE_CHECK_FREQUENCY);
}

public T readLatest(String topic) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.utils;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -27,36 +28,58 @@
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class SimpleCache<K, V> {

private final Map<K, V> cache = new HashMap<>();
private final Map<K, ExpirableValue<V>> cache = new HashMap<>();
private final Map<K, ScheduledFuture<?>> futures = new HashMap<>();
private final ScheduledExecutorService executor;
private final long timeoutMs;

public synchronized V get(final K key, final Supplier<V> valueSupplier, final Consumer<V> expireCallback) {
final V value;
V existingValue = cache.get(key);
if (existingValue != null) {
value = existingValue;
} else {
value = valueSupplier.get();
cache.put(key, value);
@RequiredArgsConstructor
private class ExpirableValue<V> {

private final V value;
private final Consumer<V> expireCallback;
private long deadlineMs;

boolean tryExpire() {
if (System.currentTimeMillis() >= deadlineMs) {
expireCallback.accept(value);
return true;
} else {
return false;
}
}
final var future = futures.remove(key);
if (future != null) {
future.cancel(true);

void updateDeadline() {
deadlineMs = System.currentTimeMillis() + timeoutMs;
}
futures.put(key, executor.schedule(() -> {
}

public SimpleCache(final ScheduledExecutorService scheduler, final long timeoutMs, final long frequencyMs) {
this.timeoutMs = timeoutMs;
scheduler.scheduleAtFixedRate(() -> {
synchronized (SimpleCache.this) {
futures.remove(key);
final var removedValue = cache.remove(key);
if (removedValue != null) {
expireCallback.accept(removedValue);
}
final var keys = new HashSet<K>();
cache.forEach((key, value) -> {
if (value.tryExpire()) {
keys.add(key);
}
});
keys.forEach(cache::remove);
}
}, timeoutMs, TimeUnit.MILLISECONDS));
return value;
}, frequencyMs, frequencyMs, TimeUnit.MILLISECONDS);
}

public synchronized V get(final K key, final Supplier<V> valueSupplier, final Consumer<V> expireCallback) {
final var value = cache.get(key);
if (value != null) {
value.updateDeadline();
return value.value;
}

final var newValue = new ExpirableValue<>(valueSupplier.get(), expireCallback);
newValue.updateDeadline();
cache.put(key, newValue);
return newValue.value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void shutdown() {

@Test
public void testConcurrentUpdate() throws Exception {
final var cache = new SimpleCache<Integer, Integer>(executor, 10000L);
final var cache = new SimpleCache<Integer, Integer>(executor, 10000L, 10000L);
final var pool = Executors.newFixedThreadPool(2);
final var latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
Expand All @@ -60,7 +60,7 @@ public void testConcurrentUpdate() throws Exception {

@Test
public void testExpire() throws InterruptedException {
final var cache = new SimpleCache<Integer, Integer>(executor, 500L);
final var cache = new SimpleCache<Integer, Integer>(executor, 500L, 5);
final var expiredValues = new CopyOnWriteArrayList<Integer>();
cache.get(0, () -> 100, expiredValues::add);
for (int i = 0; i < 100; i++) {
Expand Down

0 comments on commit ef22430

Please sign in to comment.