From ef22430ec9a185dded4712d1c40b860e8f81839f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 26 Jul 2024 12:50:40 +0800 Subject: [PATCH] Check if value is expired periodically --- .../transaction/buffer/impl/TableView.java | 3 +- .../org/apache/pulsar/utils/SimpleCache.java | 67 +++++++++++++------ .../apache/pulsar/utils/SimpleCacheTest.java | 4 +- 3 files changed, 49 insertions(+), 25 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java index 79836d8c15fc4f..1f95e2d6225f3b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java @@ -43,6 +43,7 @@ public class TableView { // Remove the cached reader and snapshots if there is no refresh request in 1 minute private static final long CACHE_EXPIRE_TIMEOUT_MS = 60 * 1000L; + private static final long CACHE_EXPIRE_CHECK_FREQUENCY = 3000L; @VisibleForTesting protected final Function>> readerCreator; private final Map snapshots = new ConcurrentHashMap<>(); @@ -53,7 +54,7 @@ public TableView(Function>> readerCreator ScheduledExecutorService executor) { this.readerCreator = readerCreator; this.clientOperationTimeoutMs = clientOperationTimeoutMs; - this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS); + this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS, CACHE_EXPIRE_CHECK_FREQUENCY); } public T readLatest(String topic) throws Exception { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java index 41055b37c012f7..3f1c2a15a297fc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java @@ -19,6 +19,7 @@ package org.apache.pulsar.utils; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -27,36 +28,58 @@ import java.util.function.Supplier; import lombok.RequiredArgsConstructor; -@RequiredArgsConstructor public class SimpleCache { - private final Map cache = new HashMap<>(); + private final Map> cache = new HashMap<>(); private final Map> futures = new HashMap<>(); - private final ScheduledExecutorService executor; private final long timeoutMs; - public synchronized V get(final K key, final Supplier valueSupplier, final Consumer expireCallback) { - final V value; - V existingValue = cache.get(key); - if (existingValue != null) { - value = existingValue; - } else { - value = valueSupplier.get(); - cache.put(key, value); + @RequiredArgsConstructor + private class ExpirableValue { + + private final V value; + private final Consumer expireCallback; + private long deadlineMs; + + boolean tryExpire() { + if (System.currentTimeMillis() >= deadlineMs) { + expireCallback.accept(value); + return true; + } else { + return false; + } } - final var future = futures.remove(key); - if (future != null) { - future.cancel(true); + + void updateDeadline() { + deadlineMs = System.currentTimeMillis() + timeoutMs; } - futures.put(key, executor.schedule(() -> { + } + + public SimpleCache(final ScheduledExecutorService scheduler, final long timeoutMs, final long frequencyMs) { + this.timeoutMs = timeoutMs; + scheduler.scheduleAtFixedRate(() -> { synchronized (SimpleCache.this) { - futures.remove(key); - final var removedValue = cache.remove(key); - if (removedValue != null) { - expireCallback.accept(removedValue); - } + final var keys = new HashSet(); + cache.forEach((key, value) -> { + if (value.tryExpire()) { + keys.add(key); + } + }); + keys.forEach(cache::remove); } - }, timeoutMs, TimeUnit.MILLISECONDS)); - return value; + }, frequencyMs, frequencyMs, TimeUnit.MILLISECONDS); + } + + public synchronized V get(final K key, final Supplier valueSupplier, final Consumer expireCallback) { + final var value = cache.get(key); + if (value != null) { + value.updateDeadline(); + return value.value; + } + + final var newValue = new ExpirableValue<>(valueSupplier.get(), expireCallback); + newValue.updateDeadline(); + cache.put(key, newValue); + return newValue.value; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java index c2176d71fec9ff..02bdbacb15af06 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java @@ -38,7 +38,7 @@ public void shutdown() { @Test public void testConcurrentUpdate() throws Exception { - final var cache = new SimpleCache(executor, 10000L); + final var cache = new SimpleCache(executor, 10000L, 10000L); final var pool = Executors.newFixedThreadPool(2); final var latch = new CountDownLatch(2); for (int i = 0; i < 2; i++) { @@ -60,7 +60,7 @@ public void testConcurrentUpdate() throws Exception { @Test public void testExpire() throws InterruptedException { - final var cache = new SimpleCache(executor, 500L); + final var cache = new SimpleCache(executor, 500L, 5); final var expiredValues = new CopyOnWriteArrayList(); cache.get(0, () -> 100, expiredValues::add); for (int i = 0; i < 100; i++) {