Skip to content

Commit

Permalink
Merge branch 'master' into rxsocks
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed May 30, 2024
2 parents 26c4223 + 1e7b980 commit 1b172b2
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 42 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.rockylomo</groupId>
<artifactId>rx</artifactId>
<version>2.20.2-SNAPSHOT</version>
<version>2.20.3-SNAPSHOT</version>
<packaging>pom</packaging>

<parent>
Expand Down
2 changes: 1 addition & 1 deletion rxlib-x/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.github.rockylomo</groupId>
<artifactId>rx</artifactId>
<version>2.20.2-SNAPSHOT</version>
<version>2.20.3-SNAPSHOT</version>
</parent>
<artifactId>rxlib-x</artifactId>
<packaging>jar</packaging>
Expand Down
6 changes: 4 additions & 2 deletions rxlib-x/src/main/java/org/rx/crawler/BrowserType.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
@Getter
public enum BrowserType {
CHROME("chrome.exe", "chromedriver.exe"),
FIRE_FOX("firefox.exe", ""),
@Deprecated
IE("iexplore.exe", "IEDriverServer.exe");

private final String processName;
private final String driverName;
final String processName;
final String driverName;
}
12 changes: 6 additions & 6 deletions rxlib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.github.rockylomo</groupId>
<artifactId>rx</artifactId>
<version>2.20.2-SNAPSHOT</version>
<version>2.20.3-SNAPSHOT</version>
</parent>
<!-- -SNAPSHOT -->
<artifactId>rxlib</artifactId>
Expand All @@ -15,17 +15,17 @@
<properties>
<slf4j-api.version>2.0.4</slf4j-api.version>
<logback.version>1.3.12</logback.version>
<fastjson2.version>2.0.48</fastjson2.version>
<fastjson2.version>2.0.50</fastjson2.version>
<caffeine.version>2.9.3</caffeine.version>
<commons-lang3.version>3.14.0</commons-lang3.version>
<commons-collections4.version>4.4</commons-collections4.version>
<commons-io.version>2.13.0</commons-io.version>
<byte-buddy.version>1.14.12</byte-buddy.version>
<byte-buddy.version>1.14.16</byte-buddy.version>
<zip4j.version>2.11.3</zip4j.version>
<h2.version>2.2.224</h2.version>
<netty.version>4.1.106.Final</netty.version>
<bcprov.version>1.77</bcprov.version>
<okhttp.version>4.11.0</okhttp.version>
<netty.version>4.1.110.Final</netty.version>
<bcprov.version>1.78.1</bcprov.version>
<okhttp.version>4.12.0</okhttp.version>
<sshd.version>2.12.1</sshd.version>
<jsch.version>0.1.55</jsch.version>

Expand Down
82 changes: 82 additions & 0 deletions rxlib/src/main/java/org/rx/bean/CircularBlockingQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.rx.bean;

import lombok.Getter;
import lombok.Setter;
import org.rx.core.*;
import org.rx.util.function.TripleFunc;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

import static org.rx.core.Extends.ifNull;

public class CircularBlockingQueue<T> extends LinkedBlockingQueue<T> implements EventPublisher<CircularBlockingQueue<T>> {
private static final long serialVersionUID = 4685018531330571106L;
public final Delegate<CircularBlockingQueue<T>, NEventArgs<T>> onConsume = Delegate.create();
public TripleFunc<CircularBlockingQueue<T>, T, Boolean> onFull;
final ReentrantLock pLock = Reflects.readField(this, "putLock");
TimeoutFuture<?> consumeTimer;
@Getter
long consumePeriod;

public synchronized void setConsumePeriod(long consumePeriod) {
if ((this.consumePeriod = consumePeriod) > 0) {
if (consumeTimer != null) {
consumeTimer.cancel();
}
consumeTimer = Tasks.timer().setTimeout(() -> {
T t;
NEventArgs<T> e = new NEventArgs<>();
while ((t = poll()) != null) {
e.setValue(t);
raiseEvent(onConsume, e);
}
}, d -> consumePeriod, null, Constants.TIMER_PERIOD_FLAG);
} else {
if (consumeTimer != null) {
consumeTimer.cancel();
}
}
}

public CircularBlockingQueue(int capacity) {
this(capacity, null);
onFull = (q, t) -> {
pLock.lock();
try {
boolean ok;
do {
q.poll();
ok = q.innerOffer(t);
}
while (!ok);
return true;
} finally {
pLock.unlock();
}
};
}

public CircularBlockingQueue(int capacity, TripleFunc<CircularBlockingQueue<T>, T, Boolean> onFull) {
super(capacity);
this.onFull = onFull;
}

// @Override
// public boolean add(T t) {
// return offer(t);
// }

@Override
public boolean offer(T t) {
boolean r = super.offer(t);
if (!r && onFull != null) {
return ifNull(onFull.apply(this, t), false);
}
return r;
}

protected boolean innerOffer(T t) {
return super.offer(t);
}
}
33 changes: 23 additions & 10 deletions rxlib/src/main/java/org/rx/core/CachePolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import java.io.Serializable;

