-
Notifications
You must be signed in to change notification settings - Fork 26
Support refresh of a cache value #92
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please talk with me before doing any changes, to decide how to proceed with this pull request.
} | ||
} | ||
|
||
private class InternalCacheLoader extends com.google.common.cache.CacheLoader<K, ComposableFuture<V>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check if this can be a static class
public ListenableFuture<ComposableFuture<V>> reload(final K key, final ComposableFuture<V> oldValue) { | ||
ListenableFutureTask<ComposableFuture<V>> task = ListenableFutureTask.create(() -> { | ||
ComposableFuture<V> loadFuture = loader.load(cacheName, key).recoverWith(e -> oldValue); | ||
V value = loadFuture.get(loadTimeout, loadTimeoutUnit); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Blocking contradicts the idea of that class, let's discuss how we can avoid that.
/** | ||
* A wrapper of LoadingCacheDelegate that adds the ability to refresh cache values in the configured interval on access | ||
*/ | ||
public class RefreshLoadingCacheDelegate<K, V> implements TypedCache<K, V> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should think about the design here, because it feels like mostly a copy of LoadingCacheDelegate
public ListenableFuture<ComposableFuture<V>> reload(final K key, final ComposableFuture<V> oldValue) { | ||
ListenableFutureTask<ComposableFuture<V>> task = ListenableFutureTask.create(() -> { | ||
ComposableFuture<V> loadFuture = loader.load(cacheName, key).recoverWith(e -> oldValue); | ||
V value = loadFuture.get(loadTimeout, loadTimeoutUnit); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can implement reload this way:
final SettableFuture<ComposableFuture<V>> future = SettableFuture.create();
loader.load(cacheName, key).materialize().andThen(t -> {
if (t.isSuccess()) {
future.set(ComposableFutures.fromValue(t.getValue()));
} else {
future.setException(t.getError());
}
});
return future;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is just perfect! thanks!
|
||
@Override | ||
public ComposableFuture<V> load(@Nonnull final K key) { | ||
return loader.load(cacheName, key).withTimeout(loadTimeout, loadTimeoutUnit).materialize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you add withTimeout?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added timeout because LoadingCacheDelegate has the option to set timeout to the load and i felt that it was missing here.
Don't you agree?
return loadElements(Lists.newArrayList(keys)); | ||
} | ||
}); | ||
this.loadingCache = builder.build(new InternalCacheLoader(loader, cacheName, failOnMissingEntries)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add diamond: new InternalCacheLoader<>
private final Counter refreshErrors; | ||
private final Counter refreshTimeouts; | ||
|
||
public RefreshLoadingCacheDelegate(final TypedCache<K, ValueWithWriteTime<V>> cache, final CacheLoader<K, V> loader, final String cacheName, final MetricFactory metricFactory, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider making private / package private
public ComposableFuture<V> getAsync(final K key) { | ||
ComposableFuture<ValueWithWriteTime<V>> futureValue = cache.getAsync(key); | ||
futureValue.consume(value -> { | ||
if (value.isSuccess() && shouldRefresh(value.getValue(), System.currentTimeMillis())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use a time supplier instead of System.currentTimeMillis() and avoid Thread.sleep in tests.
} | ||
|
||
private boolean shouldRefresh(final ValueWithWriteTime<V> value, long currentTimeMillis) { | ||
return value != null && currentTimeMillis - value.getWriteTime() >= refreshAfterWriteDuration; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider randomizing the TTL
return extractValues(resultMap); | ||
} | ||
|
||
private List<K> collectKeysToRefresh(final Map<K, ValueWithWriteTime<V>> result, final long currentTimeMillis) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can return a Stream here
return cache.deleteAsync(key); | ||
} | ||
|
||
private class InternalCacheLoader implements CacheLoader<K, ValueWithWriteTime<V>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be static
} | ||
} | ||
|
||
private class InternalEntityMapper implements EntryMapper<K, ValueWithWriteTime<V>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be static
if (alreadyRefreshing == null) { | ||
incRefreshCount(); | ||
internalCacheLoader.load(cacheName, key) | ||
.withTimeout(loadDuration, loadTimeUnit, "RefreshLoadingCacheDelegate fetch from loader; cache name: " + cacheName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why use withTimeout? it's in LoadingCacheDelegate only for historical reasons.
.collect(Collectors.toList()); | ||
|
||
if (!keysToRefresh.isEmpty()) { | ||
incRefreshCount(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inc by the number of keys?
internalCacheLoader.load(cacheName, keysToRefresh) | ||
.withTimeout(loadDuration, loadTimeUnit, "RefreshLoadingCacheDelegate fetch bulk from loader; cache name: " + cacheName) | ||
.consume(res -> { | ||
keysToRefresh.forEach(refreshingKeys::remove); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively: refreshingKeys.keySet().removeAll(keysToRefresh);
a wrapper of LoadingCacheDelegate that adds the ability to refresh cache values in a configured time
LocalAsyncCache - Support refresh of a cache value
Introduce new RefreshLoadingCacheDelegate - a wrapper of LoadingCacheDelegate that adds the ability to refresh cache values in a configured time