diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/constants/NetworkConstants.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/constants/NetworkConstants.java index 2bbbf82f076..ff7e0c1d390 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/constants/NetworkConstants.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/constants/NetworkConstants.java @@ -20,4 +20,7 @@ public class NetworkConstants { public static final int DEFAULT_SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY = 128; public static final int NODE_ID_BITS = 256; + + // https://github.com/ethereum/consensus-specs/pull/3767 + public static final int MAX_CONCURRENT_REQUESTS = 2; } diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java index 7927f27ec26..2d18705a41e 100644 --- a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java @@ -28,6 +28,10 @@ public class ThrottlingTaskQueue { private int inflightTaskCount = 0; + public static ThrottlingTaskQueue create(final int maximumConcurrentTasks) { + return new ThrottlingTaskQueue(maximumConcurrentTasks); + } + public static ThrottlingTaskQueue create( final int maximumConcurrentTasks, final MetricsSystem metricsSystem, diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java index afa52355653..88e57f491a7 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java @@ -41,7 +41,6 @@ import tech.pegasys.teku.networking.p2p.peer.Peer; import tech.pegasys.teku.networking.p2p.peer.PeerConnectedSubscriber; import tech.pegasys.teku.spec.Spec; -import tech.pegasys.teku.spec.config.SpecConfig; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessageSchema; import tech.pegasys.teku.spec.datastructures.state.Checkpoint; @@ -51,6 +50,8 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler { private static final Logger LOG = LogManager.getLogger(); + private static final Duration STATUS_RECEIVED_TIMEOUT = Duration.ofSeconds(10); + private final AsyncRunner asyncRunner; private final RecentChainData recentChainData; private final Eth2PeerFactory eth2PeerFactory; @@ -66,7 +67,6 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler { private final int eth2RpcOutstandingPingThreshold; private final Duration eth2StatusUpdateInterval; - private final SpecConfig specConfig; Eth2PeerManager( final Spec spec, @@ -99,7 +99,6 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler { this.eth2RpcPingInterval = eth2RpcPingInterval; this.eth2RpcOutstandingPingThreshold = eth2RpcOutstandingPingThreshold; this.eth2StatusUpdateInterval = eth2StatusUpdateInterval; - this.specConfig = spec.getGenesisSpecConfig(); } public static Eth2PeerManager create( @@ -237,7 +236,7 @@ private void ensureStatusReceived(final Eth2Peer peer) { .ifExceptionGetsHereRaiseABug(); } }, - Duration.ofSeconds(specConfig.getRespTimeout())) + STATUS_RECEIVED_TIMEOUT) .finish( () -> {}, error -> { diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java index 63ba993db5f..75a4feed619 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java @@ -115,8 +115,8 @@ public static BeaconChainMethods create( final MetadataMessagesFactory metadataMessagesFactory, final RpcEncoding rpcEncoding) { return new BeaconChainMethods( - createStatus(spec, asyncRunner, statusMessageFactory, peerLookup, rpcEncoding), - createGoodBye(spec, asyncRunner, metricsSystem, peerLookup, rpcEncoding), + createStatus(asyncRunner, statusMessageFactory, peerLookup, rpcEncoding), + createGoodBye(asyncRunner, metricsSystem, peerLookup, rpcEncoding), createBeaconBlocksByRoot( spec, metricsSystem, asyncRunner, recentChainData, peerLookup, rpcEncoding), createBeaconBlocksByRange( @@ -144,11 +144,10 @@ public static BeaconChainMethods create( rpcEncoding, recentChainData), createMetadata(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding), - createPing(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding)); + createPing(asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding)); } private static Eth2RpcMethod createStatus( - final Spec spec, final AsyncRunner asyncRunner, final StatusMessageFactory statusMessageFactory, final PeerLookup peerLookup, @@ -165,12 +164,10 @@ private static Eth2RpcMethod createStatus( true, contextCodec, statusHandler, - peerLookup, - spec.getNetworkingConfig()); + peerLookup); } private static Eth2RpcMethod createGoodBye( - final Spec spec, final AsyncRunner asyncRunner, final MetricsSystem metricsSystem, final PeerLookup peerLookup, @@ -187,8 +184,7 @@ private static Eth2RpcMethod createGoodBye( false, contextCodec, goodbyeHandler, - peerLookup, - spec.getNetworkingConfig()); + peerLookup); } private static Eth2RpcMethod @@ -221,8 +217,7 @@ private static Eth2RpcMethod createGoodBye( expectResponseToRequest, forkDigestContextCodec, beaconBlocksByRootHandler, - peerLookup, - spec.getNetworkingConfig()); + peerLookup); return VersionedEth2RpcMethod.create( rpcEncoding, requestType, expectResponseToRequest, List.of(v2Method)); @@ -259,8 +254,7 @@ private static Eth2RpcMethod createGoodBye( expectResponseToRequest, forkDigestContextCodec, beaconBlocksByRangeHandler, - peerLookup, - spec.getNetworkingConfig()); + peerLookup); return VersionedEth2RpcMethod.create( rpcEncoding, requestType, expectResponseToRequest, List.of(v2Method)); @@ -299,8 +293,7 @@ private static Eth2RpcMethod createGoodBye( true, forkDigestContextCodec, blobSidecarsByRootHandler, - peerLookup, - spec.getNetworkingConfig())); + peerLookup)); } private static Optional> @@ -336,8 +329,7 @@ private static Eth2RpcMethod createGoodBye( true, forkDigestContextCodec, blobSidecarsByRangeHandler, - peerLookup, - spec.getNetworkingConfig())); + peerLookup)); } private static Eth2RpcMethod createMetadata( @@ -369,8 +361,7 @@ private static Eth2RpcMethod createMetadata( expectResponse, phase0ContextCodec, messageHandler, - peerLookup, - spec.getNetworkingConfig()); + peerLookup); if (spec.isMilestoneSupported(SpecMilestone.ALTAIR)) { final SszSchema altairMetadataSchema = @@ -392,8 +383,7 @@ private static Eth2RpcMethod createMetadata( expectResponse, altairContextCodec, messageHandler, - peerLookup, - spec.getNetworkingConfig()); + peerLookup); return VersionedEth2RpcMethod.create( rpcEncoding, requestType, expectResponse, List.of(v2Method, v1Method)); } else { @@ -402,7 +392,6 @@ private static Eth2RpcMethod createMetadata( } private static Eth2RpcMethod createPing( - final Spec spec, final AsyncRunner asyncRunner, final MetadataMessagesFactory metadataMessagesFactory, final PeerLookup peerLookup, @@ -419,8 +408,7 @@ private static Eth2RpcMethod createPing( true, contextCodec, statusHandler, - peerLookup, - spec.getNetworkingConfig()); + peerLookup); } public Collection> all() { diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java index dc8dd90ed46..2a1bce8e6f3 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java @@ -28,13 +28,13 @@ import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler; import tech.pegasys.teku.networking.p2p.rpc.RpcStream; import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException; -import tech.pegasys.teku.spec.config.NetworkingSpecConfig; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest; public class Eth2IncomingRequestHandler< TRequest extends RpcRequest & SszData, TResponse extends SszData> implements RpcRequestHandler { private static final Logger LOG = LogManager.getLogger(); + private static final Duration RECEIVE_INCOMING_REQUEST_TIMEOUT = Duration.ofSeconds(10); private final PeerLookup peerLookup; private final LocalMessageHandler localMessageHandler; @@ -45,7 +45,6 @@ public class Eth2IncomingRequestHandler< private final String protocolId; private final AsyncRunner asyncRunner; private final AtomicBoolean requestHandled = new AtomicBoolean(false); - private final Duration respTimeout; public Eth2IncomingRequestHandler( final String protocolId, @@ -53,15 +52,13 @@ public Eth2IncomingRequestHandler( final RpcRequestDecoder requestDecoder, final AsyncRunner asyncRunner, final PeerLookup peerLookup, - final LocalMessageHandler localMessageHandler, - final NetworkingSpecConfig networkingConfig) { + final LocalMessageHandler localMessageHandler) { this.protocolId = protocolId; this.asyncRunner = asyncRunner; this.peerLookup = peerLookup; this.localMessageHandler = localMessageHandler; this.responseEncoder = responseEncoder; this.requestDecoder = requestDecoder; - this.respTimeout = Duration.ofSeconds(networkingConfig.getRespTimeout()); } @Override @@ -121,15 +118,14 @@ private void handleRequest( } private void ensureRequestReceivedWithinTimeLimit(final RpcStream stream) { - final Duration timeout = respTimeout; asyncRunner - .getDelayedFuture(timeout) + .getDelayedFuture(RECEIVE_INCOMING_REQUEST_TIMEOUT) .thenAccept( (__) -> { if (!requestHandled.get()) { LOG.debug( "Failed to receive incoming request data within {} sec for protocol {}. Close stream.", - timeout.toSeconds(), + RECEIVE_INCOMING_REQUEST_TIMEOUT.toSeconds(), protocolId); stream.closeAbruptly().ifExceptionGetsHereRaiseABug(); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java index b39e2f70ad2..320a5a3f943 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java @@ -22,10 +22,9 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import java.time.Duration; -import java.time.temporal.ChronoUnit; import java.util.Collection; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.LogManager; @@ -38,7 +37,6 @@ import tech.pegasys.teku.networking.p2p.peer.NodeId; import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler; import tech.pegasys.teku.networking.p2p.rpc.RpcStream; -import tech.pegasys.teku.spec.config.NetworkingSpecConfig; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest; public class Eth2OutgoingRequestHandler< @@ -56,13 +54,15 @@ enum State { private static final Logger LOG = LogManager.getLogger(); + @VisibleForTesting static final Duration READ_COMPLETE_TIMEOUT = Duration.ofSeconds(10); + @VisibleForTesting static final Duration RESPONSE_CHUNK_ARRIVAL_TIMEOUT = Duration.ofSeconds(30); + private final AsyncRunner asyncRunner; private final int maximumResponseChunks; private final Eth2RpcResponseHandler responseHandler; private final ResponseStream responseStream; private final AsyncRunner timeoutRunner; - private final AtomicBoolean hasReceivedInitialBytes = new AtomicBoolean(false); private final AtomicInteger currentChunkCount = new AtomicInteger(0); private final AtomicReference state; private final AtomicReference> responseProcessor = @@ -71,8 +71,6 @@ enum State { private final String protocolId; private final RpcResponseDecoder responseDecoder; private final boolean shouldReceiveResponse; - private final Duration ttbfTimeout; - private final Duration respTimeout; public Eth2OutgoingRequestHandler( final AsyncRunner asyncRunner, @@ -81,29 +79,24 @@ public Eth2OutgoingRequestHandler( final RpcResponseDecoder responseDecoder, final boolean shouldReceiveResponse, final TRequest request, - final Eth2RpcResponseHandler responseHandler, - final NetworkingSpecConfig networkingConfig) { + final Eth2RpcResponseHandler responseHandler) { this.asyncRunner = asyncRunner; this.timeoutRunner = timeoutRunner; this.maximumResponseChunks = request.getMaximumResponseChunks(); - this.responseHandler = responseHandler; responseStream = new ResponseStream<>(responseHandler); this.responseDecoder = responseDecoder; this.shouldReceiveResponse = shouldReceiveResponse; this.protocolId = protocolId; - this.ttbfTimeout = Duration.of(networkingConfig.getTtfbTimeout(), ChronoUnit.SECONDS); - this.respTimeout = Duration.of(networkingConfig.getRespTimeout(), ChronoUnit.SECONDS); this.state = new AtomicReference<>(shouldReceiveResponse ? EXPECT_DATA : DATA_COMPLETED); } public void handleInitialPayloadSent(final RpcStream stream) { // Close the write side of the stream stream.closeWriteStream().ifExceptionGetsHereRaiseABug(); - if (shouldReceiveResponse) { - // Start timer for first bytes - ensureFirstBytesArriveWithinTimeLimit(stream); + // Setup initial chunk timeout + ensureNextResponseChunkArrivesInTime(stream, currentChunkCount.get(), currentChunkCount); } else { ensureReadCompleteArrivesInTime(stream); } @@ -123,8 +116,6 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By throw new RpcException.ExtraDataAppendedException(" extra data: " + bufToString(data)); } - onFirstByteReceived(rpcStream); - List maybeResponses = responseDecoder.decodeNextResponses(data); final int chunksReceived = currentChunkCount.addAndGet(maybeResponses.size()); @@ -137,7 +128,7 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By } if (chunksReceived < maximumResponseChunks) { if (!maybeResponses.isEmpty()) { - ensureNextResponseArrivesInTime(rpcStream, chunksReceived, currentChunkCount); + ensureNextResponseChunkArrivesInTime(rpcStream, chunksReceived, currentChunkCount); } } else { if (!transferToState(DATA_COMPLETED, List.of(EXPECT_DATA))) { @@ -156,14 +147,14 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By private AsyncResponseProcessor getResponseProcessor(final RpcStream rpcStream) { return responseProcessor.updateAndGet( - oldVal -> { - if (oldVal == null) { - return new AsyncResponseProcessor<>( - asyncRunner, responseStream, throwable -> abortRequest(rpcStream, throwable)); - } else { - return oldVal; - } - }); + oldVal -> + Objects.requireNonNullElseGet( + oldVal, + () -> + new AsyncResponseProcessor<>( + asyncRunner, + responseStream, + throwable -> abortRequest(rpcStream, throwable)))); } private String bufToString(final ByteBuf buf) { @@ -216,13 +207,6 @@ private boolean transferToState(final State toState, final Collection fro return false; } - private void onFirstByteReceived(final RpcStream rpcStream) { - if (hasReceivedInitialBytes.compareAndSet(false, true)) { - // Setup initial chunk timeout - ensureNextResponseArrivesInTime(rpcStream, currentChunkCount.get(), currentChunkCount); - } - } - private void completeRequest(final RpcStream rpcStream) { getResponseProcessor(rpcStream) .finishProcessing() @@ -266,49 +250,35 @@ private void abortRequest(final RpcStream rpcStream, final Throwable error, fina } } - private void ensureFirstBytesArriveWithinTimeLimit(final RpcStream stream) { - timeoutRunner - .getDelayedFuture(ttbfTimeout) - .thenAccept( - (__) -> { - if (!hasReceivedInitialBytes.get()) { - abortRequest( - stream, - new RpcTimeoutException("Timed out waiting for initial response", ttbfTimeout)); - } - }) - .ifExceptionGetsHereRaiseABug(); - } - - private void ensureNextResponseArrivesInTime( + private void ensureNextResponseChunkArrivesInTime( final RpcStream stream, final int previousResponseCount, final AtomicInteger currentResponseCount) { - final Duration timeout = respTimeout; timeoutRunner - .getDelayedFuture(timeout) + .getDelayedFuture(RESPONSE_CHUNK_ARRIVAL_TIMEOUT) .thenAccept( (__) -> { if (previousResponseCount == currentResponseCount.get()) { abortRequest( stream, new RpcTimeoutException( - "Timed out waiting for response chunk " + previousResponseCount, timeout)); + "Timed out waiting for response chunk " + previousResponseCount, + RESPONSE_CHUNK_ARRIVAL_TIMEOUT)); } }) .ifExceptionGetsHereRaiseABug(); } private void ensureReadCompleteArrivesInTime(final RpcStream stream) { - final Duration timeout = respTimeout; timeoutRunner - .getDelayedFuture(timeout) + .getDelayedFuture(READ_COMPLETE_TIMEOUT) .thenAccept( (__) -> { if (!(state.get() == READ_COMPLETE || state.get() == CLOSED)) { abortRequest( stream, - new RpcTimeoutException("Timed out waiting for read channel close", timeout)); + new RpcTimeoutException( + "Timed out waiting for read channel close", READ_COMPLETE_TIMEOUT)); } }) .ifExceptionGetsHereRaiseABug(); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcTimeouts.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcTimeouts.java deleted file mode 100644 index 0fce1e93fd1..00000000000 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcTimeouts.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2022 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.networking.eth2.rpc.core; - -import java.time.Duration; -import tech.pegasys.teku.networking.p2p.rpc.StreamTimeoutException; - -/** - * This class holds constants related to handling rpc request timeouts. See: - * https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#configuration - */ -public abstract class RpcTimeouts { - - // The maximum time to wait for first byte of request response (time-to-first-byte). - static final Duration TTFB_TIMEOUT = Duration.ofSeconds(5); - // The maximum time for complete response transfer. - public static final Duration RESP_TIMEOUT = Duration.ofSeconds(10); - - public static class RpcTimeoutException extends StreamTimeoutException { - - public RpcTimeoutException(final String message, final Duration timeout) { - super(generateMessage(message, timeout)); - } - - private static String generateMessage(final String message, final Duration timeout) { - return String.format("Rpc request timed out after %d sec: %s", timeout.toSeconds(), message); - } - } -} diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/methods/SingleProtocolEth2RpcMethod.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/methods/SingleProtocolEth2RpcMethod.java index fd3224e848d..1c25f683d43 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/methods/SingleProtocolEth2RpcMethod.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/methods/SingleProtocolEth2RpcMethod.java @@ -29,7 +29,6 @@ import tech.pegasys.teku.networking.eth2.rpc.core.RpcResponseEncoder; import tech.pegasys.teku.networking.eth2.rpc.core.encodings.RpcEncoding; import tech.pegasys.teku.networking.eth2.rpc.core.encodings.context.RpcContextCodec; -import tech.pegasys.teku.spec.config.NetworkingSpecConfig; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest; public class SingleProtocolEth2RpcMethod< @@ -45,7 +44,6 @@ public class SingleProtocolEth2RpcMethod< private final LocalMessageHandler localMessageHandler; private final PeerLookup peerLookup; - private final NetworkingSpecConfig networkingConfig; public SingleProtocolEth2RpcMethod( final AsyncRunner asyncRunner, @@ -56,8 +54,7 @@ public SingleProtocolEth2RpcMethod( final boolean expectResponseToRequest, final RpcContextCodec contextCodec, final LocalMessageHandler localMessageHandler, - final PeerLookup peerLookup, - final NetworkingSpecConfig networkingConfig) { + final PeerLookup peerLookup) { super(encoding, requestType, expectResponseToRequest); this.asyncRunner = asyncRunner; this.contextCodec = contextCodec; @@ -66,7 +63,6 @@ public SingleProtocolEth2RpcMethod( this.protocolVersion = protocolVersion; this.localMessageHandler = localMessageHandler; this.peerLookup = peerLookup; - this.networkingConfig = networkingConfig; } @Override @@ -91,8 +87,7 @@ public Eth2IncomingRequestHandler createIncomingRequestHand createRequestDecoder(), asyncRunner, peerLookup, - localMessageHandler, - networkingConfig); + localMessageHandler); } @Override @@ -107,8 +102,7 @@ public Eth2OutgoingRequestHandler createOutgoingRequestHand createResponseDecoder(), expectResponseToRequest, request, - responseHandler, - networkingConfig); + responseHandler); } @Override diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandlerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandlerTest.java index ce83a72a55c..5dd08fe79f5 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandlerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandlerTest.java @@ -18,9 +18,9 @@ import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.READ_COMPLETE_TIMEOUT; +import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.RESPONSE_CHUNK_ARRIVAL_TIMEOUT; -import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicReference; @@ -65,10 +65,6 @@ public class Eth2OutgoingRequestHandlerTest Eth2RpcResponseHandler.expectMultipleResponses(res -> responseListener.get().onResponse(res)); private final int maxChunks = 3; private final SafeFuture finishedProcessingFuture = responseHandler.getCompletedFuture(); - private final Duration ttbfTimeout = - Duration.ofSeconds(spec.getGenesisSpecConfig().getTtfbTimeout()); - private final Duration respTimeout = - Duration.ofSeconds(spec.getGenesisSpecConfig().getRespTimeout()); private RpcResponseEncoder responseEncoder; private List chunks; @@ -106,8 +102,7 @@ public void setup() { responseDecoder, method.shouldReceiveResponse(), request, - responseHandler, - spec.getNetworkingConfig()); + responseHandler); } @Override @@ -270,63 +265,30 @@ public void shouldWorkWhenSendAllChunksPlusEmptyExtraChunk() throws Exception { } @Test - public void disconnectsIfInitialBytesAreNotReceivedInTime() { + public void disconnectsIfFirstChunkIsNotReceivedInTime() { sendInitialPayload(); - verify(rpcStream).closeWriteStream(); - verify(rpcStream, never()).closeAbruptly(); - - // Run async tasks - timeProvider.advanceTimeByMillis(ttbfTimeout.toMillis()); - timeoutRunner.executeDueActions(); - verify(rpcStream).closeAbruptly(); - } - - @Test - public void doesNotDisconnectIfInitialBytesAreReceivedInTime() throws Exception { - sendInitialPayload(); - verify(rpcStream).closeWriteStream(); - verify(rpcStream, never()).closeAbruptly(); - - // Deliver some bytes just in time - timeProvider.advanceTimeByMillis(ttbfTimeout.toMillis() - 1); - timeoutRunner.executeDueActions(); - deliverInitialBytes(); - - // Go past the time the first bytes should have been received and check it doesn't timeout - timeProvider.advanceTimeByMillis(10); - timeoutRunner.executeDueActions(); - verify(rpcStream, never()).closeAbruptly(); - } - - @Test - public void disconnectsIfFirstChunkIsNotReceivedInTime() throws Exception { - sendInitialPayload(); - - deliverInitialBytes(); // Run timeouts - timeProvider.advanceTimeByMillis(respTimeout.toMillis()); + timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis()); timeoutRunner.executeDueActions(); verify(rpcStream).closeAbruptly(); } @Test - public void doNotDisconnectsIfFirstChunkReceivedInTime() throws Exception { + public void doNotDisconnectsIfFirstChunkReceivedInTime() { sendInitialPayload(); - // First byte is received just in time - timeProvider.advanceTimeByMillis(ttbfTimeout.toMillis() - 1); deliverChunk(0); // Go past the time the first chunk would have timed out but not enough to trigger timeout on // the second chunk and ensure the timeout never fires. - timeProvider.advanceTimeByMillis(respTimeout.toMillis() - 1); + timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis() - 1); timeoutRunner.executeDueActions(); verify(rpcStream, never()).closeAbruptly(); } @Test - public void disconnectsIfSecondChunkNotReceivedInTime() throws Exception { + public void disconnectsIfSecondChunkNotReceivedInTime() { sendInitialPayload(); timeProvider.advanceTimeByMillis(100); @@ -335,13 +297,13 @@ public void disconnectsIfSecondChunkNotReceivedInTime() throws Exception { assertThat(blocks.size()).isEqualTo(1); // Run timeouts - timeProvider.advanceTimeByMillis(respTimeout.toMillis()); + timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis()); timeoutRunner.executeDueActions(); verify(rpcStream).closeAbruptly(); } @Test - public void abortsWhenNoReadComplete() throws Exception { + public void abortsWhenNoReadComplete() { sendInitialPayload(); timeProvider.advanceTimeByMillis(100); @@ -352,7 +314,7 @@ public void abortsWhenNoReadComplete() throws Exception { asyncRequestRunner.executeQueuedActions(); // Run timeouts - timeProvider.advanceTimeByMillis(respTimeout.toMillis()); + timeProvider.advanceTimeByMillis(READ_COMPLETE_TIMEOUT.toMillis()); timeoutRunner.executeDueActions(); verify(rpcStream).closeAbruptly(); } @@ -373,7 +335,7 @@ public void shouldCompleteExceptionallyWhenClosedWithTruncatedMessage() { } @Test - public void doNotDisconnectsIfSecondChunkReceivedInTime() throws Exception { + public void doNotDisconnectsIfSecondChunkReceivedInTime() { sendInitialPayload(); timeProvider.advanceTimeByMillis(100); @@ -382,14 +344,14 @@ public void doNotDisconnectsIfSecondChunkReceivedInTime() throws Exception { assertThat(blocks.size()).isEqualTo(1); // Second chunk is received just in time - timeProvider.advanceTimeByMillis(respTimeout.toMillis() - 1); + timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis() - 1); timeoutRunner.executeDueActions(); deliverChunk(1); asyncRequestRunner.executeQueuedActions(); // Go past the time the second chunk would have timed out but not enough to trigger timeout on // the third chunk and ensure the timeout never fires. - timeProvider.advanceTimeByMillis(respTimeout.toMillis() - 1); + timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis() - 1); timeoutRunner.executeDueActions(); verify(rpcStream, never()).closeAbruptly(); assertThat(blocks.size()).isEqualTo(2); @@ -439,11 +401,6 @@ private void sendInitialPayload() { reqHandler.handleInitialPayloadSent(rpcStream); } - private void deliverInitialBytes() throws IOException { - final Bytes firstByte = chunks.get(0).slice(0, 1); - deliverBytes(firstByte); - } - private Bytes chunkBytes(final int chunk) { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(chunk); return responseEncoder.encodeSuccessfulResponse(block); diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcDecoderTestBase.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcDecoderTestBase.java index 30e54675f05..9f526c9c21b 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcDecoderTestBase.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcDecoderTestBase.java @@ -78,8 +78,7 @@ public class RpcDecoderTestBase { false, contextEncoder, mock(LocalMessageHandler.class), - peerLookup, - spec.getNetworkingConfig()); + peerLookup); protected List> testByteBufSlices(final Bytes... bytes) { List> ret = Utils.generateTestSlices(bytes); diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java index 5b809f77d14..733f92be5d2 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java @@ -15,6 +15,7 @@ import static tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetwork.REMOTE_OPEN_STREAMS_RATE_LIMIT; import static tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetwork.REMOTE_PARALLEL_OPEN_STREAMS_COUNT_LIMIT; +import static tech.pegasys.teku.spec.constants.NetworkConstants.MAX_CONCURRENT_REQUESTS; import com.google.common.base.Preconditions; import identify.pb.IdentifyOuterClass; @@ -152,7 +153,9 @@ public P2PNetwork build() { } protected List> createRpcHandlers() { - return rpcMethods.stream().map(m -> new RpcHandler<>(asyncRunner, m)).toList(); + return rpcMethods.stream() + .map(m -> new RpcHandler<>(asyncRunner, m, MAX_CONCURRENT_REQUESTS)) + .toList(); } protected LibP2PGossipNetwork createGossipNetwork() { diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandler.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandler.java index 9de93c7765b..f5f85af3c22 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandler.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandler.java @@ -38,6 +38,7 @@ import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.SafeFuture.Interruptor; +import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue; import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil; import tech.pegasys.teku.networking.p2p.libp2p.LibP2PNodeId; import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler.Controller; @@ -56,17 +57,21 @@ public class RpcHandler< TRequest, TRespHandler extends RpcResponseHandler> implements ProtocolBinding> { - private static final Duration TIMEOUT = Duration.ofSeconds(5); private static final Logger LOG = LogManager.getLogger(); - private final RpcMethod rpcMethod; + private static final Duration STREAM_INITIALIZE_TIMEOUT = Duration.ofSeconds(5); + private final AsyncRunner asyncRunner; + private final RpcMethod rpcMethod; + private final ThrottlingTaskQueue concurrentRequestsQueue; public RpcHandler( final AsyncRunner asyncRunner, - final RpcMethod rpcMethod) { + final RpcMethod rpcMethod, + final int maxConcurrentRequests) { this.asyncRunner = asyncRunner; this.rpcMethod = rpcMethod; + concurrentRequestsQueue = ThrottlingTaskQueue.create(maxConcurrentRequests); } public RpcMethod getRpcMethod() { @@ -75,6 +80,12 @@ public RpcMethod getRpcMethod() { public SafeFuture> sendRequest( final Connection connection, final TRequest request, final TRespHandler responseHandler) { + return concurrentRequestsQueue.queueTask( + () -> sendRequestInternal(connection, request, responseHandler)); + } + + public SafeFuture> sendRequestInternal( + final Connection connection, final TRequest request, final TRespHandler responseHandler) { final Bytes initialPayload; try { @@ -83,11 +94,11 @@ public SafeFuture> sendRequest( return SafeFuture.failedFuture(e); } - Interruptor closeInterruptor = + final Interruptor closeInterruptor = SafeFuture.createInterruptor(connection.closeFuture(), PeerDisconnectedException::new); - Interruptor timeoutInterruptor = + final Interruptor timeoutInterruptor = SafeFuture.createInterruptor( - asyncRunner.getDelayedFuture(TIMEOUT), + asyncRunner.getDelayedFuture(STREAM_INITIALIZE_TIMEOUT), () -> new StreamTimeoutException( "Timed out waiting to initialize stream for protocol(s): " diff --git a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandlerTest.java b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandlerTest.java index 844e144ffc0..4537560323e 100644 --- a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandlerTest.java +++ b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandlerTest.java @@ -30,6 +30,7 @@ import io.libp2p.core.mux.StreamMuxer.Session; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; import kotlin.Unit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -52,8 +53,9 @@ public class RpcHandlerTest { StubAsyncRunner asyncRunner = new StubAsyncRunner(); RpcMethod> rpcMethod = mock(RpcMethod.class); + int maxConcurrentRequests = 2; RpcHandler> rpcHandler = - new RpcHandler<>(asyncRunner, rpcMethod); + new RpcHandler<>(asyncRunner, rpcMethod, maxConcurrentRequests); Connection connection = mock(Connection.class); Session session = mock(Session.class); @@ -247,6 +249,39 @@ void sendRequest_interruptBeforeInitialPayloadWritten( verify(stream).close(); } + @Test + @SuppressWarnings("FutureReturnValueIgnored") + void requestIsThrottledIfQueueIsFull() { + // fill the queue + IntStream.range(0, maxConcurrentRequests) + .forEach(__ -> rpcHandler.sendRequest(connection, request, responseHandler)); + + final StreamPromise> streamPromise1 = + new StreamPromise<>(new CompletableFuture<>(), new CompletableFuture<>()); + when(session.createStream((ProtocolBinding>) any())) + .thenReturn(streamPromise1); + final Stream stream1 = mock(Stream.class); + streamPromise1.getStream().complete(stream1); + streamPromise1.getController().complete(controller); + final CompletableFuture protocolIdFuture1 = new CompletableFuture<>(); + when(stream1.getProtocol()).thenReturn(protocolIdFuture1); + protocolIdFuture1.complete("test"); + + final SafeFuture> throttledResult = + rpcHandler.sendRequest(connection, request, responseHandler); + + assertThat(throttledResult).isNotDone(); + + // empty the queue + streamPromise.getStream().complete(stream); + streamPromise.getController().complete(controller); + stream.getProtocol().complete("test"); + writeFuture.complete(null); + + // throttled request should have completed now + assertThat(throttledResult).isCompleted(); + } + @SuppressWarnings("UnnecessaryAsync") private Class executeInterrupts( final boolean closeStream, final boolean exceedTimeout) {