Skip to content

Commit

Permalink
Generalise kademlia.closerPeers to arbitrary keys
Browse files Browse the repository at this point in the history
Make sure we iclude /ipns/ in key wqhen publishing a value

Use GET_VALUE calls only when looking up a value
(using returned closer peers to iterate)

Most IPNS lookups are now < 1s
  • Loading branch information
ianopolous committed Dec 12, 2023
1 parent f30a124 commit 90a708b
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 57 deletions.
7 changes: 4 additions & 3 deletions src/main/java/org/peergos/EmbeddedIpfs.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.peergos.protocol.circuit.*;
import org.peergos.protocol.dht.*;
import org.peergos.protocol.http.*;
import org.peergos.protocol.ipns.*;
import org.peergos.util.Logging;

import java.nio.file.*;
Expand Down Expand Up @@ -110,9 +111,9 @@ public CompletableFuture<Void> publishValue(PrivKey priv, byte[] value, long seq

public CompletableFuture<byte[]> resolveValue(PubKey pub) {
Multihash publisher = Multihash.deserialize(PeerId.fromPubKey(pub).getBytes());
CompletableFuture<byte[]> res = new CompletableFuture<>();
dht.resolveValue(publisher, node, Kademlia.getNRecords(2, res));
return res;
List<IpnsRecord> candidates = dht.resolveValue(publisher, 1, node);
List<IpnsRecord> records = candidates.stream().sorted().collect(Collectors.toList());
return CompletableFuture.completedFuture(records.get(records.size() - 1).value);
}

public void start() {
Expand Down
139 changes: 88 additions & 51 deletions src/main/java/org/peergos/protocol/dht/Kademlia.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,21 @@ private int compareKeys(RoutingEntry a, RoutingEntry b, Id keyId) {
private final ExecutorService ioExec = Executors.newFixedThreadPool(16);

public List<PeerAddresses> findClosestPeers(Multihash peerIdkey, int maxCount, Host us) {
if (maxCount == 1) {
Collection<Multiaddr> existing = addressBook.get(PeerId.fromBase58(peerIdkey.toBase58())).join();
if (!existing.isEmpty())
return Collections.singletonList(new PeerAddresses(peerIdkey, new ArrayList<>(existing)));
}
byte[] key = peerIdkey.toBytes();
return findClosestPeers(key, maxCount, us);
}
public List<PeerAddresses> findClosestPeers(byte[] key, int maxCount, Host us) {
Id keyId = Id.create(Hash.sha256(key), 256);
SortedSet<RoutingEntry> closest = Collections.synchronizedSortedSet(new TreeSet<>((a, b) -> compareKeys(a, b, keyId)));
SortedSet<RoutingEntry> toQuery = Collections.synchronizedSortedSet(new TreeSet<>((a, b) -> compareKeys(a, b, keyId)));
List<PeerAddresses> localClosest = engine.getKClosestPeers(key);
if (maxCount == 1) {
Collection<Multiaddr> existing = addressBook.get(PeerId.fromBase58(peerIdkey.toBase58())).join();
if (! existing.isEmpty())
return Collections.singletonList(new PeerAddresses(peerIdkey, new ArrayList<>(existing)));
Optional<PeerAddresses> match = localClosest.stream().filter(p -> p.peerId.equals(peerIdkey)).findFirst();
Optional<PeerAddresses> match = localClosest.stream().filter(p -> Arrays.equals(p.peerId.toBytes(), key)).findFirst();
if (match.isPresent())
return Collections.singletonList(match.get());
}
Expand All @@ -163,7 +168,7 @@ public List<PeerAddresses> findClosestPeers(Multihash peerIdkey, int maxCount, H
.map(r -> {
toQuery.remove(r);
queried.add(r.addresses.peerId);
return ioExec.submit(() -> getCloserPeers(peerIdkey, r.addresses, us).join());
return ioExec.submit(() -> getCloserPeers(key, r.addresses, us).join());
})
.collect(Collectors.toList());
boolean foundCloser = false;
Expand All @@ -173,7 +178,7 @@ public List<PeerAddresses> findClosestPeers(Multihash peerIdkey, int maxCount, H
for (PeerAddresses peer : result) {
if (!queried.contains(peer.peerId)) {
// exit early if we are looking for the specific node
if (maxCount == 1 && peer.peerId.equals(peerIdkey))
if (maxCount == 1 && Arrays.equals(peer.peerId.toBytes(), key))
return Collections.singletonList(peer);
queried.add(peer.peerId);
Id peerKey = Id.create(Hash.sha256(peer.peerId.toBytes()), 256);
Expand Down Expand Up @@ -253,17 +258,17 @@ public CompletableFuture<List<PeerAddresses>> findProviders(Multihash block, Hos
return CompletableFuture.completedFuture(providers);
}

private CompletableFuture<List<PeerAddresses>> getCloserPeers(Multihash peerIDKey, PeerAddresses target, Host us) {
private CompletableFuture<List<PeerAddresses>> getCloserPeers(byte[] key, PeerAddresses target, Host us) {
try {
return dialPeer(target, us).orTimeout(2, TimeUnit.SECONDS).join().closerPeers(peerIDKey);
return dialPeer(target, us).orTimeout(2, TimeUnit.SECONDS).join().closerPeers(key);
} catch (Exception e) {
// we can't dial quic only nodes until it's implemented
if (target.addresses.stream().allMatch(a -> a.toString().contains("quic")))
return CompletableFuture.completedFuture(Collections.emptyList());
if (e.getCause() instanceof NothingToCompleteException || e.getCause() instanceof NonCompleteException) {
LOG.fine("Couldn't dial " + peerIDKey + " addrs: " + target.addresses);
LOG.fine("Couldn't dial " + target.peerId + " addrs: " + target.addresses);
} else if (e.getCause() instanceof TimeoutException)
LOG.fine("Timeout dialing " + peerIDKey + " addrs: " + target.addresses);
LOG.fine("Timeout dialing " + target.peerId + " addrs: " + target.addresses);
else if (e.getCause() instanceof ConnectionClosedException) {}
else
e.printStackTrace();
Expand Down Expand Up @@ -321,56 +326,88 @@ public CompletableFuture<Void> publishValue(PrivKey priv,
Host us) {
Set<Multihash> publishes = Collections.synchronizedSet(new HashSet<>());
for (int i=0; i < 5 && publishes.size() < 20; i++) {
List<PeerAddresses> closestPeers = findClosestPeers(publisher, 25, us);
closestPeers.forEach(peer -> ioExec.submit(() -> {
List<PeerAddresses> closestPeers = findClosestPeers(IPNS.getKey(publisher), 25, us);
List<? extends Future<?>> futures = closestPeers.stream()
.filter(p -> !publishes.contains(p.peerId))
.map(peer -> ioExec.submit(() -> {
try {
dialPeer(peer, us).join()
.putValue(publishValue, expiry, sequence,
ttlNanos, publisher, priv).thenAccept(success -> {
if (success)
publishes.add(peer.peerId);
});
} catch (Exception e) {}
})).collect(Collectors.toList());
futures.forEach(f -> {
try {
dialPeer(peer, us).join()
.putValue(publishValue, expiry, sequence,
ttlNanos, publisher, priv).thenAccept(success -> {
if (success)
publishes.add(peer.peerId);
});
f.get();
} catch (Exception e) {}
}));
});
}
return CompletableFuture.completedFuture(null);
}

public static Predicate<IpnsRecord> getNRecords(int minResults, CompletableFuture<byte[]> res) {
List<IpnsRecord> candidates = new ArrayList<>();
return rec -> {
candidates.add(rec);
if (candidates.size() >= minResults) {
// Validate and sort records by sequence number
List<IpnsRecord> records = candidates.stream().sorted().collect(Collectors.toList());
res.complete(records.get(records.size() - 1).value);
return false;
}
return true;
};
}

public CompletableFuture<String> resolveIpnsValue(Multihash publisher, Host us, int minResults) {
CompletableFuture<byte[]> res = new CompletableFuture<>();
resolveValue(publisher, us, getNRecords(minResults, res));
return res.thenApply(String::new);
List<IpnsRecord> candidates = resolveValue(publisher, minResults, us);
List<IpnsRecord> records = candidates.stream().sorted().collect(Collectors.toList());
if (records.isEmpty())
return CompletableFuture.failedFuture(new IllegalStateException("Couldn't find IPNS value for " + publisher));
return CompletableFuture.completedFuture(new String(records.get(records.size() - 1).value));
}

public void resolveValue(Multihash publisher, Host us, Predicate<IpnsRecord> getMore) {
List<PeerAddresses> closestPeers = findClosestPeers(publisher, 20, us);
Set<PeerAddresses> queryCandidates = new HashSet<>();
Set<Multihash> queriedPeers = new HashSet<>();
for (PeerAddresses peer : closestPeers) {
if (queriedPeers.contains(peer.peerId))
continue;
queriedPeers.add(peer.peerId);
try {
GetResult res = dialPeer(peer, us).join().getValue(publisher).join();
if (res.record.isPresent() && res.record.get().publisher.equals(publisher))
if ( !getMore.test(res.record.get().value))
return;
queryCandidates.addAll(res.closerPeers);
} catch (Exception e) {}
private Optional<GetResult> getValueFromPeer(PeerAddresses peer, Multihash publisher, Host us) {
try {
return Optional.of(dialPeer(peer, us).join().getValue(publisher).join());
} catch (Exception e) {}
return Optional.empty();
}
public List<IpnsRecord> resolveValue(Multihash publisher, int minResults, Host us) {
byte[] key = IPNS.getKey(publisher);
List<IpnsRecord> candidates = new ArrayList<>();
Id keyId = Id.create(Hash.sha256(key), 256);
SortedSet<RoutingEntry> toQuery = Collections.synchronizedSortedSet(new TreeSet<>((a, b) -> compareKeys(a, b, keyId)));
List<PeerAddresses> localClosest = engine.getKClosestPeers(key);
int queryParallelism = 3;
toQuery.addAll(localClosest.stream()
.limit(queryParallelism)
.map(p -> new RoutingEntry(Id.create(Hash.sha256(p.peerId.toBytes()), 256), p))
.collect(Collectors.toList()));
Set<Multihash> queried = Collections.synchronizedSet(new HashSet<>());
while (! toQuery.isEmpty()) {
int remaining = toQuery.size() - 3;
List<RoutingEntry> thisRound = toQuery.stream()
.limit(queryParallelism)
.collect(Collectors.toList());
List<? extends Future<?>> futures = thisRound.stream()
.map(r -> {
toQuery.remove(r);
queried.add(r.addresses.peerId);
return ioExec.submit(() -> getValueFromPeer(r.addresses, publisher, us)
.ifPresent(g -> {
if (g.record.isPresent() && g.record.get().publisher.equals(publisher))
candidates.add(g.record.get().value);
for (PeerAddresses peer : g.closerPeers) {
if (! queried.contains(peer.peerId)) {
Id peerKey = Id.create(Hash.sha256(IPNS.getKey(peer.peerId)), 256);
RoutingEntry e = new RoutingEntry(peerKey, peer);
toQuery.add(e);
}
}
}));
})
.collect(Collectors.toList());
futures.forEach(f -> {
try {
f.get();
} catch (Exception e) {}
});
// exit early if we have enough results
if (candidates.size() >= minResults)
break;
if (toQuery.size() == remaining)
break;
}
return candidates;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ public interface KademliaController {

CompletableFuture<Boolean> send(Dht.Message msg);

default CompletableFuture<List<PeerAddresses>> closerPeers(Multihash peerID) {
default CompletableFuture<List<PeerAddresses>> closerPeers(byte[] key) {
return rpc(Dht.Message.newBuilder()
.setType(Dht.Message.MessageType.FIND_NODE)
.setKey(ByteString.copyFrom(peerID.toBytes()))
.setKey(ByteString.copyFrom(key))
.build())
.thenApply(resp -> resp.getCloserPeersList().stream()
.map(PeerAddresses::fromProtobuf)
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/peergos/FindPeerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private static long findAndDialPeer(Multihash toFind, Kademlia dht1, Host node1)
PeerAddresses peer = matching.get();
Multiaddr[] addrs = peer.getPublicAddresses().stream().map(a -> Multiaddr.fromString(a.toString())).toArray(Multiaddr[]::new);
dht1.dial(node1, PeerId.fromBase58(peer.peerId.toBase58()), addrs)
.getController().join().closerPeers(toFind).join();
.getController().join().closerPeers(toFind.toBytes()).join();
System.out.println("Peer lookup took " + (t2-t1) + "ms");
return t2 - t1;
}
Expand Down

0 comments on commit 90a708b

Please sign in to comment.