Skip to content

Commit

Permalink
Try and remove joins in kademlia
Browse files Browse the repository at this point in the history
  • Loading branch information
ianopolous committed Dec 27, 2023
1 parent e6f3b45 commit f753879
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/peergos/IpnsPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public static List<Integer> resolveAndRepublish(List<PublishResult> publishers,
res.add(records.size());
done++;
if (done % 10 == 0)
System.out.println("resolved " + done);
System.out.println("resolved " + res.stream().filter(c -> c > 0).count() + " / " + done);
CompletableFuture.supplyAsync(() -> publisher.publishPresignedRecord(pub.pub, pub.record));
}
return res;
Expand Down
53 changes: 30 additions & 23 deletions src/main/java/org/peergos/protocol/dht/Kademlia.java
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,15 @@ public CompletableFuture<Integer> publishIpnsValue(PrivKey priv,
return publishValue(publisher, signedRecord, us);
}

private boolean putValue(Multihash publisher,
byte[] signedRecord,
PeerAddresses peer,
Host us) {
private CompletableFuture<Boolean> putValue(Multihash publisher,
byte[] signedRecord,
PeerAddresses peer,
Host us) {
try {
return dialPeer(peer, us).join()
.putValue(publisher, signedRecord).join();
.putValue(publisher, signedRecord);
} catch (Exception e) {}
return false;
return CompletableFuture.completedFuture(false);
}

private boolean hasTransportOverlap(PeerAddresses p) {
Expand Down Expand Up @@ -378,10 +378,11 @@ public CompletableFuture<Integer> publishValue(Multihash publisher,
more.add(e);
}
}
ioExec.submit(() -> {
if (putValue(publisher, signedRecord, r.addresses, us))
publishes.add(r.addresses.peerId);
});
ioExec.submit(() -> putValue(publisher, signedRecord, r.addresses, us)
.thenAccept(done -> {
if (done)
publishes.add(r.addresses.peerId);
}));
return more;
}).join());
})
Expand All @@ -407,10 +408,11 @@ public CompletableFuture<Integer> publishValue(Multihash publisher,
.map(r -> {
toQuery.remove(r);
queried.add(r.addresses.peerId);
return ioExec.submit(() -> {
if (putValue(publisher, signedRecord, r.addresses, us))
publishes.add(r.addresses.peerId);
});
return ioExec.submit(() -> putValue(publisher, signedRecord, r.addresses, us)
.thenAccept(done -> {
if (done)
publishes.add(r.addresses.peerId);
}));
})
.collect(Collectors.toList());
lastFutures.forEach(f -> {
Expand All @@ -433,16 +435,21 @@ public CompletableFuture<String> resolveIpnsValue(Multihash publisher, Host us,
return CompletableFuture.completedFuture(new String(records.get(records.size() - 1).value));
}

private Optional<GetResult> getValueFromPeer(PeerAddresses peer, Multihash publisher, Host us) {
private CompletableFuture<Optional<GetResult>> getValueFromPeer(PeerAddresses peer, Multihash publisher, Host us) {
try {
return Optional.of(dialPeer(peer, us).orTimeout(1, TimeUnit.SECONDS).join()
.getValue(publisher).orTimeout(1, TimeUnit.SECONDS).join());
} catch (Exception e) {}
return Optional.empty();
return dialPeer(peer, us)
.orTimeout(1, TimeUnit.SECONDS)
.join()
.getValue(publisher)
.orTimeout(1, TimeUnit.SECONDS)
.thenApply(Optional::of);
} catch (Exception e) {
return CompletableFuture.completedFuture(Optional.empty());
}
}
public List<IpnsRecord> resolveValue(Multihash publisher, int minResults, Host us) {
byte[] key = IPNS.getKey(publisher);
List<IpnsRecord> candidates = new ArrayList<>();
List<IpnsRecord> candidates = Collections.synchronizedList(new ArrayList<>());
Optional<IpnsRecord> local = engine.getRecord(publisher);
local.ifPresent(candidates::add);

Expand All @@ -465,8 +472,8 @@ public List<IpnsRecord> resolveValue(Multihash publisher, int minResults, Host u
.map(r -> {
toQuery.remove(r);
queried.add(r.addresses.peerId);
return ioExec.submit(() -> getValueFromPeer(r.addresses, publisher, us)
.ifPresent(g -> {
return ioExec.submit(() -> getValueFromPeer(r.addresses, publisher, us).thenAccept(get ->
get.ifPresent(g -> {
if (g.record.isPresent() && g.record.get().publisher.equals(publisher))
candidates.add(g.record.get().value);
for (PeerAddresses peer : g.closerPeers) {
Expand All @@ -476,7 +483,7 @@ public List<IpnsRecord> resolveValue(Multihash publisher, int minResults, Host u
toQuery.add(e);
}
}
}));
})));
})
.collect(Collectors.toList());
futures.forEach(f -> {
Expand Down

0 comments on commit f753879

Please sign in to comment.