From 90a708b6711d88ca179026a6ef0c578c49d96c8d Mon Sep 17 00:00:00 2001 From: ian Date: Tue, 12 Dec 2023 10:50:50 +0000 Subject: [PATCH] Generalise kademlia.closerPeers to arbitrary keys Make sure we iclude /ipns/ in key wqhen publishing a value Use GET_VALUE calls only when looking up a value (using returned closer peers to iterate) Most IPNS lookups are now < 1s --- src/main/java/org/peergos/EmbeddedIpfs.java | 7 +- .../org/peergos/protocol/dht/Kademlia.java | 139 +++++++++++------- .../protocol/dht/KademliaController.java | 4 +- src/test/java/org/peergos/FindPeerTest.java | 2 +- 4 files changed, 95 insertions(+), 57 deletions(-) diff --git a/src/main/java/org/peergos/EmbeddedIpfs.java b/src/main/java/org/peergos/EmbeddedIpfs.java index ec446661..0af8a9ec 100644 --- a/src/main/java/org/peergos/EmbeddedIpfs.java +++ b/src/main/java/org/peergos/EmbeddedIpfs.java @@ -23,6 +23,7 @@ import org.peergos.protocol.circuit.*; import org.peergos.protocol.dht.*; import org.peergos.protocol.http.*; +import org.peergos.protocol.ipns.*; import org.peergos.util.Logging; import java.nio.file.*; @@ -110,9 +111,9 @@ public CompletableFuture publishValue(PrivKey priv, byte[] value, long seq public CompletableFuture resolveValue(PubKey pub) { Multihash publisher = Multihash.deserialize(PeerId.fromPubKey(pub).getBytes()); - CompletableFuture res = new CompletableFuture<>(); - dht.resolveValue(publisher, node, Kademlia.getNRecords(2, res)); - return res; + List candidates = dht.resolveValue(publisher, 1, node); + List records = candidates.stream().sorted().collect(Collectors.toList()); + return CompletableFuture.completedFuture(records.get(records.size() - 1).value); } public void start() { diff --git a/src/main/java/org/peergos/protocol/dht/Kademlia.java b/src/main/java/org/peergos/protocol/dht/Kademlia.java index dd392c6a..422a24d6 100644 --- a/src/main/java/org/peergos/protocol/dht/Kademlia.java +++ b/src/main/java/org/peergos/protocol/dht/Kademlia.java @@ -136,16 +136,21 @@ private int compareKeys(RoutingEntry a, RoutingEntry b, Id keyId) { private final ExecutorService ioExec = Executors.newFixedThreadPool(16); public List findClosestPeers(Multihash peerIdkey, int maxCount, Host us) { + if (maxCount == 1) { + Collection existing = addressBook.get(PeerId.fromBase58(peerIdkey.toBase58())).join(); + if (!existing.isEmpty()) + return Collections.singletonList(new PeerAddresses(peerIdkey, new ArrayList<>(existing))); + } byte[] key = peerIdkey.toBytes(); + return findClosestPeers(key, maxCount, us); + } + public List findClosestPeers(byte[] key, int maxCount, Host us) { Id keyId = Id.create(Hash.sha256(key), 256); SortedSet closest = Collections.synchronizedSortedSet(new TreeSet<>((a, b) -> compareKeys(a, b, keyId))); SortedSet toQuery = Collections.synchronizedSortedSet(new TreeSet<>((a, b) -> compareKeys(a, b, keyId))); List localClosest = engine.getKClosestPeers(key); if (maxCount == 1) { - Collection existing = addressBook.get(PeerId.fromBase58(peerIdkey.toBase58())).join(); - if (! existing.isEmpty()) - return Collections.singletonList(new PeerAddresses(peerIdkey, new ArrayList<>(existing))); - Optional match = localClosest.stream().filter(p -> p.peerId.equals(peerIdkey)).findFirst(); + Optional match = localClosest.stream().filter(p -> Arrays.equals(p.peerId.toBytes(), key)).findFirst(); if (match.isPresent()) return Collections.singletonList(match.get()); } @@ -163,7 +168,7 @@ public List findClosestPeers(Multihash peerIdkey, int maxCount, H .map(r -> { toQuery.remove(r); queried.add(r.addresses.peerId); - return ioExec.submit(() -> getCloserPeers(peerIdkey, r.addresses, us).join()); + return ioExec.submit(() -> getCloserPeers(key, r.addresses, us).join()); }) .collect(Collectors.toList()); boolean foundCloser = false; @@ -173,7 +178,7 @@ public List findClosestPeers(Multihash peerIdkey, int maxCount, H for (PeerAddresses peer : result) { if (!queried.contains(peer.peerId)) { // exit early if we are looking for the specific node - if (maxCount == 1 && peer.peerId.equals(peerIdkey)) + if (maxCount == 1 && Arrays.equals(peer.peerId.toBytes(), key)) return Collections.singletonList(peer); queried.add(peer.peerId); Id peerKey = Id.create(Hash.sha256(peer.peerId.toBytes()), 256); @@ -253,17 +258,17 @@ public CompletableFuture> findProviders(Multihash block, Hos return CompletableFuture.completedFuture(providers); } - private CompletableFuture> getCloserPeers(Multihash peerIDKey, PeerAddresses target, Host us) { + private CompletableFuture> getCloserPeers(byte[] key, PeerAddresses target, Host us) { try { - return dialPeer(target, us).orTimeout(2, TimeUnit.SECONDS).join().closerPeers(peerIDKey); + return dialPeer(target, us).orTimeout(2, TimeUnit.SECONDS).join().closerPeers(key); } catch (Exception e) { // we can't dial quic only nodes until it's implemented if (target.addresses.stream().allMatch(a -> a.toString().contains("quic"))) return CompletableFuture.completedFuture(Collections.emptyList()); if (e.getCause() instanceof NothingToCompleteException || e.getCause() instanceof NonCompleteException) { - LOG.fine("Couldn't dial " + peerIDKey + " addrs: " + target.addresses); + LOG.fine("Couldn't dial " + target.peerId + " addrs: " + target.addresses); } else if (e.getCause() instanceof TimeoutException) - LOG.fine("Timeout dialing " + peerIDKey + " addrs: " + target.addresses); + LOG.fine("Timeout dialing " + target.peerId + " addrs: " + target.addresses); else if (e.getCause() instanceof ConnectionClosedException) {} else e.printStackTrace(); @@ -321,56 +326,88 @@ public CompletableFuture publishValue(PrivKey priv, Host us) { Set publishes = Collections.synchronizedSet(new HashSet<>()); for (int i=0; i < 5 && publishes.size() < 20; i++) { - List closestPeers = findClosestPeers(publisher, 25, us); - closestPeers.forEach(peer -> ioExec.submit(() -> { + List closestPeers = findClosestPeers(IPNS.getKey(publisher), 25, us); + List> futures = closestPeers.stream() + .filter(p -> !publishes.contains(p.peerId)) + .map(peer -> ioExec.submit(() -> { + try { + dialPeer(peer, us).join() + .putValue(publishValue, expiry, sequence, + ttlNanos, publisher, priv).thenAccept(success -> { + if (success) + publishes.add(peer.peerId); + }); + } catch (Exception e) {} + })).collect(Collectors.toList()); + futures.forEach(f -> { try { - dialPeer(peer, us).join() - .putValue(publishValue, expiry, sequence, - ttlNanos, publisher, priv).thenAccept(success -> { - if (success) - publishes.add(peer.peerId); - }); + f.get(); } catch (Exception e) {} - })); + }); } return CompletableFuture.completedFuture(null); } - public static Predicate getNRecords(int minResults, CompletableFuture res) { - List candidates = new ArrayList<>(); - return rec -> { - candidates.add(rec); - if (candidates.size() >= minResults) { - // Validate and sort records by sequence number - List records = candidates.stream().sorted().collect(Collectors.toList()); - res.complete(records.get(records.size() - 1).value); - return false; - } - return true; - }; - } - public CompletableFuture resolveIpnsValue(Multihash publisher, Host us, int minResults) { - CompletableFuture res = new CompletableFuture<>(); - resolveValue(publisher, us, getNRecords(minResults, res)); - return res.thenApply(String::new); + List candidates = resolveValue(publisher, minResults, us); + List records = candidates.stream().sorted().collect(Collectors.toList()); + if (records.isEmpty()) + return CompletableFuture.failedFuture(new IllegalStateException("Couldn't find IPNS value for " + publisher)); + return CompletableFuture.completedFuture(new String(records.get(records.size() - 1).value)); } - public void resolveValue(Multihash publisher, Host us, Predicate getMore) { - List closestPeers = findClosestPeers(publisher, 20, us); - Set queryCandidates = new HashSet<>(); - Set queriedPeers = new HashSet<>(); - for (PeerAddresses peer : closestPeers) { - if (queriedPeers.contains(peer.peerId)) - continue; - queriedPeers.add(peer.peerId); - try { - GetResult res = dialPeer(peer, us).join().getValue(publisher).join(); - if (res.record.isPresent() && res.record.get().publisher.equals(publisher)) - if ( !getMore.test(res.record.get().value)) - return; - queryCandidates.addAll(res.closerPeers); - } catch (Exception e) {} + private Optional getValueFromPeer(PeerAddresses peer, Multihash publisher, Host us) { + try { + return Optional.of(dialPeer(peer, us).join().getValue(publisher).join()); + } catch (Exception e) {} + return Optional.empty(); + } + public List resolveValue(Multihash publisher, int minResults, Host us) { + byte[] key = IPNS.getKey(publisher); + List candidates = new ArrayList<>(); + Id keyId = Id.create(Hash.sha256(key), 256); + SortedSet toQuery = Collections.synchronizedSortedSet(new TreeSet<>((a, b) -> compareKeys(a, b, keyId))); + List localClosest = engine.getKClosestPeers(key); + int queryParallelism = 3; + toQuery.addAll(localClosest.stream() + .limit(queryParallelism) + .map(p -> new RoutingEntry(Id.create(Hash.sha256(p.peerId.toBytes()), 256), p)) + .collect(Collectors.toList())); + Set queried = Collections.synchronizedSet(new HashSet<>()); + while (! toQuery.isEmpty()) { + int remaining = toQuery.size() - 3; + List thisRound = toQuery.stream() + .limit(queryParallelism) + .collect(Collectors.toList()); + List> futures = thisRound.stream() + .map(r -> { + toQuery.remove(r); + queried.add(r.addresses.peerId); + return ioExec.submit(() -> getValueFromPeer(r.addresses, publisher, us) + .ifPresent(g -> { + if (g.record.isPresent() && g.record.get().publisher.equals(publisher)) + candidates.add(g.record.get().value); + for (PeerAddresses peer : g.closerPeers) { + if (! queried.contains(peer.peerId)) { + Id peerKey = Id.create(Hash.sha256(IPNS.getKey(peer.peerId)), 256); + RoutingEntry e = new RoutingEntry(peerKey, peer); + toQuery.add(e); + } + } + })); + }) + .collect(Collectors.toList()); + futures.forEach(f -> { + try { + f.get(); + } catch (Exception e) {} + }); + // exit early if we have enough results + if (candidates.size() >= minResults) + break; + if (toQuery.size() == remaining) + break; } + return candidates; } } diff --git a/src/main/java/org/peergos/protocol/dht/KademliaController.java b/src/main/java/org/peergos/protocol/dht/KademliaController.java index cbff1672..e5f6aa99 100644 --- a/src/main/java/org/peergos/protocol/dht/KademliaController.java +++ b/src/main/java/org/peergos/protocol/dht/KademliaController.java @@ -20,10 +20,10 @@ public interface KademliaController { CompletableFuture send(Dht.Message msg); - default CompletableFuture> closerPeers(Multihash peerID) { + default CompletableFuture> closerPeers(byte[] key) { return rpc(Dht.Message.newBuilder() .setType(Dht.Message.MessageType.FIND_NODE) - .setKey(ByteString.copyFrom(peerID.toBytes())) + .setKey(ByteString.copyFrom(key)) .build()) .thenApply(resp -> resp.getCloserPeersList().stream() .map(PeerAddresses::fromProtobuf) diff --git a/src/test/java/org/peergos/FindPeerTest.java b/src/test/java/org/peergos/FindPeerTest.java index 91c1610a..51173c18 100644 --- a/src/test/java/org/peergos/FindPeerTest.java +++ b/src/test/java/org/peergos/FindPeerTest.java @@ -49,7 +49,7 @@ private static long findAndDialPeer(Multihash toFind, Kademlia dht1, Host node1) PeerAddresses peer = matching.get(); Multiaddr[] addrs = peer.getPublicAddresses().stream().map(a -> Multiaddr.fromString(a.toString())).toArray(Multiaddr[]::new); dht1.dial(node1, PeerId.fromBase58(peer.peerId.toBase58()), addrs) - .getController().join().closerPeers(toFind).join(); + .getController().join().closerPeers(toFind.toBytes()).join(); System.out.println("Peer lookup took " + (t2-t1) + "ms"); return t2 - t1; }