Skip to content

Commit

Permalink
Rename BlobSidecarPool to BlockBlobSidecarsTrackersPool (Consensy…
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr authored Jan 12, 2024
1 parent cb6a566 commit 79c46de
Show file tree
Hide file tree
Showing 36 changed files with 430 additions and 366 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.logic.common.util.AsyncBLSSignatureVerifier;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImporter;
import tech.pegasys.teku.statetransition.util.PendingPool;
import tech.pegasys.teku.statetransition.validation.signatures.SignatureVerificationService;
Expand Down Expand Up @@ -68,7 +68,7 @@ public class DefaultSyncServiceFactory implements SyncServiceFactory {
private final BlockImporter blockImporter;
private final BlobSidecarManager blobSidecarManager;
private final PendingPool<SignedBeaconBlock> pendingBlocks;
private final BlobSidecarPool blobSidecarPool;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final int getStartupTargetPeerCount;
private final AsyncBLSSignatureVerifier signatureVerifier;
private final Duration startupTimeout;
Expand All @@ -88,7 +88,7 @@ public DefaultSyncServiceFactory(
final BlockImporter blockImporter,
final BlobSidecarManager blobSidecarManager,
final PendingPool<SignedBeaconBlock> pendingBlocks,
final BlobSidecarPool blobSidecarPool,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final int getStartupTargetPeerCount,
final SignatureVerificationService signatureVerifier,
final Duration startupTimeout,
Expand All @@ -106,7 +106,7 @@ public DefaultSyncServiceFactory(
this.blockImporter = blockImporter;
this.blobSidecarManager = blobSidecarManager;
this.pendingBlocks = pendingBlocks;
this.blobSidecarPool = blobSidecarPool;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.getStartupTargetPeerCount = getStartupTargetPeerCount;
this.signatureVerifier = signatureVerifier;
this.startupTimeout = startupTimeout;
Expand All @@ -125,10 +125,14 @@ public SyncService create(final EventChannels eventChannels) {

final RecentBlocksFetchService recentBlocksFetchService =
RecentBlocksFetchService.create(
asyncRunner, pendingBlocks, blobSidecarPool, forwardSyncService, fetchTaskFactory);
asyncRunner,
pendingBlocks,
blockBlobSidecarsTrackersPool,
forwardSyncService,
fetchTaskFactory);
final RecentBlobSidecarsFetcher recentBlobSidecarsFetcher =
RecentBlobSidecarsFetcher.create(
spec, asyncRunner, blobSidecarPool, forwardSyncService, fetchTaskFactory);
spec, asyncRunner, blockBlobSidecarsTrackersPool, forwardSyncService, fetchTaskFactory);

final SyncStateTracker syncStateTracker = createSyncStateTracker(forwardSyncService);

Expand Down Expand Up @@ -186,7 +190,7 @@ protected ForwardSyncService createForwardSyncService() {
p2pNetwork,
blockImporter,
blobSidecarManager,
blobSidecarPool,
blockBlobSidecarsTrackersPool,
syncConfig.getForwardSyncBatchSize(),
syncConfig.getForwardSyncMaxPendingBatches(),
syncConfig.getForwardSyncMaxBlocksPerMinute(),
Expand All @@ -201,7 +205,7 @@ protected ForwardSyncService createForwardSyncService() {
recentChainData,
blockImporter,
blobSidecarManager,
blobSidecarPool,
blockBlobSidecarsTrackersPool,
syncConfig.getForwardSyncBatchSize(),
spec);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,22 @@
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImporter;

public class BatchImporter {
private static final Logger LOG = LogManager.getLogger();

private final BlockImporter blockImporter;
private final BlobSidecarPool blobSidecarPool;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final AsyncRunner asyncRunner;

public BatchImporter(
final BlockImporter blockImporter,
final BlobSidecarPool blobSidecarPool,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final AsyncRunner asyncRunner) {
this.blockImporter = blockImporter;
this.blobSidecarPool = blobSidecarPool;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.asyncRunner = asyncRunner;
}

Expand Down Expand Up @@ -116,7 +116,7 @@ private SafeFuture<BlockImportResult> importBlockAndBlobSidecars(
blockRoot);
// Add blob sidecars to the pool in order for them to be available when the block is being
// imported
blobSidecarPool.onCompletedBlockAndBlobSidecars(block, blobSidecars);
blockBlobSidecarsTrackersPool.onCompletedBlockAndBlobSidecars(block, blobSidecars);
return importBlock(block, source);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImporter;
import tech.pegasys.teku.statetransition.util.PendingPool;
import tech.pegasys.teku.storage.client.RecentChainData;
Expand Down Expand Up @@ -72,7 +72,7 @@ public static MultipeerSyncService create(
final P2PNetwork<Eth2Peer> p2pNetwork,
final BlockImporter blockImporter,
final BlobSidecarManager blobSidecarManager,
final BlobSidecarPool blobSidecarPool,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final int batchSize,
final int maxPendingBatches,
final int maxBlocksPerMinute,
Expand All @@ -93,7 +93,7 @@ public static MultipeerSyncService create(
eventThread,
asyncRunner,
recentChainData,
new BatchImporter(blockImporter, blobSidecarPool, asyncRunner),
new BatchImporter(blockImporter, blockBlobSidecarsTrackersPool, asyncRunner),
new BatchFactory(
eventThread, blobSidecarManager, new PeerScoringConflictResolutionStrategy()),
batchSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult.FailureReason;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImporter;
import tech.pegasys.teku.storage.client.RecentChainData;

Expand Down Expand Up @@ -72,7 +72,7 @@ public class PeerSync {
private final RecentChainData recentChainData;
private final BlockImporter blockImporter;
private final BlobSidecarManager blobSidecarManager;
private final BlobSidecarPool blobSidecarPool;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;

private final AsyncRunner asyncRunner;
private final Counter blockImportSuccessResult;
Expand All @@ -87,15 +87,15 @@ public PeerSync(
final RecentChainData recentChainData,
final BlockImporter blockImporter,
final BlobSidecarManager blobSidecarManager,
final BlobSidecarPool blobSidecarPool,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final int batchSize,
final MetricsSystem metricsSystem) {
this.spec = recentChainData.getSpec();
this.asyncRunner = asyncRunner;
this.recentChainData = recentChainData;
this.blockImporter = blockImporter;
this.blobSidecarManager = blobSidecarManager;
this.blobSidecarPool = blobSidecarPool;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
final LabelledMetric<Counter> blockImportCounter =
metricsSystem.createLabelledCounter(
TekuMetricCategory.BEACON,
Expand Down Expand Up @@ -343,7 +343,8 @@ private SafeFuture<Void> importBlock(
// Add blob sidecars to the pool in order for them to be available when the block is being
// imported
maybeBlobSidecars.ifPresent(
blobSidecars -> blobSidecarPool.onCompletedBlockAndBlobSidecars(block, blobSidecars));
blobSidecars ->
blockBlobSidecarsTrackersPool.onCompletedBlockAndBlobSidecars(block, blobSidecars));

return blockImporter
.importBlock(block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import tech.pegasys.teku.networking.p2p.network.P2PNetwork;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImporter;
import tech.pegasys.teku.storage.client.RecentChainData;

Expand All @@ -32,7 +32,7 @@ public static ForwardSyncService create(
final RecentChainData recentChainData,
final BlockImporter blockImporter,
final BlobSidecarManager blobSidecarManager,
final BlobSidecarPool blobSidecarPool,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final int batchSize,
final Spec spec) {
final SyncManager syncManager =
Expand All @@ -42,7 +42,7 @@ public static ForwardSyncService create(
recentChainData,
blockImporter,
blobSidecarManager,
blobSidecarPool,
blockBlobSidecarsTrackersPool,
metricsSystem,
batchSize,
spec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import tech.pegasys.teku.service.serviceutils.Service;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImporter;
import tech.pegasys.teku.storage.client.RecentChainData;

Expand Down Expand Up @@ -91,7 +91,7 @@ public static SyncManager create(
final RecentChainData recentChainData,
final BlockImporter blockImporter,
final BlobSidecarManager blobSidecarManager,
final BlobSidecarPool blobSidecarPool,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final MetricsSystem metricsSystem,
final int batchSize,
final Spec spec) {
Expand All @@ -101,7 +101,7 @@ public static SyncManager create(
recentChainData,
blockImporter,
blobSidecarManager,
blobSidecarPool,
blockBlobSidecarsTrackersPool,
batchSize,
metricsSystem);
return new SyncManager(asyncRunner, network, recentChainData, peerSync, spec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;

public class RecentBlobSidecarsFetchService
extends AbstractFetchService<BlobIdentifier, FetchBlobSidecarTask, BlobSidecar>
implements RecentBlobSidecarsFetcher {

private static final Logger LOG = LogManager.getLogger();

private final BlobSidecarPool blobSidecarPool;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final ForwardSync forwardSync;
private final FetchTaskFactory fetchTaskFactory;

Expand All @@ -43,26 +43,30 @@ public class RecentBlobSidecarsFetchService

RecentBlobSidecarsFetchService(
final AsyncRunner asyncRunner,
final BlobSidecarPool blobSidecarPool,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final ForwardSync forwardSync,
final FetchTaskFactory fetchTaskFactory,
final int maxConcurrentRequests) {
super(asyncRunner, maxConcurrentRequests);
this.blobSidecarPool = blobSidecarPool;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.forwardSync = forwardSync;
this.fetchTaskFactory = fetchTaskFactory;
}

public static RecentBlobSidecarsFetchService create(
final AsyncRunner asyncRunner,
final BlobSidecarPool blobSidecarPool,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final ForwardSync forwardSync,
final FetchTaskFactory fetchTaskFactory,
final Spec spec) {
final int maxConcurrentRequests =
RecentBlocksFetchService.MAX_CONCURRENT_REQUESTS * spec.getMaxBlobsPerBlock().orElse(1);
return new RecentBlobSidecarsFetchService(
asyncRunner, blobSidecarPool, forwardSync, fetchTaskFactory, maxConcurrentRequests);
asyncRunner,
blockBlobSidecarsTrackersPool,
forwardSync,
fetchTaskFactory,
maxConcurrentRequests);
}

@Override
Expand All @@ -87,7 +91,7 @@ public void requestRecentBlobSidecar(final BlobIdentifier blobIdentifier) {
// Forward sync already in progress, assume it will fetch any missing blob sidecars
return;
}
if (blobSidecarPool.containsBlobSidecar(blobIdentifier)) {
if (blockBlobSidecarsTrackersPool.containsBlobSidecar(blobIdentifier)) {
// We've already got this blob sidecar
return;
}
Expand All @@ -107,8 +111,9 @@ public void cancelRecentBlobSidecarRequest(final BlobIdentifier blobIdentifier)
}

private void setupSubscribers() {
blobSidecarPool.subscribeRequiredBlobSidecar(this::requestRecentBlobSidecar);
blobSidecarPool.subscribeRequiredBlobSidecarDropped(this::cancelRecentBlobSidecarRequest);
blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecar(this::requestRecentBlobSidecar);
blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarDropped(
this::cancelRecentBlobSidecarRequest);
forwardSync.subscribeToSyncChanges(this::onSyncStatusChanged);
}

Expand All @@ -118,7 +123,9 @@ private void onSyncStatusChanged(final boolean syncActive) {
}
// Ensure we are requesting the blob sidecars not already filled in by the sync
// We may have ignored these requested blob sidecars while the sync was in progress
blobSidecarPool.getAllRequiredBlobSidecars().forEach(this::requestRecentBlobSidecar);
blockBlobSidecarsTrackersPool
.getAllRequiredBlobSidecars()
.forEach(this::requestRecentBlobSidecar);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;

public interface RecentBlobSidecarsFetcher extends ServiceFacade {

Expand Down Expand Up @@ -55,14 +55,18 @@ public boolean isRunning() {
static RecentBlobSidecarsFetcher create(
final Spec spec,
final AsyncRunner asyncRunner,
final BlobSidecarPool blobSidecarPool,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final ForwardSyncService forwardSyncService,
final FetchTaskFactory fetchTaskFactory) {
final RecentBlobSidecarsFetcher recentBlobSidecarsFetcher;
if (spec.isMilestoneSupported(SpecMilestone.DENEB)) {
recentBlobSidecarsFetcher =
RecentBlobSidecarsFetchService.create(
asyncRunner, blobSidecarPool, forwardSyncService, fetchTaskFactory, spec);
asyncRunner,
blockBlobSidecarsTrackersPool,
forwardSyncService,
fetchTaskFactory,
spec);
} else {
recentBlobSidecarsFetcher = RecentBlobSidecarsFetcher.NOOP;
}
Expand Down
Loading

0 comments on commit 79c46de

Please sign in to comment.