From 04e2723326e102c2b79c5a30c42772afc66b8993 Mon Sep 17 00:00:00 2001
From: ian
Date: Wed, 20 Dec 2023 21:21:00 +0000
Subject: [PATCH] Write utility for bulk publishing and resolving of ipns

Improve ipns tail resolvability
Add EmbeddedIpfs.resolveRecords for getting all results found
---
 src/main/java/org/peergos/EmbeddedIpfs.java   |   6 +
 src/main/java/org/peergos/IpnsPublisher.java  | 105 ++++++++++++++++++
 .../org/peergos/protocol/dht/Kademlia.java    |  11 +-
 3 files changed, 118 insertions(+), 4 deletions(-)
 create mode 100644 src/main/java/org/peergos/IpnsPublisher.java

diff --git a/src/main/java/org/peergos/EmbeddedIpfs.java b/src/main/java/org/peergos/EmbeddedIpfs.java
index c3575a9e..2dae613f 100644
--- a/src/main/java/org/peergos/EmbeddedIpfs.java
+++ b/src/main/java/org/peergos/EmbeddedIpfs.java
@@ -122,6 +122,12 @@ public CompletableFuture<byte[]> resolveValue(PubKey pub, int minResults) {
         return CompletableFuture.completedFuture(records.get(records.size() - 1).value);
     }
 
+    public List<IpnsRecord> resolveRecords(PubKey pub, int minResults) {
+        Multihash publisher = Multihash.deserialize(PeerId.fromPubKey(pub).getBytes());
+        List<IpnsRecord> candidates = dht.resolveValue(publisher, minResults, node);
+        return candidates.stream().sorted().collect(Collectors.toList());
+    }
+
     public void start() {
         LOG.info("Starting IPFS...");
         Thread shutdownHook = new Thread(() -> {
diff --git a/src/main/java/org/peergos/IpnsPublisher.java b/src/main/java/org/peergos/IpnsPublisher.java
new file mode 100644
index 00000000..f722197e
--- /dev/null
+++ b/src/main/java/org/peergos/IpnsPublisher.java
@@ -0,0 +1,105 @@
+package org.peergos;
+
+import io.ipfs.multiaddr.*;
+import io.ipfs.multihash.*;
+import io.libp2p.core.*;
+import io.libp2p.core.crypto.*;
+import io.libp2p.crypto.keys.*;
+import org.peergos.blockstore.*;
+import org.peergos.config.*;
+import org.peergos.protocol.dht.*;
+import org.peergos.protocol.ipns.*;
+import org.peergos.util.*;
+
+import java.io.*;
+import java.nio.file.*;
+import java.time.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.stream.*;
+
+public class IpnsPublisher {
+    private static final ExecutorService ioExec = Executors.newFixedThreadPool(20);
+    public static void main(String[] a) throws Exception {
+        Path keysFile = Paths.get("publishers.txt");
+        List<PrivKey> keys;
+        int keycount = 1000;
+        EmbeddedIpfs ipfs = startIpfs();
+        if (keysFile.toFile().exists()) {
+            List<String> lines = Files.readAllLines(keysFile);
+            keys = lines.stream()
+                    .map(line -> KeyKt.unmarshalPrivateKey(ArrayOps.hexToBytes(line)))
+                    .collect(Collectors.toList());
+
+            for (int c=0; c < 100; c++) {
+                long t0 = System.currentTimeMillis();
+                List<Integer> recordCounts = resolve(keys, ipfs);
+                Files.write(Paths.get("publish-resolve-counts-" + LocalDateTime.now().withNano(0) + ".txt"), recordCounts.stream()
+                        .map(i -> i.toString())
+                        .collect(Collectors.toList()));
+                long t1 = System.currentTimeMillis();
+                System.out.println("Resolved " + recordCounts.stream().filter(n -> n > 0).count() + "/" + recordCounts.size()
+                        + " in " + (t1-t0)/1000 + "s");
+            }
+        } else {
+            keys = IntStream.range(0, keycount)
+                    .mapToObj(i -> Ed25519Kt.generateEd25519KeyPair().getFirst())
+                    .collect(Collectors.toList());
+            Files.write(keysFile, keys.stream().map(k -> ArrayOps.bytesToHex(k.bytes())).collect(Collectors.toList()));
+            long t0 = System.currentTimeMillis();
+            List<Integer> publishCounts = publish(keys, "The result".getBytes(), ipfs);
+            long t1 = System.currentTimeMillis();
+            System.out.println("Published all in " + (t1-t0)/1000 + "s");
(t1-t0)/1000 + "s"); + Files.write(Paths.get("publish-counts.txt"), publishCounts.stream() + .map(i -> i.toString()) + .collect(Collectors.toList())); + } + ipfs.stop().join(); + System.exit(0); + } + + public static List publish(List publishers, byte[] value, EmbeddedIpfs ipfs) throws IOException { + LocalDateTime expiry = LocalDateTime.now().plusDays(7); + AtomicLong done = new AtomicLong(0); + long ttlNanos = 7L * 24 * 3600 * 1000_000_000; + List> values = publishers.stream() + .map(p -> new Pair<>(Multihash.deserialize(PeerId.fromPubKey(p.publicKey()).getBytes()), + IPNS.createSignedRecord(value, expiry, 1, ttlNanos, p))) + .collect(Collectors.toList()); + Files.write(Paths.get("publish-values.txt"), values.stream() + .map(v -> ArrayOps.bytesToHex(v.right)) + .collect(Collectors.toList())); + List> futs = values.stream() + .map(v -> CompletableFuture.supplyAsync(() -> { + Integer res = ipfs.publishPresignedRecord(v.left, v.right).join(); + System.out.println(done.incrementAndGet()); + return res; + }, ioExec)) + .collect(Collectors.toList()); + return futs.stream() + .map(CompletableFuture::join) + .collect(Collectors.toList()); + } + + public static List resolve(List publishers, EmbeddedIpfs ipfs) { + List res = new ArrayList<>(); + for (PrivKey publisher : publishers) { + List records = ipfs.resolveRecords(publisher.publicKey(), 30); + res.add(records.size()); + } + return res; + } + + public static EmbeddedIpfs startIpfs() { + HostBuilder builder = new HostBuilder().generateIdentity(); + PrivKey privKey = builder.getPrivateKey(); + PeerId peerId = builder.getPeerId(); + IdentitySection id = new IdentitySection(privKey.bytes(), peerId); + EmbeddedIpfs ipfs = EmbeddedIpfs.build(new RamRecordStore(), new RamBlockstore(), false, + List.of(new MultiAddress("/ip6/::/tcp/0")), Config.defaultBootstrapNodes, id, + (c, s, au) -> CompletableFuture.completedFuture(true), Optional.empty()); + ipfs.start(); + return ipfs; + } +} diff --git a/src/main/java/org/peergos/protocol/dht/Kademlia.java b/src/main/java/org/peergos/protocol/dht/Kademlia.java index c1e47318..f7127914 100644 --- a/src/main/java/org/peergos/protocol/dht/Kademlia.java +++ b/src/main/java/org/peergos/protocol/dht/Kademlia.java @@ -435,7 +435,8 @@ public CompletableFuture resolveIpnsValue(Multihash publisher, Host us, private Optional getValueFromPeer(PeerAddresses peer, Multihash publisher, Host us) { try { - return Optional.of(dialPeer(peer, us).join().getValue(publisher).join()); + return Optional.of(dialPeer(peer, us).orTimeout(1, TimeUnit.SECONDS).join() + .getValue(publisher).orTimeout(1, TimeUnit.SECONDS).join()); } catch (Exception e) {} return Optional.empty(); } @@ -450,14 +451,14 @@ public List resolveValue(Multihash publisher, int minResults, Host u List localClosest = engine.getKClosestPeers(key, 20); int queryParallelism = 3; toQuery.addAll(localClosest.stream() - .limit(queryParallelism) + .filter(p -> hasTransportOverlap(p)) // don't waste time trying to dial nodes we can't .map(p -> new RoutingEntry(Id.create(Hash.sha256(p.peerId.toBytes()), 256), p)) .collect(Collectors.toList())); Set queried = Collections.synchronizedSet(new HashSet<>()); + int countdown = 20; while (! 
             int remaining = toQuery.size() - 3;
             List<RoutingEntry> thisRound = toQuery.stream()
-                    .filter(r -> hasTransportOverlap(r.addresses)) // don't waste time trying to dial nodes we can't
                     .limit(queryParallelism)
                     .collect(Collectors.toList());
             List<CompletableFuture<Optional<GetResult>>> futures = thisRound.stream()
@@ -469,7 +470,7 @@ public List<IpnsRecord> resolveValue(Multihash publisher, int minResults, Host us) {
                         if (g.record.isPresent() && g.record.get().publisher.equals(publisher))
                             candidates.add(g.record.get().value);
                         for (PeerAddresses peer : g.closerPeers) {
-                            if (! queried.contains(peer.peerId)) {
+                            if (! queried.contains(peer.peerId) && hasTransportOverlap(peer)) {
                                 Id peerKey = Id.create(Hash.sha256(IPNS.getKey(peer.peerId)), 256);
                                 RoutingEntry e = new RoutingEntry(peerKey, peer);
                                 toQuery.add(e);
@@ -489,6 +490,8 @@ public List<IpnsRecord> resolveValue(Multihash publisher, int minResults, Host us) {
             if (candidates.size() >= minResults)
                 break;
             if (toQuery.size() == remaining)
+                countdown--;
+            if (countdown <= 0)
                 break;
         }
         return candidates;
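
Not part of the patch: a rough usage sketch of how the new pieces fit together, i.e. signing a record, pushing it with publishPresignedRecord, then fetching every copy the DHT walk finds via the new EmbeddedIpfs.resolveRecords. It reuses IpnsPublisher.startIpfs() and the wildcard imports from the patch; the class name ResolveRecordsExample is made up, the List<IpnsRecord> return type follows the reconstruction above, and the snippet is untested.

import io.ipfs.multihash.*;
import io.libp2p.core.*;
import io.libp2p.core.crypto.*;
import io.libp2p.crypto.keys.*;
import org.peergos.*;
import org.peergos.protocol.ipns.*;

import java.time.*;
import java.util.*;

public class ResolveRecordsExample {
    public static void main(String[] args) throws Exception {
        EmbeddedIpfs ipfs = IpnsPublisher.startIpfs();

        // Sign one record locally, publish the pre-signed bytes, then ask for
        // up to 30 copies back instead of only the most recent value.
        PrivKey signer = Ed25519Kt.generateEd25519KeyPair().getFirst();
        Multihash publisher = Multihash.deserialize(PeerId.fromPubKey(signer.publicKey()).getBytes());
        long ttlNanos = 24L * 3600 * 1_000_000_000L;
        byte[] signed = IPNS.createSignedRecord("hello".getBytes(), LocalDateTime.now().plusDays(1), 1, ttlNanos, signer);
        ipfs.publishPresignedRecord(publisher, signed).join();

        List<IpnsRecord> copies = ipfs.resolveRecords(signer.publicKey(), 30);
        System.out.println("Found " + copies.size() + " copies of the record");

        ipfs.stop().join();
    }
}

The 1-second dial/get timeouts and the countdown added to Kademlia.resolveValue are what keep a walk like this from stalling on undialable peers while it gathers the requested number of records.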