Skip to content

Commit

Permalink
[fix][ml] Fix thread safe issue with RangeCache.put and RangeCache.cl…
Browse files Browse the repository at this point in the history
…ear (apache#21302)
  • Loading branch information
lhotari authored Oct 7, 2023
1 parent 643428b commit 70d086f
Showing 1 changed file with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;

/**
Expand Down Expand Up @@ -74,13 +73,18 @@ public RangeCache(Weighter<Value> weighter, TimestampExtractor<Value> timestampE
* @return whether the entry was inserted in the cache
*/
public boolean put(Key key, Value value) {
MutableBoolean flag = new MutableBoolean();
entries.computeIfAbsent(key, (k) -> {
size.addAndGet(weighter.getSize(value));
flag.setValue(true);
return value;
});
return flag.booleanValue();
// retain value so that it's not released before we put it in the cache and calculate the weight
value.retain();
try {
if (entries.putIfAbsent(key, value) == null) {
size.addAndGet(weighter.getSize(value));
return true;
} else {
return false;
}
} finally {
value.release();
}
}

public boolean exists(Key key) {
Expand Down Expand Up @@ -242,7 +246,6 @@ public synchronized Pair<Integer, Long> clear() {
value.release();
}

entries.clear();
size.getAndAdd(-removedSize);
return Pair.of(removedCount, removedSize);
}
Expand Down

0 comments on commit 70d086f

Please sign in to comment.