Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into chore/merge-upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
David-Petrov committed Apr 29, 2024
2 parents 72d5d61 + 1da5bbf commit 878e5f3
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 22 deletions.
45 changes: 31 additions & 14 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.github.peergos</groupId>
<artifactId>nabu</artifactId>
<version>v0.7.6</version>
<version>v0.7.8</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down Expand Up @@ -47,6 +47,25 @@
<version>${kotlin.version}</version>
<extensions>true</extensions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.6</version>
<executions>
<execution>
<id>unpack-dependencies</id>
<phase>package</phase>
<goals>
<goal>unpack-dependencies</goal>
</goals>
<configuration>
<excludeScope>system</excludeScope>
<excludes>META-INF/*.SF,META-INF/*.DSA,META-INF/*.RSA</excludes>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
Expand Down Expand Up @@ -100,22 +119,20 @@
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>false</minimizeJar>
<minimizeJar>true</minimizeJar>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</plugin>
</plugins>
</build>
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/peergos/AggregatedMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ private static Counter build(String name, String help) {
public static final Counter API_BLOCK_RM = build("api_block_rm", "Total calls to block/rm.");
public static final Counter API_BLOCK_RM_BULK = build("api_block_rm_bulk", "Total calls to block/rm/bulk.");
public static final Counter API_BLOCK_STAT = build("api_block_stat", "Total calls to block/stat.");
public static final Counter API_BLOCK_STAT_BULK = build("api_block_stat_bulk", "Total calls to block/stat/bulk.");
public static final Counter API_REFS_LOCAL = build("api_refs_local", "Total calls to refs/local.");
public static final Counter API_BLOCK_HAS = build("api_block_has", "Total calls to block/has.");
public static final Counter API_BLOOM_ADD = build("api_bloom_add", "Total calls to bloom/add.");
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/org/peergos/Want.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package org.peergos;

import io.ipfs.cid.Cid;
import org.peergos.config.*;

import java.util.*;

public class Want {
public class Want implements Jsonable {
public final Cid cid;
public final Optional<String> authHex;
public Want(Cid cid, Optional<String> authHex) {
Expand All @@ -24,6 +25,17 @@ public boolean equals(Object o) {
return cid.equals(want.cid) && authHex.equals(want.authHex);
}

public Map<String, Object> toJson() {
Map<String, Object> m = new LinkedHashMap<>();
m.put("c", cid.toString());
authHex.ifPresent(h -> m.put("a", h));
return m;
}

public static Want fromJson(Map<String, String> m) {
return new Want(Cid.decode(m.get("c")), Optional.ofNullable(m.get("a")));
}

@Override
public int hashCode() {
return Objects.hash(cid, authHex);
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/peergos/blockstore/RamBlockstore.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.ipfs.multihash.*;
import org.peergos.*;
import org.peergos.blockstore.metadatadb.BlockMetadata;
import org.peergos.cbor.*;
import org.peergos.util.*;

import java.util.*;
Expand Down Expand Up @@ -60,7 +61,7 @@ public CompletableFuture<List<Cid>> refs(boolean useBlockstore) {

@Override
public CompletableFuture<BlockMetadata> getBlockMetadata(Cid h) {
throw new IllegalStateException("Unsupported operation!");
byte[] block = get(h).join().get();
return Futures.of(new BlockMetadata(block.length, CborObject.getLinks(h, block)));
}

}
35 changes: 34 additions & 1 deletion src/main/java/org/peergos/net/APIHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.libp2p.core.PeerId;
import io.libp2p.crypto.keys.*;
import org.peergos.*;
import org.peergos.cbor.*;
import org.peergos.protocol.ipns.*;
import org.peergos.protocol.ipns.pb.*;
import org.peergos.util.*;
Expand All @@ -18,7 +19,7 @@

public class APIHandler extends Handler {
public static final String API_URL = "/api/v0/";
public static final Version CURRENT_VERSION = Version.parse("0.7.6");
public static final Version CURRENT_VERSION = Version.parse("0.7.8");
private static final Logger LOG = Logging.LOG();

private static final boolean LOGGING = true;
Expand All @@ -30,6 +31,7 @@ public class APIHandler extends Handler {
public static final String RM = "block/rm";
public static final String RM_BULK = "block/rm/bulk";
public static final String STAT = "block/stat";
public static final String BULK_STAT = "block/stat/bulk";
public static final String REFS_LOCAL = "refs/local";
public static final String BLOOM_ADD = "bloom/add";
public static final String HAS = "block/has";
Expand Down Expand Up @@ -99,6 +101,37 @@ public void handleCallToAPI(HttpExchange httpExchange) {
}
break;
}
case BULK_STAT: {
AggregatedMetrics.API_BLOCK_STAT_BULK.inc();
Map<String, Object> json = (Map<String, Object>) JSONParser.parse(new String(readFully(httpExchange.getRequestBody())));
List<Want> wants = ((List<Map<String, String>>)json.get("wants"))
.stream()
.map(Want::fromJson)
.collect(Collectors.toList());
Set<PeerId> peers = Optional.ofNullable(params.get("peers"))
.map(p -> p.stream().map(PeerId::fromBase58).collect(Collectors.toSet()))
.orElse(Collections.emptySet());
List<HashedBlock> blocks = ipfs.getBlocks(wants, peers, true);
if (wants.size() == blocks.size()) {
List<List<String>> links = blocks.stream()
.map(b -> b.hash.codec.equals(Cid.Codec.Raw) ?
Collections.<String>emptyList() :
CborObject.getLinks(b.hash, b.block)
.stream()
.filter(c -> c.getType() != Multihash.Type.id)
.map(Cid::toString)
.collect(Collectors.toList()))
.collect(Collectors.toList());
replyJson(httpExchange, JSONParser.toString(links));
} else {
try {
httpExchange.sendResponseHeaders(400, 0);
} catch (IOException ioe) {
HttpUtil.replyError(httpExchange, ioe);
}
}
break;
}
case PUT: { // https://docs.ipfs.tech/reference/kubo/rpc/#api-v0-block-put
AggregatedMetrics.API_BLOCK_PUT.inc();
List<String> format = params.get("format");
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/peergos/protocol/bitswap/Bitswap.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void sendWants(Host us, Set<Want> wants, Set<PeerId> peers) {
Map<Want, PeerId> haves = engine.getHaves();
// broadcast to all connected bitswap peers if none are supplied
Set<PeerId> audience = peers.isEmpty() ? getBroadcastAudience() : peers;
LOG.info("Send wants: " + wants.size() + " to " + audience);
LOG.info("Send wants: " + wants.size() + " to " + audience + " cids: " + wants.stream().limit(2).map(w -> w.cid).collect(Collectors.toList()));
List<MessageOuterClass.Message.Wantlist.Entry> wantsProto = wants.stream()
.map(want -> MessageOuterClass.Message.Wantlist.Entry.newBuilder()
.setWantType(audience.size() <= 2 || haves.containsKey(want) ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public void receiveMessage(MessageOuterClass.Message msg, Stream source, Counter
waiter.result.complete(new HashedBlock(c, data));
localWants.remove(w);
} else
LOG.info("Received block we don't want: z" + c.toBase58() + " from " + sourcePeerId.bareMultihash());
LOG.info("Received block we don't want: " + c + " from " + sourcePeerId.bareMultihash());
}
} catch (IOException e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.prometheus.client.*;
import org.jetbrains.annotations.*;
import org.peergos.protocol.bitswap.pb.*;
import org.peergos.util.Logging;
import org.peergos.util.*;

import java.util.concurrent.*;
import java.util.logging.*;
Expand Down Expand Up @@ -36,7 +36,7 @@ public class BitswapProtocol extends ProtobufProtocolHandler<BitswapController>
private final BitswapEngine engine;

public BitswapProtocol(BitswapEngine engine) {
super(MessageOuterClass.Message.getDefaultInstance(), engine.maxMessageSize(), engine.maxMessageSize());
super(MessageOuterClass.Message.getDefaultInstance(), engine.maxMessageSize(), Long.MAX_VALUE);
this.engine = engine;
}

Expand Down Expand Up @@ -78,5 +78,10 @@ public void onMessage(@NotNull Stream stream, MessageOuterClass.Message msg) {
receivedBytes.inc(msg.getSerializedSize());
engine.receiveMessage(msg, stream, sentBytes);
}

@Override
public void onException(@Nullable Throwable cause) {
LOG.log(Level.WARNING, cause.getMessage(), cause);
}
}
}
1 change: 1 addition & 0 deletions src/main/java/org/peergos/protocol/dht/Kademlia.java
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ private Multiaddr[] getPublic(PeerAddresses target) {
private CompletableFuture<? extends KademliaController> dialPeer(PeerAddresses target, Host us) {
Multiaddr[] multiaddrs = target.addresses.stream()
.map(a -> Multiaddr.fromString(a.toString()))
.filter(a -> ! a.has(Protocol.DNS) && ! a.has(Protocol.DNS4) && ! a.has(Protocol.DNS6))
.collect(Collectors.toList()).toArray(new Multiaddr[0]);
return dial(us, PeerId.fromBase58(target.peerId.toBase58()), multiaddrs).getController();
}
Expand Down
36 changes: 36 additions & 0 deletions src/test/java/org/peergos/BitswapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,42 @@ public void getBlock() {
}
}

@Test
public void getTenBlocks() {
HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(),
new RamProviderStore(1000), new RamRecordStore(), new RamBlockstore(), (c, p, a) -> CompletableFuture.completedFuture(true));
Host node1 = builder1.build();
RamBlockstore blockstore2 = new RamBlockstore();
HostBuilder builder2 = HostBuilder.create(TestPorts.getPort(),
new RamProviderStore(1000), new RamRecordStore(), blockstore2, (c, p, a) -> CompletableFuture.completedFuture(true));
Host node2 = builder2.build();
node1.start().join();
node2.start().join();
try {
Multiaddr address2 = node2.listenAddresses().get(0);
List<Cid> hashes = new ArrayList<>();
Random random = new Random(28);
for (int i=0; i < 10; i++) {
byte[] blockData = new byte[1024*1024];
random.nextBytes(blockData);
Cid hash = blockstore2.put(blockData, Cid.Codec.Raw).join();
hashes.add(hash);
}

Bitswap bitswap1 = builder1.getBitswap().get();
node1.getAddressBook().addAddrs(address2.getPeerId(), 0, address2).join();
List<HashedBlock> receivedBlocks = bitswap1.get(hashes.stream().map(Want::new).collect(Collectors.toList()), node1, Set.of(address2.getPeerId()), false)
.stream()
.map(f -> f.join())
.collect(Collectors.toList());
if (receivedBlocks.size() != hashes.size())
throw new IllegalStateException("Incorrect number of blocks returned!");
} finally {
node1.stop();
node2.stop();
}
}

@Test
public void blockFlooder() {
HostBuilder builder1 = HostBuilder.create(TestPorts.getPort(),
Expand Down

0 comments on commit 878e5f3

Please sign in to comment.