Skip to content

Commit

Permalink
Use Guava cache for ExpirationSet to simplify and optimise implementa…
Browse files Browse the repository at this point in the history
…tion. (#168)

Optimise stats collection in KBucket to avoid creating new lists of all nodes.
  • Loading branch information
ajsutton authored Nov 4, 2022
1 parent e6e1e29 commit 9ef672b
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@

package org.ethereum.beacon.discovery.database;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import java.time.Clock;
import java.time.Duration;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import kotlin.Pair;
import java.util.concurrent.TimeUnit;

/**
* Set-alike collection with data expiring in configured time. This structure is not thread safe,
Expand All @@ -21,96 +22,53 @@
*/
public class ExpirationSet<V extends Comparable<V>> {

private final long expirationDelayMillis;
private final Clock clock;
private final long size;
private final TreeSet<Pair<Long, V>> dataExpiration =
new TreeSet<>(
Comparator.comparingLong(Pair<Long, V>::getFirst)
.reversed()
.thenComparing(Pair::getSecond));
private final Set<V> data = new HashSet<>();
private final Set<V> data;

/**
* Creates new set with records expiring in configured timeline after insertion
*
* @param expirationDelayMillis Expiration delay, each record will be removed after this time
* @param clock Clock instance
* @param size Maximum size of a set, the oldest records will be removed to store new
* @param maxSize Maximum size of a set, the oldest records will be removed to store new
*/
public ExpirationSet(final long expirationDelayMillis, final Clock clock, final long size) {
this.expirationDelayMillis = expirationDelayMillis;
this.clock = clock;
if (size < 1) {
throw new RuntimeException("Minimal size is 1");
}
this.size = size;
public ExpirationSet(final long expirationDelayMillis, final Clock clock, final long maxSize) {
checkArgument(maxSize > 0, "Minimal size is 1");
data =
Collections.newSetFromMap(
CacheBuilder.newBuilder()
.maximumSize(maxSize)
.expireAfterWrite(expirationDelayMillis, TimeUnit.MILLISECONDS)
.ticker(
new Ticker() {
@Override
public long read() {
return TimeUnit.MILLISECONDS.toNanos(clock.millis());
}
})
.<V, Boolean>build()
.asMap());
}

/**
* Creates new set with records expiring in configured timeline after insertion
*
* @param expirationDelay Expiration delay, each record will be removed after this time
* @param clock Clock instance
* @param size Maximum size of a set, the oldest records will be removed to store new
* @param maxSize Maximum size of a set, the oldest records will be removed to store new
*/
public ExpirationSet(final Duration expirationDelay, final Clock clock, final long size) {
this(expirationDelay.toMillis(), clock, size);
}

private void clearExpired() {
final long currentTime = clock.millis();
boolean next = true;
while (next && !dataExpiration.isEmpty()) {
Pair<Long, V> last = dataExpiration.last();
if (last.getFirst() < currentTime) {
dataExpiration.remove(last);
data.remove(last.getSecond());
} else {
next = false;
}
}
}

public int size() {
clearExpired();
return data.size();
}

public boolean isEmpty() {
clearExpired();
return data.isEmpty();
public ExpirationSet(final Duration expirationDelay, final Clock clock, final long maxSize) {
this(expirationDelay.toMillis(), clock, maxSize);
}

public boolean contains(V o) {
clearExpired();
return data.contains(o);
}

public boolean add(V v) {
clearExpired();
// no renewal
if (contains(v)) {
if (data.contains(v)) {
// Ensure re-adding doesn't reset expiration.
return false;
}
while (data.size() >= size) {
Pair<Long, V> last = dataExpiration.last();
dataExpiration.remove(last);
data.remove(last.getSecond());
}
dataExpiration.add(new Pair<>(clock.millis() + expirationDelayMillis, v));
data.add(v);
return true;
}

public boolean addAll(Collection<? extends V> c) {
clearExpired();
c.forEach(this::add);
return true;
}

public void clear() {
data.clear();
dataExpiration.clear();
return data.add(v);
}
}
14 changes: 10 additions & 4 deletions src/main/java/org/ethereum/beacon/discovery/storage/KBucket.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import org.ethereum.beacon.discovery.liveness.LivenessChecker;
import org.ethereum.beacon.discovery.schema.NodeRecord;
Expand Down Expand Up @@ -41,15 +42,20 @@ public KBucket(final LivenessChecker livenessChecker, final Clock clock) {
this.clock = clock;
}

public void updateStats(final int distance, final BucketStats stats) {
stats.setBucketStat(distance, (int) streamLiveEntries().count(), nodes.size());
}

public List<NodeRecord> getAllNodes() {
return nodes.stream().map(BucketEntry::getNode).collect(Collectors.toList());
}

public List<NodeRecord> getLiveNodes() {
return nodes.stream()
.takeWhile(BucketEntry::isLive)
.map(BucketEntry::getNode)
.collect(Collectors.toList());
return streamLiveEntries().map(BucketEntry::getNode).collect(Collectors.toList());
}

private Stream<BucketEntry> streamLiveEntries() {
return nodes.stream().takeWhile(BucketEntry::isLive);
}

public Optional<NodeRecord> getPendingNode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,7 @@ public synchronized void offer(NodeRecord node) {

public synchronized BucketStats getStats() {
final BucketStats stats = new BucketStats();
buckets.forEach(
(distance, bucket) ->
stats.setBucketStat(
distance, bucket.getLiveNodes().size(), bucket.getAllNodes().size()));
buckets.forEach((distance, bucket) -> bucket.updateStats(distance, stats));
return stats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,52 @@ public class ExpirationSetTest {
private final Clock clock = mock(Clock.class);

@Test
public void shouldAppreciateSize() {
when(clock.millis()).thenReturn(12345L);
public void shouldNotExceedMaximumSize() {
when(clock.millis()).thenReturn(45L);
ExpirationSet<String> expirationSet = new ExpirationSet<>(1, clock, 3);
expirationSet.add("a");
expirationSet.add("b");
expirationSet.add("c");
expirationSet.add("d");
assertThat(expirationSet.size()).isEqualTo(3);
assertThat(expirationSet.contains("a")).isFalse();
assertThat(expirationSet.contains("b")).isTrue();
assertThat(expirationSet.contains("c")).isTrue();
assertThat(expirationSet.contains("d")).isTrue();
}

@Test
public void oldElementsShouldExpire() {
when(clock.millis()).thenReturn(12345L);
public void shouldExpireOldElements() {
when(clock.millis()).thenReturn(45L);
ExpirationSet<String> expirationSet = new ExpirationSet<>(5, clock, 3);
expirationSet.add("a");
expirationSet.add("b");
assertThat(expirationSet.size()).isEqualTo(2);
when(clock.millis()).thenReturn(12352L);
assertThat(expirationSet.contains("a")).isTrue();
assertThat(expirationSet.contains("b")).isTrue();
assertThat(expirationSet.contains("c")).isFalse();
assertThat(expirationSet.contains("d")).isFalse();
when(clock.millis()).thenReturn(52L);
expirationSet.add("c");
assertThat(expirationSet.size()).isEqualTo(1);
when(clock.millis()).thenReturn(12355L);
assertThat(expirationSet.contains("a")).isFalse();
assertThat(expirationSet.contains("b")).isFalse();
assertThat(expirationSet.contains("c")).isTrue();
assertThat(expirationSet.contains("d")).isFalse();
when(clock.millis()).thenReturn(55L);
expirationSet.add("d");
assertThat(expirationSet.size()).isEqualTo(2);
assertThat(expirationSet.contains("a")).isFalse();
assertThat(expirationSet.contains("b")).isFalse();
assertThat(expirationSet.contains("c")).isTrue();
assertThat(expirationSet.contains("d")).isTrue();
}

@Test
public void addingExistingElementShouldBeIgnored() {
when(clock.millis()).thenReturn(12345L);
public void shouldIgnoreReaddedElements() {
when(clock.millis()).thenReturn(45L);
ExpirationSet<String> expirationSet = new ExpirationSet<>(5, clock, 3);
expirationSet.add("a");
when(clock.millis()).thenReturn(12349L);
when(clock.millis()).thenReturn(49L);
expirationSet.add("a");
assertThat(expirationSet.size()).isEqualTo(1);
when(clock.millis()).thenReturn(12351L);
assertThat(expirationSet.size()).isEqualTo(0);
assertThat(expirationSet.contains("a")).isTrue();
when(clock.millis()).thenReturn(51L);
assertThat(expirationSet.contains("a")).isFalse();
}
}

0 comments on commit 9ef672b

Please sign in to comment.