From 69a039bf6f47dc3f7b07af17ca5d14ca6220b6f4 Mon Sep 17 00:00:00 2001 From: wxm <115806199+youfanx@users.noreply.github.com> Date: Wed, 22 May 2024 16:23:06 +0800 Subject: [PATCH] mcache --- .../main/java/org/rx/core/CachePolicy.java | 33 ++++++++++----- .../org/rx/core/cache/CaffeineExpiry.java | 42 ++++++++++++++----- .../java/org/rx/core/cache/MemoryCache.java | 22 ++++++---- rxlib/src/test/java/org/rx/core/TestCore.java | 15 +++++-- 4 files changed, 80 insertions(+), 32 deletions(-) diff --git a/rxlib/src/main/java/org/rx/core/CachePolicy.java b/rxlib/src/main/java/org/rx/core/CachePolicy.java index 708c1941..73127e89 100644 --- a/rxlib/src/main/java/org/rx/core/CachePolicy.java +++ b/rxlib/src/main/java/org/rx/core/CachePolicy.java @@ -6,7 +6,6 @@ import java.io.Serializable; -@AllArgsConstructor public class CachePolicy implements Serializable { private static final long serialVersionUID = 4378825072232415879L; @@ -28,16 +27,26 @@ public static CachePolicy sliding(int expireSeconds) { return new CachePolicy(DateTime.now().addSeconds(expireSeconds).getTime(), expireSeconds * 1000); } + final int slidingSpan; @Getter - long expiration = Long.MAX_VALUE; - int slidingSpan; + long expiration; public boolean isExpired() { - return expiration <= System.currentTimeMillis(); + return expiration < System.currentTimeMillis(); + } + + public boolean isSliding() { + return slidingSpan > 0; + } + + public CachePolicy(long expiration, int slidingSpan) { + this.expiration = expiration; + this.slidingSpan = slidingSpan; } protected CachePolicy(CachePolicy policy) { if (policy == null) { + this.slidingSpan = 0; return; } this.expiration = policy.expiration; @@ -45,20 +54,24 @@ protected CachePolicy(CachePolicy policy) { } public long ttl() { - return ttl(slidingSpan > 0); + return ttl(isSliding()); } public long ttl(boolean slidingRenew) { - long ttl = Math.max(0, expiration - System.currentTimeMillis()); - if (ttl > 0 && slidingRenew) { - Tasks.setTimeout(this::slidingRenew, 100, this, Constants.TIMER_REPLACE_FLAG); -// slidingRenew(); + long curTime = System.currentTimeMillis(); + long ttl = Math.max(0, expiration - curTime); + if (slidingRenew && ttl > 0) { + Tasks.setTimeout(() -> { +// System.out.println("slidingRenew"); + expiration = curTime + slidingSpan; + }, 100, this, Constants.TIMER_REPLACE_FLAG); + return slidingSpan; } return ttl; } public boolean slidingRenew() { - if (slidingSpan <= 0) { + if (!isSliding()) { return false; } diff --git a/rxlib/src/main/java/org/rx/core/cache/CaffeineExpiry.java b/rxlib/src/main/java/org/rx/core/cache/CaffeineExpiry.java index 07f8bd16..5e7639c8 100644 --- a/rxlib/src/main/java/org/rx/core/cache/CaffeineExpiry.java +++ b/rxlib/src/main/java/org/rx/core/cache/CaffeineExpiry.java @@ -2,41 +2,61 @@ import com.github.benmanes.caffeine.cache.Expiry; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.index.qual.NonNegative; import org.checkerframework.checker.nullness.qual.NonNull; import org.rx.core.CachePolicy; +import org.rx.core.IOC; import org.rx.core.RxConfig; +import org.rx.core.Sys; +import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.rx.core.Extends.as; + +//@Slf4j @RequiredArgsConstructor class CaffeineExpiry implements Expiry { static final long DEFAULT_SLIDING_NANOS = TimeUnit.SECONDS.toNanos(RxConfig.INSTANCE.getCache().getSlidingSeconds()); + final Map policyMap; - static long computeNanos(Object value, long currentDuration) { - CachePolicy policy; - if (value instanceof CachePolicy) { - policy = (CachePolicy) value; - return TimeUnit.MILLISECONDS.toNanos(policy.ttl()); + long computeNanos(Object key, Object value, long currentDuration) { + long ttlNanos; + CachePolicy policy = policyMap.get(key); + if (policy == null) { + policy = as(value, CachePolicy.class); } - //absolute + if (policy != null) { +// log.debug("computeNanos key={} policy={} currentDuration={}", key, policy, currentDuration); + //absolute 或 sliding 在policy.ttl()内部已处理 + ttlNanos = policy.isSliding() || currentDuration == -1 ? TimeUnit.MILLISECONDS.toNanos(policy.ttl()) : currentDuration; + } else { + //absolute // return currentDuration != -1 ? currentDuration : DEFAULT_SLIDING_NANOS; - //sliding - return DEFAULT_SLIDING_NANOS; + //sliding + ttlNanos = DEFAULT_SLIDING_NANOS; + } +// log.debug("computeNanos key={} result={}", key, ttlNanos); + return ttlNanos; } + //currentTime = System.nanoTime() @Override public long expireAfterCreate(@NonNull Object key, @NonNull Object value, long currentTime) { - return computeNanos(value, -1); +// System.out.println("expireAfterCreate[-1]: " + key + "=" + value); + return computeNanos(key, value, -1); } @Override public long expireAfterUpdate(@NonNull Object key, @NonNull Object value, long currentTime, @NonNegative long currentDuration) { - return computeNanos(value, currentDuration); +// System.out.println("expireAfterUpdate[" + currentDuration + "]: " + key + "=" + value); + return computeNanos(key, value, currentDuration); } @Override public long expireAfterRead(@NonNull Object key, @NonNull Object value, long currentTime, @NonNegative long currentDuration) { - return computeNanos(value, currentDuration); +// System.out.println("expireAfterRead[" + currentDuration + "]: " + key + "=" + value); + return computeNanos(key, value, currentDuration); } } diff --git a/rxlib/src/main/java/org/rx/core/cache/MemoryCache.java b/rxlib/src/main/java/org/rx/core/cache/MemoryCache.java index 1d4ebde9..afdcf927 100644 --- a/rxlib/src/main/java/org/rx/core/cache/MemoryCache.java +++ b/rxlib/src/main/java/org/rx/core/cache/MemoryCache.java @@ -1,15 +1,17 @@ package org.rx.core.cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.Policy; -import com.github.benmanes.caffeine.cache.Scheduler; +import com.github.benmanes.caffeine.cache.*; import lombok.SneakyThrows; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; import org.rx.core.*; +import org.rx.core.Cache; import org.rx.util.function.BiAction; import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import static org.rx.core.Extends.require; @@ -34,6 +36,7 @@ static Caffeine rootBuilder() { } final com.github.benmanes.caffeine.cache.Cache cache; + final Map policyMap = new ConcurrentHashMap<>(); final Policy.VarExpiration expireVariably; public MemoryCache() { @@ -42,11 +45,15 @@ public MemoryCache() { @SneakyThrows public MemoryCache(BiAction> onBuild) { - Caffeine builder = rootBuilder().expireAfter(new CaffeineExpiry()); + Caffeine builder = rootBuilder(); if (onBuild != null) { onBuild.invoke(builder); } - cache = builder.build(); + cache = builder.expireAfter(new CaffeineExpiry(policyMap)) + .removalListener((key, value, cause) -> { +// System.out.println("policyMap remove " + key); + policyMap.remove(key); + }).build(); expireVariably = cache.policy().expireVariably().get(); } @@ -76,11 +83,10 @@ public TV get(Object key) { @Override public TV put(TK key, TV value, CachePolicy policy) { - TV oldValue = cache.asMap().put(key, value); if (policy != null) { - setExpire(key, TimeUnit.NANOSECONDS.toMillis(CaffeineExpiry.computeNanos(value, -1))); + policyMap.put(key, policy); } - return oldValue; + return cache.asMap().put(key, value); } @Override diff --git a/rxlib/src/test/java/org/rx/core/TestCore.java b/rxlib/src/test/java/org/rx/core/TestCore.java index bab9272f..cdb334d1 100644 --- a/rxlib/src/test/java/org/rx/core/TestCore.java +++ b/rxlib/src/test/java/org/rx/core/TestCore.java @@ -15,6 +15,7 @@ import org.rx.bean.*; import org.rx.codec.RSAUtil; import org.rx.core.cache.DiskCache; +import org.rx.core.cache.MemoryCache; import org.rx.exception.ApplicationException; import org.rx.exception.InvalidException; import org.rx.exception.TraceHandler; @@ -631,11 +632,19 @@ static void onEventWithTopic(Integer obj) { public void cache() { System.out.println(cacheKey("prefix", "login", 12345)); -// BiAction> dump = b -> b.removalListener((k, v, c) -> log.info("onRemoval {} {} {}", k, v, c)); + Cache cache = Cache.getInstance(MemoryCache.class); + cache.put("abc", 1, CachePolicy.absolute(5)); + cache.put("bbc", 2, CachePolicy.sliding(2)); +// cache.put("bbc", 2); + AtomicInteger c = new AtomicInteger(); + Tasks.timer.setTimeout(() -> { + log.info("abc: {}, bbc: {}", cache.get("abc"), cache.get("bbc")); + }, d -> c.incrementAndGet() > 5 ? 3000 : 1000, null, Constants.TIMER_PERIOD_FLAG); + // testCache(new MemoryCache<>(dump)); - DiskCache, Integer> diskCache = (DiskCache) Cache.getInstance(DiskCache.class); - testCache(diskCache); +// DiskCache, Integer> diskCache = (DiskCache) Cache.getInstance(DiskCache.class); +// testCache(diskCache); System.in.read(); }