Skip to content

Commit

Permalink
Write a utility for bulk publishing and resolving of IPNS records
Browse files Browse the repository at this point in the history
Improve ipns tail resolvability

Add EmbeddedIpfs.resolveRecords for getting all results found
  • Loading branch information
ianopolous committed Dec 20, 2023
1 parent 3b06e93 commit 04e2723
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 4 deletions.
6 changes: 6 additions & 0 deletions src/main/java/org/peergos/EmbeddedIpfs.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ public CompletableFuture<byte[]> resolveValue(PubKey pub, int minResults) {
return CompletableFuture.completedFuture(records.get(records.size() - 1).value);
}

public List<IpnsRecord> resolveRecords(PubKey pub, int minResults) {
    // Look up every IPNS record published under this key in the DHT and
    // return them sorted ascending by the records' natural ordering.
    Multihash publisherId = Multihash.deserialize(PeerId.fromPubKey(pub).getBytes());
    return dht.resolveValue(publisherId, minResults, node).stream()
            .sorted()
            .collect(Collectors.toList());
}

public void start() {
LOG.info("Starting IPFS...");
Thread shutdownHook = new Thread(() -> {
Expand Down
105 changes: 105 additions & 0 deletions src/main/java/org/peergos/IpnsPublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package org.peergos;

import io.ipfs.multiaddr.*;
import io.ipfs.multihash.*;
import io.libp2p.core.*;
import io.libp2p.core.crypto.*;
import io.libp2p.crypto.keys.*;
import org.peergos.blockstore.*;
import org.peergos.config.*;
import org.peergos.protocol.dht.*;
import org.peergos.protocol.ipns.*;
import org.peergos.util.*;

import java.io.*;
import java.nio.file.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;

public class IpnsPublisher {
    // Shared pool bounding how many concurrent DHT publishes we attempt.
    private static final ExecutorService ioExec = Executors.newFixedThreadPool(20);

    private IpnsPublisher() {} // utility class with a main entry point — not instantiable

    /**
     * Bulk IPNS publish/resolve benchmark.
     *
     * First run (no publishers.txt): generates 1000 Ed25519 keys, persists them
     * as hex, signs and publishes one IPNS record per key, and writes per-key
     * publish counts. Subsequent runs: reloads the keys and repeatedly resolves
     * every record, writing per-key resolved-record counts for each round.
     */
    public static void main(String[] a) throws Exception {
        Path keysFile = Paths.get("publishers.txt");
        int keycount = 1000;
        EmbeddedIpfs ipfs = startIpfs();
        try {
            if (keysFile.toFile().exists()) {
                List<PrivKey> keys = Files.readAllLines(keysFile).stream()
                        .map(line -> KeyKt.unmarshalPrivateKey(ArrayOps.hexToBytes(line)))
                        .collect(Collectors.toList());

                for (int c = 0; c < 100; c++) {
                    long t0 = System.currentTimeMillis();
                    List<Integer> recordCounts = resolve(keys, ipfs);
                    // LocalDateTime.toString() contains ':' which is illegal in
                    // Windows filenames — replace it to keep the path portable.
                    String stamp = LocalDateTime.now().withNano(0).toString().replace(':', '-');
                    Files.write(Paths.get("publish-resolve-counts-" + stamp + ".txt"),
                            recordCounts.stream()
                                    .map(String::valueOf)
                                    .collect(Collectors.toList()));
                    long t1 = System.currentTimeMillis();
                    System.out.println("Resolved " + recordCounts.stream().filter(n -> n > 0).count()
                            + "/" + recordCounts.size() + " in " + (t1 - t0) / 1000 + "s");
                }
            } else {
                List<PrivKey> keys = IntStream.range(0, keycount)
                        .mapToObj(i -> Ed25519Kt.generateEd25519KeyPair().getFirst())
                        .collect(Collectors.toList());
                Files.write(keysFile, keys.stream()
                        .map(k -> ArrayOps.bytesToHex(k.bytes()))
                        .collect(Collectors.toList()));
                long t0 = System.currentTimeMillis();
                List<Integer> publishCounts = publish(keys, "The result".getBytes(), ipfs);
                long t1 = System.currentTimeMillis();
                System.out.println("Published all in " + (t1 - t0) / 1000 + "s");
                Files.write(Paths.get("publish-counts.txt"), publishCounts.stream()
                        .map(String::valueOf)
                        .collect(Collectors.toList()));
            }
        } finally {
            // Release resources explicitly rather than relying on System.exit alone.
            ioExec.shutdown();
            ipfs.stop().join();
        }
        System.exit(0);
    }

    /**
     * Signs one IPNS record per key (7-day expiry and TTL) and publishes all of
     * them to the DHT in parallel on {@link #ioExec}. The hex-encoded signed
     * records are dumped to publish-values.txt for later inspection.
     *
     * @param publishers one signing key per record to publish
     * @param value      the payload embedded in every record
     * @param ipfs       the running node used to reach the DHT
     * @return one count per publisher, in input order, as reported by
     *         {@code publishPresignedRecord}
     * @throws IOException if writing publish-values.txt fails
     */
    public static List<Integer> publish(List<PrivKey> publishers, byte[] value, EmbeddedIpfs ipfs) throws IOException {
        LocalDateTime expiry = LocalDateTime.now().plusDays(7);
        AtomicLong done = new AtomicLong(0);
        long ttlNanos = TimeUnit.DAYS.toNanos(7); // clearer than a hand-rolled product of constants
        List<Pair<Multihash, byte[]>> values = publishers.stream()
                .map(p -> new Pair<>(Multihash.deserialize(PeerId.fromPubKey(p.publicKey()).getBytes()),
                        IPNS.createSignedRecord(value, expiry, 1, ttlNanos, p)))
                .collect(Collectors.toList());
        Files.write(Paths.get("publish-values.txt"), values.stream()
                .map(v -> ArrayOps.bytesToHex(v.right))
                .collect(Collectors.toList()));
        // Fan out all publishes first, then join, so they run with up to 20-way parallelism.
        List<CompletableFuture<Integer>> futs = values.stream()
                .map(v -> CompletableFuture.supplyAsync(() -> {
                    Integer res = ipfs.publishPresignedRecord(v.left, v.right).join();
                    System.out.println(done.incrementAndGet()); // progress indicator
                    return res;
                }, ioExec))
                .collect(Collectors.toList());
        return futs.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    /**
     * Sequentially resolves the record of each publisher key, asking for up to
     * 30 results per key.
     *
     * @return one entry per publisher: how many records were found for that key
     */
    public static List<Integer> resolve(List<PrivKey> publishers, EmbeddedIpfs ipfs) {
        List<Integer> res = new ArrayList<>();
        for (PrivKey publisher : publishers) {
            List<IpnsRecord> records = ipfs.resolveRecords(publisher.publicKey(), 30);
            res.add(records.size());
        }
        return res;
    }

    /**
     * Starts an in-memory IPFS node (RAM block/record stores, fresh identity,
     * default bootstrap peers) listening on an ephemeral TCP port.
     */
    public static EmbeddedIpfs startIpfs() {
        HostBuilder builder = new HostBuilder().generateIdentity();
        PrivKey privKey = builder.getPrivateKey();
        PeerId peerId = builder.getPeerId();
        IdentitySection id = new IdentitySection(privKey.bytes(), peerId);
        EmbeddedIpfs ipfs = EmbeddedIpfs.build(new RamRecordStore(), new RamBlockstore(), false,
                List.of(new MultiAddress("/ip6/::/tcp/0")), Config.defaultBootstrapNodes, id,
                (c, s, au) -> CompletableFuture.completedFuture(true), Optional.empty());
        ipfs.start();
        return ipfs;
    }
}
11 changes: 7 additions & 4 deletions src/main/java/org/peergos/protocol/dht/Kademlia.java
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,8 @@ public CompletableFuture<String> resolveIpnsValue(Multihash publisher, Host us,

private Optional<GetResult> getValueFromPeer(PeerAddresses peer, Multihash publisher, Host us) {
try {
return Optional.of(dialPeer(peer, us).join().getValue(publisher).join());
return Optional.of(dialPeer(peer, us).orTimeout(1, TimeUnit.SECONDS).join()
.getValue(publisher).orTimeout(1, TimeUnit.SECONDS).join());
} catch (Exception e) {}
return Optional.empty();
}
Expand All @@ -450,14 +451,14 @@ public List<IpnsRecord> resolveValue(Multihash publisher, int minResults, Host u
List<PeerAddresses> localClosest = engine.getKClosestPeers(key, 20);
int queryParallelism = 3;
toQuery.addAll(localClosest.stream()
.limit(queryParallelism)
.filter(p -> hasTransportOverlap(p)) // don't waste time trying to dial nodes we can't
.map(p -> new RoutingEntry(Id.create(Hash.sha256(p.peerId.toBytes()), 256), p))
.collect(Collectors.toList()));
Set<Multihash> queried = Collections.synchronizedSet(new HashSet<>());
int countdown = 20;
while (! toQuery.isEmpty()) {
int remaining = toQuery.size() - 3;
List<RoutingEntry> thisRound = toQuery.stream()
.filter(r -> hasTransportOverlap(r.addresses)) // don't waste time trying to dial nodes we can't
.limit(queryParallelism)
.collect(Collectors.toList());
List<? extends Future<?>> futures = thisRound.stream()
Expand All @@ -469,7 +470,7 @@ public List<IpnsRecord> resolveValue(Multihash publisher, int minResults, Host u
if (g.record.isPresent() && g.record.get().publisher.equals(publisher))
candidates.add(g.record.get().value);
for (PeerAddresses peer : g.closerPeers) {
if (! queried.contains(peer.peerId)) {
if (! queried.contains(peer.peerId) && hasTransportOverlap(peer)) {
Id peerKey = Id.create(Hash.sha256(IPNS.getKey(peer.peerId)), 256);
RoutingEntry e = new RoutingEntry(peerKey, peer);
toQuery.add(e);
Expand All @@ -489,6 +490,8 @@ public List<IpnsRecord> resolveValue(Multihash publisher, int minResults, Host u
if (candidates.size() >= minResults)
break;
if (toQuery.size() == remaining)
countdown--;
if (countdown <= 0)
break;
}
return candidates;
Expand Down

0 comments on commit 04e2723

Please sign in to comment.