Skip to content

Commit

Permalink
Adds some statistical metrics for BlobSidecarPool (Consensys#7872)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr authored Jan 12, 2024
1 parent 44161eb commit cb6a566
Show file tree
Hide file tree
Showing 12 changed files with 331 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
}

@Override
public void prepareForBlockImport(final BlobSidecar blobSidecar) {
public void prepareForBlockImport(final BlobSidecar blobSidecar, final RemoteOrigin origin) {
// NOOP
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
}

@Override
public void prepareForBlockImport(final BlobSidecar blobSidecar) {}
public void prepareForBlockImport(
final BlobSidecar blobSidecar, final RemoteOrigin remoteOrigin) {}

@Override
public void subscribeToReceivedBlobSidecar(
Expand All @@ -61,7 +62,7 @@ public BlobSidecarsAndValidationResult createAvailabilityCheckerAndValidateImmed
SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
BlobSidecar blobSidecar, Optional<UInt64> arrivalTimestamp);

void prepareForBlockImport(BlobSidecar blobSidecar);
void prepareForBlockImport(BlobSidecar blobSidecar, RemoteOrigin origin);

void subscribeToReceivedBlobSidecar(ReceivedBlobSidecarListener receivedBlobSidecarListener);

Expand All @@ -75,4 +76,9 @@ BlobSidecarsAndValidationResult createAvailabilityCheckerAndValidateImmediately(
interface ReceivedBlobSidecarListener {
void onBlobSidecarReceived(BlobSidecar blobSidecar);
}

enum RemoteOrigin {
RPC,
GOSSIP
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
futureBlobSidecars.add(blobSidecar);
break;
case ACCEPT:
prepareForBlockImport(blobSidecar);
prepareForBlockImport(blobSidecar, RemoteOrigin.GOSSIP);
break;
}
});
Expand All @@ -102,8 +102,8 @@ public SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
}

