Skip to content

Commit

Permalink
Switch to parsing RLP with tuweni-rlp instead of web3j. (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored Feb 17, 2022
1 parent 214966a commit 9bb8ad3
Show file tree
Hide file tree
Showing 28 changed files with 682 additions and 525 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ repositories {

dependencies {
implementation 'org.apache.tuweni:tuweni-bytes'
implementation 'org.apache.tuweni:tuweni-units'
implementation 'org.apache.tuweni:tuweni-crypto'
implementation 'org.apache.tuweni:tuweni-rlp'
implementation 'org.apache.tuweni:tuweni-units'
implementation 'org.bouncycastle:bcprov-jdk15on'

implementation 'com.google.guava:guava'
Expand Down
3 changes: 2 additions & 1 deletion gradle/versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ dependencyManagement {
dependency 'org.bouncycastle:bcprov-jdk15on:1.70'

dependency 'org.assertj:assertj-core:3.22.0'
dependency 'org.web3j:core:5.0.0'
dependency 'org.web3j:core:4.9.0'
dependency 'org.mockito:mockito-core:4.3.1'

dependencySet(group: 'org.apache.tuweni', version: '2.1.0') {
entry 'tuweni-bytes'
entry 'tuweni-crypto'
entry 'tuweni-rlp'
entry 'tuweni-units'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package org.ethereum.beacon.discovery;

import static org.ethereum.beacon.discovery.util.Utils.RECOVERABLE_ERRORS_PREDICATE;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -129,8 +131,11 @@ public CompletableFuture<Void> start() {
incomingPipeline.build();
outgoingPipeline.build();
Flux.from(discoveryServer.getIncomingPackets())
.onErrorContinue((err, msg) -> LOG.debug("Error while processing message: " + err))
.subscribe(incomingPipeline::push);
.doOnNext(incomingPipeline::push)
.onErrorContinue(
RECOVERABLE_ERRORS_PREDICATE,
(err, msg) -> LOG.debug("Error while processing message: " + err))
.subscribe();
return discoveryServer
.start()
.thenAccept(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class DiscoverySystemBuilder {
private Optional<InetSocketAddress> listenAddress = Optional.empty();
private NodeRecord localNodeRecord;
private Bytes privateKey;
private final NodeRecordFactory nodeRecordFactory = NodeRecordFactory.DEFAULT;
private NodeRecordFactory nodeRecordFactory = NodeRecordFactory.DEFAULT;
private Schedulers schedulers;
private NodeRecordListener localNodeRecordListener = NodeRecordListener.NOOP;
private NewAddressHandler newAddressHandler;
Expand Down Expand Up @@ -79,6 +79,11 @@ public DiscoverySystemBuilder privateKey(final Bytes privateKey) {
return this;
}

public DiscoverySystemBuilder nodeRecordFactory(NodeRecordFactory nodeRecordFactory) {
this.nodeRecordFactory = nodeRecordFactory;
return this;
}

public DiscoverySystemBuilder bootnodes(final String... enrs) {
bootnodes =
Stream.of(enrs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@
package org.ethereum.beacon.discovery.message;

import static com.google.common.base.Preconditions.checkArgument;
import static org.ethereum.beacon.discovery.util.RlpUtil.checkMaxSize;

import com.google.common.base.Objects;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.rlp.RLP;
import org.apache.tuweni.rlp.RLPReader;
import org.apache.tuweni.rlp.RLPWriter;
import org.ethereum.beacon.discovery.util.DecodeException;
import org.ethereum.beacon.discovery.util.RlpDecodeException;
import org.ethereum.beacon.discovery.util.RlpUtil;
import org.web3j.rlp.RlpEncoder;
import org.web3j.rlp.RlpList;
import org.web3j.rlp.RlpString;
import org.web3j.rlp.RlpType;

/**
* FINDNODE queries for nodes at the given logarithmic distance from the recipient's node ID. The
Expand All @@ -36,20 +34,15 @@ public FindNodeMessage(Bytes requestId, List<Integer> distances) {
this.distances = distances;
}

private static FindNodeMessage fromRlp(List<RlpType> rlpList) throws DecodeException {
if (rlpList.size() != 2) {
throw new RlpDecodeException("Invalid RLP list size for FindNode message-data: " + rlpList);
}
Bytes requestId = RlpUtil.asString(rlpList.get(0), RlpUtil.maxSize(MAX_REQUEST_ID_SIZE));
List<RlpType> rlpDistances = RlpUtil.asList(rlpList.get(1));
List<Integer> distances =
rlpDistances.stream().map(RlpUtil::asInteger).collect(Collectors.toList());

return new FindNodeMessage(requestId, distances);
}

public static FindNodeMessage fromBytes(Bytes bytes) throws DecodeException {
return fromRlp(RlpUtil.decodeSingleList(bytes));
return RlpUtil.readRlpList(
bytes,
listReader -> {
final Bytes requestId1 = checkMaxSize(listReader.readValue(), MAX_REQUEST_ID_SIZE);
List<Integer> distances1 = listReader.readListContents(RLPReader::readInt);
RlpUtil.checkComplete(listReader);
return new FindNodeMessage(requestId1, distances1);
});
}

@Override
Expand All @@ -65,14 +58,11 @@ public List<Integer> getDistances() {
public Bytes getBytes() {
return Bytes.concatenate(
Bytes.of(getCode().byteCode()),
Bytes.wrap(
RlpEncoder.encode(
new RlpList(
RlpString.create(requestId.toArray()),
new RlpList(
getDistances().stream()
.map(RlpString::create)
.collect(Collectors.toList()))))));
RLP.encodeList(
writer -> {
writer.writeValue(requestId);
writer.writeList(getDistances(), RLPWriter::writeInt);
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@

package org.ethereum.beacon.discovery.message;

import static org.ethereum.beacon.discovery.util.RlpUtil.checkMaxSize;

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.rlp.RLP;
import org.ethereum.beacon.discovery.schema.NodeRecord;
import org.ethereum.beacon.discovery.schema.NodeRecordFactory;
import org.ethereum.beacon.discovery.util.RlpDecodeException;
import org.ethereum.beacon.discovery.util.RlpUtil;
import org.web3j.rlp.RlpEncoder;
import org.web3j.rlp.RlpList;
import org.web3j.rlp.RlpString;
import org.web3j.rlp.RlpType;

/**
* NODES is the response to a FINDNODE or TOPICQUERY message. Multiple NODES messages may be sent as
Expand All @@ -35,21 +32,15 @@ public NodesMessage(Bytes requestId, Integer total, List<NodeRecord> nodeRecords
this.nodeRecords = nodeRecords;
}

private static NodesMessage fromRlp(List<RlpType> rlpList, NodeRecordFactory nodeRecordFactory) {
if (rlpList.size() != 3) {
throw new RlpDecodeException("Invalid RLP list size for Nodes message-data: " + rlpList);
}
List<RlpType> nodeRecords = RlpUtil.asList(rlpList.get(2));
return new NodesMessage(
RlpUtil.asString(rlpList.get(0), RlpUtil.maxSize(MAX_REQUEST_ID_SIZE)),
RlpUtil.asInteger(rlpList.get(1)),
nodeRecords.stream()
.map(rl -> nodeRecordFactory.fromRlpList(RlpUtil.asList(rl)))
.collect(Collectors.toList()));
}

public static NodesMessage fromBytes(Bytes messageBytes, NodeRecordFactory nodeRecordFactory) {
return fromRlp(RlpUtil.decodeSingleList(messageBytes), nodeRecordFactory);
return RlpUtil.readRlpList(
messageBytes,
reader -> {
final Bytes requestId = checkMaxSize(reader.readValue(), MAX_REQUEST_ID_SIZE);
final int total = reader.readInt();
final List<NodeRecord> nodeRecords = reader.readListContents(nodeRecordFactory::fromRlp);
return new NodesMessage(requestId, total, nodeRecords);
});
}

@Override
Expand All @@ -69,15 +60,16 @@ public synchronized List<NodeRecord> getNodeRecords() {
public Bytes getBytes() {
return Bytes.concatenate(
Bytes.of(getCode().byteCode()),
Bytes.wrap(
RlpEncoder.encode(
new RlpList(
RlpString.create(requestId.toArray()),
RlpString.create(total),
new RlpList(
getNodeRecords().stream()
.map(NodeRecord::asRlp)
.collect(Collectors.toList()))))));
RLP.encodeList(
writer -> {
writer.writeValue(requestId);
writer.writeInt(total);
writer.writeList(
getNodeRecords(),
(itemWriter, nodeRecord) -> {
nodeRecord.writeRlp(itemWriter);
});
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@

package org.ethereum.beacon.discovery.message;

import static org.ethereum.beacon.discovery.util.RlpUtil.CONS_UINT64;
import static org.ethereum.beacon.discovery.util.RlpUtil.maxSize;
import static org.ethereum.beacon.discovery.util.RlpUtil.checkMaxSize;

import com.google.common.base.Objects;
import java.util.List;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.rlp.RLP;
import org.apache.tuweni.units.bigints.UInt64;
import org.ethereum.beacon.discovery.util.RlpUtil;
import org.web3j.rlp.RlpEncoder;
import org.web3j.rlp.RlpList;
import org.web3j.rlp.RlpString;

/**
* PING checks whether the recipient is alive and informs it about the sender's ENR sequence number.
Expand All @@ -31,8 +27,13 @@ public PingMessage(Bytes requestId, UInt64 enrSeq) {
}

public static PingMessage fromBytes(Bytes bytes) {
List<Bytes> list = RlpUtil.decodeListOfStrings(bytes, maxSize(8), CONS_UINT64);
return new PingMessage(list.get(0), UInt64.fromBytes(list.get(1)));
return RlpUtil.readRlpList(
bytes,
reader -> {
final Bytes requestId = checkMaxSize(reader.readValue(), MAX_REQUEST_ID_SIZE);
final UInt64 enrSeq = UInt64.valueOf(reader.readBigInteger());
return new PingMessage(requestId, enrSeq);
});
}

@Override
Expand All @@ -48,11 +49,11 @@ public UInt64 getEnrSeq() {
public Bytes getBytes() {
return Bytes.concatenate(
Bytes.of(getCode().byteCode()),
Bytes.wrap(
RlpEncoder.encode(
new RlpList(
RlpString.create(requestId.toArray()),
RlpString.create(enrSeq.toBigInteger())))));
RLP.encodeList(
writer -> {
writer.writeValue(requestId);
writer.writeBigInteger(enrSeq.toBigInteger());
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,14 @@

package org.ethereum.beacon.discovery.message;

import static org.ethereum.beacon.discovery.util.RlpUtil.CONS_UINT64;
import static org.ethereum.beacon.discovery.util.RlpUtil.enumSizes;
import static org.ethereum.beacon.discovery.util.RlpUtil.maxSize;
import static org.ethereum.beacon.discovery.util.RlpUtil.strictSize;
import static org.ethereum.beacon.discovery.util.RlpUtil.checkMaxSize;
import static org.ethereum.beacon.discovery.util.RlpUtil.checkSizeEither;

import com.google.common.base.Objects;
import java.util.List;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.rlp.RLP;
import org.apache.tuweni.units.bigints.UInt64;
import org.ethereum.beacon.discovery.util.RlpUtil;
import org.web3j.rlp.RlpEncoder;
import org.web3j.rlp.RlpList;
import org.web3j.rlp.RlpString;

/** PONG is the reply to PING {@link PingMessage} */
public class PongMessage implements V5Message {
Expand All @@ -37,11 +32,15 @@ public PongMessage(Bytes requestId, UInt64 enrSeq, Bytes recipientIp, int recipi
}

public static PongMessage fromBytes(Bytes bytes) {
List<Bytes> list =
RlpUtil.decodeListOfStrings(
bytes, maxSize(8), CONS_UINT64, enumSizes(4, 16), strictSize(2));
return new PongMessage(
list.get(0), UInt64.fromBytes(list.get(1)), list.get(2), list.get(3).toInt());
return RlpUtil.readRlpList(
bytes,
reader -> {
final Bytes requestId = checkMaxSize(reader.readValue(), MAX_REQUEST_ID_SIZE);
final UInt64 enrSeq = UInt64.valueOf(reader.readBigInteger());
final Bytes recipientIp = checkSizeEither(reader.readValue(), 4, 16);
final int recipientPort = reader.readInt();
return new PongMessage(requestId, enrSeq, recipientIp, recipientPort);
});
}

@Override
Expand All @@ -65,13 +64,13 @@ public int getRecipientPort() {
public Bytes getBytes() {
return Bytes.concatenate(
Bytes.of(getCode().byteCode()),
Bytes.wrap(
RlpEncoder.encode(
new RlpList(
RlpString.create(requestId.toArray()),
RlpString.create(enrSeq.toBigInteger()),
RlpString.create(recipientIp.toArray()),
RlpString.create(recipientPort)))));
RLP.encodeList(
writer -> {
writer.writeValue(requestId);
writer.writeBigInteger(enrSeq.toBigInteger());
writer.writeValue(recipientIp);
writer.writeInt(recipientPort);
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@

package org.ethereum.beacon.discovery.message;

import static org.ethereum.beacon.discovery.util.RlpUtil.CONS_ANY;
import static org.ethereum.beacon.discovery.util.RlpUtil.maxSize;
import static org.ethereum.beacon.discovery.util.RlpUtil.checkMaxSize;

import java.util.List;
import java.util.Objects;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.rlp.RLP;
import org.ethereum.beacon.discovery.util.RlpUtil;
import org.web3j.rlp.RlpEncoder;
import org.web3j.rlp.RlpList;
import org.web3j.rlp.RlpString;

/**
* TALKREQ sends an application-level request. The purpose of this message is pre-negotiating
Expand All @@ -35,8 +31,14 @@ public TalkReqMessage(Bytes requestId, Bytes protocol, Bytes request) {
}

public static TalkReqMessage fromBytes(Bytes bytes) {
List<Bytes> list = RlpUtil.decodeListOfStrings(bytes, maxSize(8), CONS_ANY, CONS_ANY);
return new TalkReqMessage(list.get(0), list.get(1), list.get(2));
return RlpUtil.readRlpList(
bytes,
reader -> {
final Bytes requestId = checkMaxSize(reader.readValue(), MAX_REQUEST_ID_SIZE);
final Bytes protocol = reader.readValue();
final Bytes request = reader.readValue();
return new TalkReqMessage(requestId, protocol, request);
});
}

@Override
Expand All @@ -56,12 +58,12 @@ public Bytes getRequest() {
public Bytes getBytes() {
return Bytes.concatenate(
Bytes.of(getCode().byteCode()),
Bytes.wrap(
RlpEncoder.encode(
new RlpList(
RlpString.create(requestId.toArrayUnsafe()),
RlpString.create(protocol.toArrayUnsafe()),
RlpString.create(request.toArrayUnsafe())))));
RLP.encodeList(
writer -> {
writer.writeValue(requestId);
writer.writeValue(protocol);
writer.writeValue(request);
}));
}

@Override
Expand Down
Loading

0 comments on commit 9bb8ad3

Please sign in to comment.