Skip to content

Commit

Permalink
mcache
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed May 22, 2024
1 parent 0f3e0e1 commit 69a039b
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 32 deletions.
33 changes: 23 additions & 10 deletions rxlib/src/main/java/org/rx/core/CachePolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import java.io.Serializable;

@AllArgsConstructor
public class CachePolicy implements Serializable {
private static final long serialVersionUID = 4378825072232415879L;

Expand All @@ -28,37 +27,51 @@ public static CachePolicy sliding(int expireSeconds) {
return new CachePolicy(DateTime.now().addSeconds(expireSeconds).getTime(), expireSeconds * 1000);
}

final int slidingSpan;
@Getter
long expiration = Long.MAX_VALUE;
int slidingSpan;
long expiration;

public boolean isExpired() {
return expiration <= System.currentTimeMillis();
return expiration < System.currentTimeMillis();
}

public boolean isSliding() {
return slidingSpan > 0;
}

public CachePolicy(long expiration, int slidingSpan) {
this.expiration = expiration;
this.slidingSpan = slidingSpan;
}

protected CachePolicy(CachePolicy policy) {
if (policy == null) {
this.slidingSpan = 0;
return;
}
this.expiration = policy.expiration;
this.slidingSpan = policy.slidingSpan;
}

public long ttl() {
return ttl(slidingSpan > 0);
return ttl(isSliding());
}

public long ttl(boolean slidingRenew) {
long ttl = Math.max(0, expiration - System.currentTimeMillis());
if (ttl > 0 && slidingRenew) {
Tasks.setTimeout(this::slidingRenew, 100, this, Constants.TIMER_REPLACE_FLAG);
// slidingRenew();
long curTime = System.currentTimeMillis();
long ttl = Math.max(0, expiration - curTime);
if (slidingRenew && ttl > 0) {
Tasks.setTimeout(() -> {
// System.out.println("slidingRenew");
expiration = curTime + slidingSpan;
}, 100, this, Constants.TIMER_REPLACE_FLAG);
return slidingSpan;
}
return ttl;
}

public boolean slidingRenew() {
if (slidingSpan <= 0) {
if (!isSliding()) {
return false;
}

Expand Down
42 changes: 31 additions & 11 deletions rxlib/src/main/java/org/rx/core/cache/CaffeineExpiry.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,61 @@

import com.github.benmanes.caffeine.cache.Expiry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.index.qual.NonNegative;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.rx.core.CachePolicy;
import org.rx.core.IOC;
import org.rx.core.RxConfig;
import org.rx.core.Sys;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.rx.core.Extends.as;

//@Slf4j
@RequiredArgsConstructor
class CaffeineExpiry implements Expiry<Object, Object> {
static final long DEFAULT_SLIDING_NANOS = TimeUnit.SECONDS.toNanos(RxConfig.INSTANCE.getCache().getSlidingSeconds());
final Map<Object, CachePolicy> policyMap;

static long computeNanos(Object value, long currentDuration) {
CachePolicy policy;
if (value instanceof CachePolicy) {
policy = (CachePolicy) value;
return TimeUnit.MILLISECONDS.toNanos(policy.ttl());
long computeNanos(Object key, Object value, long currentDuration) {
long ttlNanos;
CachePolicy policy = policyMap.get(key);
if (policy == null) {
policy = as(value, CachePolicy.class);
}
//absolute
if (policy != null) {
// log.debug("computeNanos key={} policy={} currentDuration={}", key, policy, currentDuration);
//absolute 或 sliding 在policy.ttl()内部已处理
ttlNanos = policy.isSliding() || currentDuration == -1 ? TimeUnit.MILLISECONDS.toNanos(policy.ttl()) : currentDuration;
} else {
//absolute
// return currentDuration != -1 ? currentDuration : DEFAULT_SLIDING_NANOS;
//sliding
return DEFAULT_SLIDING_NANOS;
//sliding
ttlNanos = DEFAULT_SLIDING_NANOS;
}
// log.debug("computeNanos key={} result={}", key, ttlNanos);
return ttlNanos;
}

//currentTime = System.nanoTime()
@Override
public long expireAfterCreate(@NonNull Object key, @NonNull Object value, long currentTime) {
return computeNanos(value, -1);
// System.out.println("expireAfterCreate[-1]: " + key + "=" + value);
return computeNanos(key, value, -1);
}

@Override
public long expireAfterUpdate(@NonNull Object key, @NonNull Object value, long currentTime, @NonNegative long currentDuration) {
return computeNanos(value, currentDuration);
// System.out.println("expireAfterUpdate[" + currentDuration + "]: " + key + "=" + value);
return computeNanos(key, value, currentDuration);
}

@Override
public long expireAfterRead(@NonNull Object key, @NonNull Object value, long currentTime, @NonNegative long currentDuration) {
return computeNanos(value, currentDuration);
// System.out.println("expireAfterRead[" + currentDuration + "]: " + key + "=" + value);
return computeNanos(key, value, currentDuration);
}
}
22 changes: 14 additions & 8 deletions rxlib/src/main/java/org/rx/core/cache/MemoryCache.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package org.rx.core.cache;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Policy;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.*;
import lombok.SneakyThrows;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.rx.core.*;
import org.rx.core.Cache;
import org.rx.util.function.BiAction;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import static org.rx.core.Extends.require;
Expand All @@ -34,6 +36,7 @@ static Caffeine<Object, Object> rootBuilder() {
}

final com.github.benmanes.caffeine.cache.Cache<TK, TV> cache;
final Map<Object, CachePolicy> policyMap = new ConcurrentHashMap<>();
final Policy.VarExpiration<TK, TV> expireVariably;

public MemoryCache() {
Expand All @@ -42,11 +45,15 @@ public MemoryCache() {

@SneakyThrows
public MemoryCache(BiAction<Caffeine<Object, Object>> onBuild) {
Caffeine<Object, Object> builder = rootBuilder().expireAfter(new CaffeineExpiry());
Caffeine<Object, Object> builder = rootBuilder();
if (onBuild != null) {
onBuild.invoke(builder);
}
cache = builder.build();
cache = builder.expireAfter(new CaffeineExpiry(policyMap))
.removalListener((key, value, cause) -> {
// System.out.println("policyMap remove " + key);
policyMap.remove(key);
}).build();
expireVariably = cache.policy().expireVariably().get();
}

Expand Down Expand Up @@ -76,11 +83,10 @@ public TV get(Object key) {

@Override
public TV put(TK key, TV value, CachePolicy policy) {
TV oldValue = cache.asMap().put(key, value);
if (policy != null) {
setExpire(key, TimeUnit.NANOSECONDS.toMillis(CaffeineExpiry.computeNanos(value, -1)));
policyMap.put(key, policy);
}
return oldValue;
return cache.asMap().put(key, value);
}

@Override
Expand Down
15 changes: 12 additions & 3 deletions rxlib/src/test/java/org/rx/core/TestCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.rx.bean.*;
import org.rx.codec.RSAUtil;
import org.rx.core.cache.DiskCache;
import org.rx.core.cache.MemoryCache;
import org.rx.exception.ApplicationException;
import org.rx.exception.InvalidException;
import org.rx.exception.TraceHandler;
Expand Down Expand Up @@ -631,11 +632,19 @@ static void onEventWithTopic(Integer obj) {
public void cache() {
System.out.println(cacheKey("prefix", "login", 12345));

// BiAction<Caffeine<Object, Object>> dump = b -> b.removalListener((k, v, c) -> log.info("onRemoval {} {} {}", k, v, c));
Cache<Object, Object> cache = Cache.getInstance(MemoryCache.class);
cache.put("abc", 1, CachePolicy.absolute(5));
cache.put("bbc", 2, CachePolicy.sliding(2));
// cache.put("bbc", 2);
AtomicInteger c = new AtomicInteger();
Tasks.timer.setTimeout(() -> {
log.info("abc: {}, bbc: {}", cache.get("abc"), cache.get("bbc"));
}, d -> c.incrementAndGet() > 5 ? 3000 : 1000, null, Constants.TIMER_PERIOD_FLAG);

// testCache(new MemoryCache<>(dump));

DiskCache<Tuple<?, ?>, Integer> diskCache = (DiskCache) Cache.getInstance(DiskCache.class);
testCache(diskCache);
// DiskCache<Tuple<?, ?>, Integer> diskCache = (DiskCache) Cache.getInstance(DiskCache.class);
// testCache(diskCache);

System.in.read();
}
Expand Down

0 comments on commit 69a039b

Please sign in to comment.