@AllArgsConstructor
public class CachePolicy implements Serializable {
private static final long serialVersionUID = 4378825072232415879L;

Expand All @@ -28,37 +27,51 @@ public static CachePolicy sliding(int expireSeconds) {
return new CachePolicy(DateTime.now().addSeconds(expireSeconds).getTime(), expireSeconds * 1000);
}

final int slidingSpan;
@Getter
long expiration = Long.MAX_VALUE;
int slidingSpan;
long expiration;

public boolean isExpired() {
return expiration <= System.currentTimeMillis();
return expiration < System.currentTimeMillis();
}

public boolean isSliding() {
return slidingSpan > 0;
}

public CachePolicy(long expiration, int slidingSpan) {
this.expiration = expiration;
this.slidingSpan = slidingSpan;
}

protected CachePolicy(CachePolicy policy) {
if (policy == null) {
this.slidingSpan = 0;
return;
}
this.expiration = policy.expiration;
this.slidingSpan = policy.slidingSpan;
}

public long ttl() {
return ttl(slidingSpan > 0);
return ttl(isSliding());
}

public long ttl(boolean slidingRenew) {
long ttl = Math.max(0, expiration - System.currentTimeMillis());
if (ttl > 0 && slidingRenew) {
Tasks.setTimeout(this::slidingRenew, 100, this, Constants.TIMER_REPLACE_FLAG);
// slidingRenew();
long curTime = System.currentTimeMillis();
long ttl = Math.max(0, expiration - curTime);
if (slidingRenew && ttl > 0) {
Tasks.setTimeout(() -> {
// System.out.println("slidingRenew");
expiration = curTime + slidingSpan;
}, 100, this, Constants.TIMER_REPLACE_FLAG);
return slidingSpan;
}
return ttl;
}

public boolean slidingRenew() {
if (slidingSpan <= 0) {
if (!isSliding()) {
return false;
}

Expand Down
20 changes: 20 additions & 0 deletions rxlib/src/main/java/org/rx/core/Sys.java
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,26 @@ static Object visitJson(Object cur, String path, AtomicInteger i, char c, String
return cur;
}

static final String TYPED_JSON_KEY = "_rx$type";

public static <T> T fromTypedJsonObject(@NonNull Map<String, Object> tJson) {
String td = (String) tJson.get(TYPED_JSON_KEY);
if (td == null) {
throw new InvalidException("Invalid json type");
}
return fromJson(tJson, Reflects.fromTypeDescriptor(td));
}

public static <T> JSONObject toTypedJsonObject(@NonNull T obj) {
return toTypedJsonObject(obj, obj.getClass());
}

public static <T> JSONObject toTypedJsonObject(@NonNull T obj, @NonNull Type objType) {
JSONObject r = toJsonObject(obj);
r.put(TYPED_JSON_KEY, Reflects.getTypeDescriptor(objType));
return r;
}

//TypeReference
public static <T> T fromJson(Object src, Type type) {
// if (src == null) {
Expand Down
42 changes: 31 additions & 11 deletions rxlib/src/main/java/org/rx/core/cache/CaffeineExpiry.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,61 @@

import com.github.benmanes.caffeine.cache.Expiry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.index.qual.NonNegative;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.rx.core.CachePolicy;
import org.rx.core.IOC;
import org.rx.core.RxConfig;
import org.rx.core.Sys;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.rx.core.Extends.as;

//@Slf4j
@RequiredArgsConstructor
class CaffeineExpiry implements Expiry<Object, Object> {
static final long DEFAULT_SLIDING_NANOS = TimeUnit.SECONDS.toNanos(RxConfig.INSTANCE.getCache().getSlidingSeconds());
final Map<Object, CachePolicy> policyMap;

static long computeNanos(Object value, long currentDuration) {
CachePolicy policy;
if (value instanceof CachePolicy) {
policy = (CachePolicy) value;
return TimeUnit.MILLISECONDS.toNanos(policy.ttl());
long computeNanos(Object key, Object value, long currentDuration) {
long ttlNanos;
CachePolicy policy = policyMap.get(key);
if (policy == null) {
policy = as(value, CachePolicy.class);
}
//absolute
if (policy != null) {
// log.debug("computeNanos key={} policy={} currentDuration={}", key, policy, currentDuration);
//absolute 或 sliding 在policy.ttl()内部已处理
ttlNanos = policy.isSliding() || currentDuration == -1 ? TimeUnit.MILLISECONDS.toNanos(policy.ttl()) : currentDuration;
} else {
//absolute
// return currentDuration != -1 ? currentDuration : DEFAULT_SLIDING_NANOS;
//sliding
return DEFAULT_SLIDING_NANOS;
//sliding
ttlNanos = DEFAULT_SLIDING_NANOS;
}
// log.debug("computeNanos key={} result={}", key, ttlNanos);
return ttlNanos;
}

//currentTime = System.nanoTime()
@Override
public long expireAfterCreate(@NonNull Object key, @NonNull Object value, long currentTime) {
return computeNanos(value, -1);
// System.out.println("expireAfterCreate[-1]: " + key + "=" + value);
return computeNanos(key, value, -1);
}

@Override
public long expireAfterUpdate(@NonNull Object key, @NonNull Object value, long currentTime, @NonNegative long currentDuration) {
return computeNanos(value, currentDuration);
// System.out.println("expireAfterUpdate[" + currentDuration + "]: " + key + "=" + value);
return computeNanos(key, value, currentDuration);
}

@Override
public long expireAfterRead(@NonNull Object key, @NonNull Object value, long currentTime, @NonNegative long currentDuration) {
return computeNanos(value, currentDuration);
// System.out.println("expireAfterRead[" + currentDuration + "]: " + key + "=" + value);
return computeNanos(key, value, currentDuration);
}
}
22 changes: 14 additions & 8 deletions rxlib/src/main/java/org/rx/core/cache/MemoryCache.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package org.rx.core.cache;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Policy;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.*;
import lombok.SneakyThrows;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.rx.core.*;
import org.rx.core.Cache;
import org.rx.util.function.BiAction;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import static org.rx.core.Extends.require;
Expand All @@ -34,6 +36,7 @@ static Caffeine<Object, Object> rootBuilder() {
}

final com.github.benmanes.caffeine.cache.Cache<TK, TV> cache;
final Map<Object, CachePolicy> policyMap = new ConcurrentHashMap<>();
final Policy.VarExpiration<TK, TV> expireVariably;

public MemoryCache() {
Expand All @@ -42,11 +45,15 @@ public MemoryCache() {

@SneakyThrows
public MemoryCache(BiAction<Caffeine<Object, Object>> onBuild) {
Caffeine<Object, Object> builder = rootBuilder().expireAfter(new CaffeineExpiry());
Caffeine<Object, Object> builder = rootBuilder();
if (onBuild != null) {
onBuild.invoke(builder);
}
cache = builder.build();
cache = builder.expireAfter(new CaffeineExpiry(policyMap))
.removalListener((key, value, cause) -> {
// System.out.println("policyMap remove " + key);
policyMap.remove(key);
}).build();
expireVariably = cache.policy().expireVariably().get();
}

Expand Down Expand Up @@ -76,11 +83,10 @@ public TV get(Object key) {

@Override
public TV put(TK key, TV value, CachePolicy policy) {
TV oldValue = cache.asMap().put(key, value);
if (policy != null) {
setExpire(key, TimeUnit.NANOSECONDS.toMillis(CaffeineExpiry.computeNanos(value, -1)));
policyMap.put(key, policy);
}
return oldValue;
return cache.asMap().put(key, value);
}

@Override
Expand Down
Loading

0 comments on commit 1b172b2

Please sign in to comment.