From 9ccfa59891edc00fa24eefafaa4bab6025cf34e5 Mon Sep 17 00:00:00 2001 From: ian Date: Fri, 20 Oct 2023 09:56:00 +0100 Subject: [PATCH] Implement a download manager per peer set for bitswap --- .../org/peergos/protocol/bitswap/Bitswap.java | 36 +++++++++++++++++++ .../protocol/bitswap/BitswapEngine.java | 14 ++++---- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/peergos/protocol/bitswap/Bitswap.java b/src/main/java/org/peergos/protocol/bitswap/Bitswap.java index 15dfc762..39fcf5de 100644 --- a/src/main/java/org/peergos/protocol/bitswap/Bitswap.java +++ b/src/main/java/org/peergos/protocol/bitswap/Bitswap.java @@ -12,6 +12,7 @@ import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import java.util.function.*; import java.util.logging.*; import java.util.stream.*; @@ -22,6 +23,7 @@ public class Bitswap extends StrictProtocolBinding implements private final BitswapEngine engine; private final LRUCache connected = new LRUCache<>(100); + private final LRUCache, DownloadManager> downloads = new LRUCache<>(100); private AddressBook addrs; public Bitswap(BitswapEngine engine) { @@ -64,6 +66,8 @@ public List> get(List wants, results.add(res); } sendWants(us, peers); + DownloadManager manager = downloads.getOrDefault(peers, new DownloadManager(us, peers)); + manager.ensureRunning(); return results; } @@ -73,8 +77,40 @@ public Set getBroadcastAudience() { return res; } + private class DownloadManager { + private final Host us; + private final Set peers; + private final AtomicBoolean running = new AtomicBoolean(false); + + public DownloadManager(Host us, Set peers) { + this.us = us; + this.peers = peers; + } + + public void ensureRunning() { + if (! running.get()) + new Thread(() -> run()).start(); + } + + public void run() { + running.set(true); + while (true) { + try {Thread.sleep(5_000);} catch (InterruptedException e) {} + Set wants = engine.getWants(peers); + if (wants.isEmpty()) + break; + sendWants(us, wants, peers); + } + running.set(false); + } + } + public void sendWants(Host us, Set peers) { Set wants = engine.getWants(peers); + sendWants(us, wants, peers); + } + + public void sendWants(Host us, Set wants, Set peers) { Map haves = engine.getHaves(); // broadcast to all connected bitswap peers if none are supplied Set audience = peers.isEmpty() ? getBroadcastAudience() : peers; diff --git a/src/main/java/org/peergos/protocol/bitswap/BitswapEngine.java b/src/main/java/org/peergos/protocol/bitswap/BitswapEngine.java index f37d43db..1c430e28 100644 --- a/src/main/java/org/peergos/protocol/bitswap/BitswapEngine.java +++ b/src/main/java/org/peergos/protocol/bitswap/BitswapEngine.java @@ -28,7 +28,7 @@ public class BitswapEngine { private final ConcurrentHashMap blockHaves = new ConcurrentHashMap<>(); private final Map deniedWants = Collections.synchronizedMap(new LRUCache<>(10_000)); private final Map> recentBlocksSent = Collections.synchronizedMap(new LRUCache<>(100)); - private final Map> recentWantsSent = Collections.synchronizedMap(new org.peergos.util.LRUCache<>(100)); + private final Map> recentWantsSent = Collections.synchronizedMap(new org.peergos.util.LRUCache<>(100)); private final Set connections = new HashSet<>(); private final BlockRequestAuthoriser authoriser; private AddressBook addressBook; @@ -69,8 +69,8 @@ public Set getConnected() { return connected; } - private Map recentSentWants(PeerId peer) { - Map recent = recentWantsSent.get(peer); + private Map recentSentWants(PeerId peer) { + Map recent = recentWantsSent.get(peer); if (recent == null) { recent = Collections.synchronizedMap(new LRUCache<>(1000)); recentWantsSent.put(peer, recent); @@ -81,12 +81,14 @@ private Map recentSentWants(PeerId peer) { public Set getWants(Set peers) { if (peers.size() == 1) { PeerId peer = peers.stream().findFirst().get(); - Map recent = recentSentWants(peer); + Map recent = recentSentWants(peer); + long now = System.currentTimeMillis(); + long minResendWait = 5_000; Set res = localWants.keySet().stream() - .filter(w -> !recent.containsKey(w)) + .filter(w -> !recent.containsKey(w) || recent.get(w) < now - minResendWait) .collect(Collectors.toSet()); - res.forEach(w -> recent.put(w, true)); + res.forEach(w -> recent.put(w, now)); return res; } return localWants.keySet();