Skip to content

Commit

Permalink
Implement a download manager per peer set for bitswap
Browse files Browse the repository at this point in the history
  • Loading branch information
ianopolous committed Oct 20, 2023
1 parent 6d638c4 commit 9ccfa59
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
36 changes: 36 additions & 0 deletions src/main/java/org/peergos/protocol/bitswap/Bitswap.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.*;
import java.util.stream.*;
Expand All @@ -22,6 +23,7 @@ public class Bitswap extends StrictProtocolBinding<BitswapController> implements

private final BitswapEngine engine;
private final LRUCache<PeerId, Boolean> connected = new LRUCache<>(100);
private final LRUCache<Set<PeerId>, DownloadManager> downloads = new LRUCache<>(100);
private AddressBook addrs;

public Bitswap(BitswapEngine engine) {
Expand Down Expand Up @@ -64,6 +66,8 @@ public List<CompletableFuture<HashedBlock>> get(List<Want> wants,
results.add(res);
}
sendWants(us, peers);
DownloadManager manager = downloads.getOrDefault(peers, new DownloadManager(us, peers));
manager.ensureRunning();
return results;
}

Expand All @@ -73,8 +77,40 @@ public Set<PeerId> getBroadcastAudience() {
return res;
}

private class DownloadManager {
private final Host us;
private final Set<PeerId> peers;
private final AtomicBoolean running = new AtomicBoolean(false);

public DownloadManager(Host us, Set<PeerId> peers) {
this.us = us;
this.peers = peers;
}

public void ensureRunning() {
if (! running.get())
new Thread(() -> run()).start();
}

public void run() {
running.set(true);
while (true) {
try {Thread.sleep(5_000);} catch (InterruptedException e) {}
Set<Want> wants = engine.getWants(peers);
if (wants.isEmpty())
break;
sendWants(us, wants, peers);
}
running.set(false);
}
}

public void sendWants(Host us, Set<PeerId> peers) {
Set<Want> wants = engine.getWants(peers);
sendWants(us, wants, peers);
}

public void sendWants(Host us, Set<Want> wants, Set<PeerId> peers) {
Map<Want, PeerId> haves = engine.getHaves();
// broadcast to all connected bitswap peers if none are supplied
Set<PeerId> audience = peers.isEmpty() ? getBroadcastAudience() : peers;
Expand Down
14 changes: 8 additions & 6 deletions src/main/java/org/peergos/protocol/bitswap/BitswapEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class BitswapEngine {
private final ConcurrentHashMap<Want, PeerId> blockHaves = new ConcurrentHashMap<>();
private final Map<Want, Boolean> deniedWants = Collections.synchronizedMap(new LRUCache<>(10_000));
private final Map<PeerId, Map<Want, Boolean>> recentBlocksSent = Collections.synchronizedMap(new LRUCache<>(100));
private final Map<PeerId, Map<Want, Boolean>> recentWantsSent = Collections.synchronizedMap(new org.peergos.util.LRUCache<>(100));
private final Map<PeerId, Map<Want, Long>> recentWantsSent = Collections.synchronizedMap(new org.peergos.util.LRUCache<>(100));
private final Set<PeerId> connections = new HashSet<>();
private final BlockRequestAuthoriser authoriser;
private AddressBook addressBook;
Expand Down Expand Up @@ -69,8 +69,8 @@ public Set<PeerId> getConnected() {
return connected;
}

private Map<Want, Boolean> recentSentWants(PeerId peer) {
Map<Want, Boolean> recent = recentWantsSent.get(peer);
private Map<Want, Long> recentSentWants(PeerId peer) {
Map<Want, Long> recent = recentWantsSent.get(peer);
if (recent == null) {
recent = Collections.synchronizedMap(new LRUCache<>(1000));
recentWantsSent.put(peer, recent);
Expand All @@ -81,12 +81,14 @@ private Map<Want, Boolean> recentSentWants(PeerId peer) {
public Set<Want> getWants(Set<PeerId> peers) {
if (peers.size() == 1) {
PeerId peer = peers.stream().findFirst().get();
Map<Want, Boolean> recent = recentSentWants(peer);
Map<Want, Long> recent = recentSentWants(peer);

long now = System.currentTimeMillis();
long minResendWait = 5_000;
Set<Want> res = localWants.keySet().stream()
.filter(w -> !recent.containsKey(w))
.filter(w -> !recent.containsKey(w) || recent.get(w) < now - minResendWait)
.collect(Collectors.toSet());
res.forEach(w -> recent.put(w, true));
res.forEach(w -> recent.put(w, now));
return res;
}
return localWants.keySet();
Expand Down

0 comments on commit 9ccfa59

Please sign in to comment.