@Override
public void prepareForBlockImport(final BlobSidecar blobSidecar) {
blobSidecarPool.onNewBlobSidecar(blobSidecar);
public void prepareForBlockImport(final BlobSidecar blobSidecar, final RemoteOrigin origin) {
blobSidecarPool.onNewBlobSidecar(blobSidecar, origin);
receivedBlobSidecarSubscribers.forEach(s -> s.onBlobSidecarReceived(blobSidecar));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin;

public interface BlobSidecarPool extends SlotEventsChannel {

Expand All @@ -32,10 +33,12 @@ public interface BlobSidecarPool extends SlotEventsChannel {
public void onSlot(final UInt64 slot) {}

@Override
public void onNewBlobSidecar(final BlobSidecar blobSidecar) {}
public void onNewBlobSidecar(
final BlobSidecar blobSidecar, final RemoteOrigin remoteOrigin) {}

@Override
public void onNewBlock(final SignedBeaconBlock block) {}
public void onNewBlock(
final SignedBeaconBlock block, final Optional<RemoteOrigin> remoteOrigin) {}

@Override
public void onCompletedBlockAndBlobSidecars(
Expand Down Expand Up @@ -101,9 +104,9 @@ public void subscribeRequiredBlockRootDropped(
public void subscribeNewBlobSidecar(NewBlobSidecarSubscriber newBlobSidecarSubscriber) {}
};

void onNewBlobSidecar(BlobSidecar blobSidecar);
void onNewBlobSidecar(BlobSidecar blobSidecar, RemoteOrigin remoteOrigin);

void onNewBlock(SignedBeaconBlock block);
void onNewBlock(SignedBeaconBlock block, Optional<RemoteOrigin> remoteOrigin);

void onCompletedBlockAndBlobSidecars(SignedBeaconBlock block, List<BlobSidecar> blobSidecars);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,26 @@
package tech.pegasys.teku.statetransition.block;

import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.events.ChannelInterface;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin;
import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult;

public interface BlockImportChannel extends ChannelInterface {

SafeFuture<BlockImportAndBroadcastValidationResults> importBlock(
SignedBeaconBlock block, BroadcastValidationLevel broadcastValidationLevel);
SignedBeaconBlock block,
BroadcastValidationLevel broadcastValidationLevel,
Optional<RemoteOrigin> origin);

default SafeFuture<BlockImportAndBroadcastValidationResults> importBlock(
SignedBeaconBlock block, BroadcastValidationLevel broadcastValidationLevel) {
return importBlock(block, broadcastValidationLevel, Optional.empty());
}

record BlockImportAndBroadcastValidationResults(
SafeFuture<BlockImportResult> blockImportResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadSummary;
import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.util.FutureItems;
import tech.pegasys.teku.statetransition.util.PendingPool;
Expand Down Expand Up @@ -107,14 +108,16 @@ protected SafeFuture<?> doStop() {

@Override
public SafeFuture<BlockImportAndBroadcastValidationResults> importBlock(
final SignedBeaconBlock block, final BroadcastValidationLevel broadcastValidationLevel) {
final SignedBeaconBlock block,
final BroadcastValidationLevel broadcastValidationLevel,
final Optional<RemoteOrigin> origin) {
LOG.trace("Preparing to import block: {}", block::toLogString);

final BlockBroadcastValidator blockBroadcastValidator =
blockValidator.initiateBroadcastValidation(block, broadcastValidationLevel);

final SafeFuture<BlockImportResult> importResult =
doImportBlock(block, Optional.empty(), blockBroadcastValidator);
doImportBlock(block, Optional.empty(), blockBroadcastValidator, origin);

// we want to intercept any early import exceptions happening before the consensus validation is
// completed
Expand Down Expand Up @@ -155,7 +158,10 @@ public SafeFuture<InternalValidationResult> validateAndImportBlock(
result -> {
switch (result.code()) {
case ACCEPT, SAVE_FOR_FUTURE -> doImportBlock(
block, blockImportPerformance, BlockBroadcastValidator.NOOP)
block,
blockImportPerformance,
BlockBroadcastValidator.NOOP,
Optional.of(RemoteOrigin.GOSSIP))
.finish(err -> LOG.error("Failed to process received block.", err));

// block failed gossip validation, let's drop it from the pool, so it won't be served
Expand Down Expand Up @@ -212,19 +218,21 @@ public void onBlockValidated(SignedBeaconBlock block) {
}

private void importBlockIgnoringResult(final SignedBeaconBlock block) {
doImportBlock(block, Optional.empty(), BlockBroadcastValidator.NOOP)
// we don't care about origin here because flow calls this function for retries only
doImportBlock(block, Optional.empty(), BlockBroadcastValidator.NOOP, Optional.empty())
.ifExceptionGetsHereRaiseABug();
}

private SafeFuture<BlockImportResult> doImportBlock(
final SignedBeaconBlock block,
final Optional<BlockImportPerformance> blockImportPerformance,
final BlockBroadcastValidator blockBroadcastValidator) {
final BlockBroadcastValidator blockBroadcastValidator,
final Optional<RemoteOrigin> origin) {
return handleInvalidBlock(block)
.or(() -> handleKnownBlock(block))
.orElseGet(
() ->
handleBlockImport(block, blockImportPerformance, blockBroadcastValidator)
handleBlockImport(block, blockImportPerformance, blockBroadcastValidator, origin)
.thenPeek(
result -> lateBlockImportCheck(blockImportPerformance, block, result)))
.thenPeek(
Expand Down Expand Up @@ -271,10 +279,11 @@ private Optional<SafeFuture<BlockImportResult>> handleKnownBlock(final SignedBea
private SafeFuture<BlockImportResult> handleBlockImport(
final SignedBeaconBlock block,
final Optional<BlockImportPerformance> blockImportPerformance,
final BlockBroadcastValidator blockBroadcastValidator) {
final BlockBroadcastValidator blockBroadcastValidator,
final Optional<RemoteOrigin> origin) {

onBlockValidated(block);
blobSidecarPool.onNewBlock(block);
blobSidecarPool.onNewBlock(block, origin);

return blockImporter
.importBlock(block, blockImportPerformance, blockBroadcastValidator)
Expand Down
Loading

0 comments on commit cb6a566

Please sign in to comment.