diff --git a/CHANGELOG.md b/CHANGELOG.md index 14e06e650cf..3468c1fdf1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,14 @@ the [releases page](https://github.com/Consensys/teku/releases). ### Breaking Changes +- The CLI options `--beacon-events-block-notify-when-validated-enabled` and + `--beacon-events-block-notify-when-imported-enabled` have been removed. This change was made due + to redundancy, as the functionality of these options is now covered by the new `block_gossip` and + the existing `block` SSE events. + ### Additions and Improvements - Improved compatibility with `/eth/v3/validator/blocks/{slot}` experimental beacon API for block production. It can now respond with blinded and unblinded content based on the block production flow. It also supports the `builder_boost_factor` parameter. +- Add `block_gossip` SSE event as per https://github.com/ethereum/beacon-APIs/pull/405 + ### Bug Fixes diff --git a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java index b768eb2f8a7..5add1264328 100644 --- a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java +++ b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java @@ -50,6 +50,7 @@ import tech.pegasys.teku.networking.p2p.peer.Peer; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.blocks.ReceivedBlockListener; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel; import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel; @@ -158,9 +159,7 @@ public static SyncingNodeManager create( blockValidator, timeProvider, EVENT_LOG, - Optional.empty(), - true, - false); + Optional.empty()); eventChannels .subscribe(SlotEventsChannel.class, blockManager) @@ -207,8 +206,18 @@ public static SyncingNodeManager create( recentBlocksFetcher.subscribeBlockFetched( block -> blockManager.importBlock(block, BroadcastValidationLevel.NOT_REQUIRED)); blockManager.subscribeToReceivedBlocks( - (block, executionOptimistic) -> - recentBlocksFetcher.cancelRecentBlockRequest(block.getRoot())); + new ReceivedBlockListener() { + @Override + public void onBlockValidated(final SignedBeaconBlock block) { + // NOOP + } + + @Override + public void onBlockImported( + final SignedBeaconBlock block, final boolean executionOptimistic) { + recentBlocksFetcher.cancelRecentBlockRequest(block.getRoot()); + } + }); recentBlocksFetcher.start().join(); blockManager.start().join(); diff --git a/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/BeaconRestApiConfig.java b/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/BeaconRestApiConfig.java index 4c0284b6dff..cc4854946b4 100644 --- a/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/BeaconRestApiConfig.java +++ b/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/BeaconRestApiConfig.java @@ -25,8 +25,6 @@ import tech.pegasys.teku.infrastructure.io.PortAvailability; public class BeaconRestApiConfig { - public static final Boolean DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_VALIDATED_ENABLED = false; - public static final Boolean DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_IMPORTED_ENABLED = true; private static final Logger LOG = LogManager.getLogger(); public static final int DEFAULT_REST_API_PORT = 5051; @@ -52,8 +50,6 @@ public class BeaconRestApiConfig { private final int maxUrlLength; private final int maxPendingEvents; private final Optional validatorThreads; - private final boolean beaconEventsBlockNotifyWhenValidatedEnabled; - private final boolean beaconEventsBlockNotifyWhenImportedEnabled; private BeaconRestApiConfig( final int restApiPort, @@ -67,9 +63,7 @@ private BeaconRestApiConfig( final int maxUrlLength, final int maxPendingEvents, final Optional validatorThreads, - final boolean beaconLivenessTrackingEnabled, - final boolean beaconEventsBlockNotifyWhenValidatedEnabled, - final boolean beaconEventsBlockNotifyWhenImportedEnabled) { + final boolean beaconLivenessTrackingEnabled) { this.restApiPort = restApiPort; this.restApiDocsEnabled = restApiDocsEnabled; this.restApiEnabled = restApiEnabled; @@ -82,8 +76,6 @@ private BeaconRestApiConfig( this.maxPendingEvents = maxPendingEvents; this.validatorThreads = validatorThreads; this.beaconLivenessTrackingEnabled = beaconLivenessTrackingEnabled; - this.beaconEventsBlockNotifyWhenValidatedEnabled = beaconEventsBlockNotifyWhenValidatedEnabled; - this.beaconEventsBlockNotifyWhenImportedEnabled = beaconEventsBlockNotifyWhenImportedEnabled; } public int getRestApiPort() { @@ -106,14 +98,6 @@ public boolean isBeaconLivenessTrackingEnabled() { return beaconLivenessTrackingEnabled; } - public boolean isBeaconEventsBlockNotifyWhenValidatedEnabled() { - return beaconEventsBlockNotifyWhenValidatedEnabled; - } - - public boolean isBeaconEventsBlockNotifyWhenImportedEnabled() { - return beaconEventsBlockNotifyWhenImportedEnabled; - } - public String getRestApiInterface() { return restApiInterface; } @@ -171,10 +155,6 @@ public static final class BeaconRestApiConfigBuilder { private int maxUrlLength = DEFAULT_MAX_URL_LENGTH; private Optional validatorThreads = Optional.empty(); private Eth1Address eth1DepositContractAddress; - private boolean beaconEventsBlockNotifyWhenValidatedEnabled = - DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_VALIDATED_ENABLED; - private boolean defaultBeaconEventsBlockNotifyWhenImportedEnabled = - DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_IMPORTED_ENABLED; private BeaconRestApiConfigBuilder() {} @@ -281,28 +261,12 @@ public BeaconRestApiConfig build() { maxUrlLength, maxPendingEvents, validatorThreads, - beaconLivenessTrackingEnabled, - beaconEventsBlockNotifyWhenValidatedEnabled, - defaultBeaconEventsBlockNotifyWhenImportedEnabled); + beaconLivenessTrackingEnabled); } public BeaconRestApiConfigBuilder maxUrlLength(final int maxUrlLength) { this.maxUrlLength = maxUrlLength; return this; } - - public BeaconRestApiConfigBuilder beaconEventsBlockNotifyWhenValidatedEnabled( - final boolean beaconEventsBlockNotifyWhenValidatedEnabled) { - this.beaconEventsBlockNotifyWhenValidatedEnabled = - beaconEventsBlockNotifyWhenValidatedEnabled; - return this; - } - - public BeaconRestApiConfigBuilder beaconEventsBlockNotifyWhenImportedEnabled( - final boolean defaultBeaconEventsBlockNotifyWhenImportedEnabled) { - this.defaultBeaconEventsBlockNotifyWhenImportedEnabled = - defaultBeaconEventsBlockNotifyWhenImportedEnabled; - return this; - } } } diff --git a/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/events/BlockGossipEvent.java b/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/events/BlockGossipEvent.java new file mode 100644 index 00000000000..30ee32f4d85 --- /dev/null +++ b/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/events/BlockGossipEvent.java @@ -0,0 +1,39 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.beaconrestapi.handlers.v1.events; + +import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.BYTES32_TYPE; +import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.UINT64_TYPE; + +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.json.types.SerializableTypeDefinition; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; + +public class BlockGossipEvent extends Event { + + private static final SerializableTypeDefinition + BLOCK_GOSSIP_EVENT_TYPE = + SerializableTypeDefinition.object(BlockGossipEvent.BlockData.class) + .name("BlockGossipEvent") + .withField("slot", UINT64_TYPE, BlockGossipEvent.BlockData::slot) + .withField("block", BYTES32_TYPE, BlockGossipEvent.BlockData::block) + .build(); + + BlockGossipEvent(final SignedBeaconBlock block) { + super(BLOCK_GOSSIP_EVENT_TYPE, new BlockData(block.getSlot(), block.getRoot())); + } + + public record BlockData(UInt64 slot, Bytes32 block) {} +} diff --git a/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/events/EventSubscriptionManager.java b/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/events/EventSubscriptionManager.java index 286117cb4c3..267812c50f8 100644 --- a/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/events/EventSubscriptionManager.java +++ b/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/events/EventSubscriptionManager.java @@ -42,6 +42,7 @@ import tech.pegasys.teku.spec.SpecMilestone; import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.ReceivedBlockListener; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing; import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing; @@ -87,7 +88,19 @@ public EventSubscriptionManager( eventChannels.subscribe(ChainHeadChannel.class, this); eventChannels.subscribe(FinalizedCheckpointChannel.class, this); syncDataProvider.subscribeToSyncStateChanges(this::onSyncStateChange); - nodeDataProvider.subscribeToReceivedBlocks(this::onNewBlock); + nodeDataProvider.subscribeToReceivedBlocks( + new ReceivedBlockListener() { + @Override + public void onBlockValidated(final SignedBeaconBlock block) { + onNewBlockGossip(block); + } + + @Override + public void onBlockImported( + final SignedBeaconBlock block, final boolean executionOptimistic) { + onNewBlock(block, executionOptimistic); + } + }); nodeDataProvider.subscribeToReceivedBlobSidecar(this::onNewBlobSidecar); nodeDataProvider.subscribeToAttesterSlashing(this::onNewAttesterSlashing); nodeDataProvider.subscribeToProposerSlashing(this::onNewProposerSlashing); @@ -195,6 +208,11 @@ protected void onNewBlock(final SignedBeaconBlock block, final boolean execution notifySubscribersOfEvent(EventType.block, blockEvent); } + protected void onNewBlockGossip(final SignedBeaconBlock block) { + final BlockGossipEvent blockGossipEvent = new BlockGossipEvent(block); + notifySubscribersOfEvent(EventType.block_gossip, blockGossipEvent); + } + protected void onNewBlobSidecar(final BlobSidecar blobSidecar) { final BlobSidecarEvent blobSidecarEvent = BlobSidecarEvent.create(spec, blobSidecar); notifySubscribersOfEvent(EventType.blob_sidecar, blobSidecarEvent); diff --git a/data/beaconrestapi/src/test/java/tech/pegasys/teku/beaconrestapi/handlers/v1/events/EventSubscriptionManagerTest.java b/data/beaconrestapi/src/test/java/tech/pegasys/teku/beaconrestapi/handlers/v1/events/EventSubscriptionManagerTest.java index d662e8c4121..324656e3219 100644 --- a/data/beaconrestapi/src/test/java/tech/pegasys/teku/beaconrestapi/handlers/v1/events/EventSubscriptionManagerTest.java +++ b/data/beaconrestapi/src/test/java/tech/pegasys/teku/beaconrestapi/handlers/v1/events/EventSubscriptionManagerTest.java @@ -258,6 +258,15 @@ void shouldPropagateBlock() throws IOException { checkEvent("block", new BlockEvent(sampleBlock.asInternalSignedBeaconBlock(spec), false)); } + @Test + void shouldPropagateBlockGossip() throws IOException { + when(req.getQueryString()).thenReturn("&topics=block_gossip"); + manager.registerClient(client1); + + triggerBlockGossipEvent(); + checkEvent("block_gossip", new BlockGossipEvent(sampleBlock.asInternalSignedBeaconBlock(spec))); + } + @Test void shouldPropagateBlobSidecar() throws IOException { when(req.getQueryString()).thenReturn("&topics=blob_sidecar"); @@ -444,6 +453,11 @@ private void triggerBlockEvent() { asyncRunner.executeQueuedActions(); } + private void triggerBlockGossipEvent() { + manager.onNewBlockGossip(sampleBlock.asInternalSignedBeaconBlock(spec)); + asyncRunner.executeQueuedActions(); + } + private void triggerBlobSidecarEvent() { manager.onNewBlobSidecar(sampleBlobSidecar); asyncRunner.executeQueuedActions(); diff --git a/data/provider/src/main/java/tech/pegasys/teku/api/NodeDataProvider.java b/data/provider/src/main/java/tech/pegasys/teku/api/NodeDataProvider.java index 6e52983101a..e3593a04687 100644 --- a/data/provider/src/main/java/tech/pegasys/teku/api/NodeDataProvider.java +++ b/data/provider/src/main/java/tech/pegasys/teku/api/NodeDataProvider.java @@ -27,7 +27,7 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.attestation.ProcessedAttestationListener; -import tech.pegasys.teku.spec.datastructures.blocks.ImportedBlockListener; +import tech.pegasys.teku.spec.datastructures.blocks.ReceivedBlockListener; import tech.pegasys.teku.spec.datastructures.operations.Attestation; import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing; import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing; @@ -168,7 +168,7 @@ private SafeFuture> addBlsToExecutionChange( }); } - public void subscribeToReceivedBlocks(ImportedBlockListener listener) { + public void subscribeToReceivedBlocks(ReceivedBlockListener listener) { blockManager.subscribeToReceivedBlocks(listener); } diff --git a/data/serializer/src/main/java/tech/pegasys/teku/api/response/v1/EventType.java b/data/serializer/src/main/java/tech/pegasys/teku/api/response/v1/EventType.java index 5a64514a81f..d32ffbc279d 100644 --- a/data/serializer/src/main/java/tech/pegasys/teku/api/response/v1/EventType.java +++ b/data/serializer/src/main/java/tech/pegasys/teku/api/response/v1/EventType.java @@ -29,7 +29,8 @@ public enum EventType { blob_sidecar, attester_slashing, proposer_slashing, - payload_attributes; + payload_attributes, + block_gossip; public static List getTopics(List topics) { return topics.stream().map(EventType::valueOf).toList(); diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/blocks/ImportedBlockListener.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/blocks/ReceivedBlockListener.java similarity index 88% rename from ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/blocks/ImportedBlockListener.java rename to ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/blocks/ReceivedBlockListener.java index aaa22ab8cec..d8e4c978cbe 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/blocks/ImportedBlockListener.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/blocks/ReceivedBlockListener.java @@ -13,7 +13,8 @@ package tech.pegasys.teku.spec.datastructures.blocks; -@FunctionalInterface -public interface ImportedBlockListener { +public interface ReceivedBlockListener { + void onBlockValidated(SignedBeaconBlock block); + void onBlockImported(SignedBeaconBlock block, boolean executionOptimistic); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AttestationManager.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AttestationManager.java index 81b4f67d54e..d8137b9076e 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AttestationManager.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AttestationManager.java @@ -214,9 +214,6 @@ public void onBlockImported(final SignedBeaconBlock block) { }); } - @Override - public void onBlockValidated(SignedBeaconBlock block) {} - public SafeFuture onAttestation( final ValidatableAttestation attestation) { if (pendingAttestations.contains(attestation)) { diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportNotifications.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportNotifications.java index f6ca05ef4c3..7cf56aae822 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportNotifications.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportNotifications.java @@ -16,8 +16,7 @@ import tech.pegasys.teku.infrastructure.events.VoidReturningChannelInterface; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +@FunctionalInterface public interface BlockImportNotifications extends VoidReturningChannelInterface { void onBlockImported(SignedBeaconBlock block); - - void onBlockValidated(SignedBeaconBlock block); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockManager.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockManager.java index abe575aebeb..8b0eea6bb87 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockManager.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockManager.java @@ -26,7 +26,7 @@ import tech.pegasys.teku.infrastructure.time.TimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.service.serviceutils.Service; -import tech.pegasys.teku.spec.datastructures.blocks.ImportedBlockListener; +import tech.pegasys.teku.spec.datastructures.blocks.ReceivedBlockListener; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadSummary; import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel; @@ -57,7 +57,7 @@ public class BlockManager extends Service // and will not require any further retry. Descendants of these blocks will be considered invalid // as well. private final Map invalidBlockRoots; - private final Subscribers receivedBlockSubscribers = + private final Subscribers receivedBlockSubscribers = Subscribers.create(true); private final Subscribers failedPayloadExecutionSubscribers = Subscribers.create(true); @@ -66,8 +66,6 @@ public class BlockManager extends Service Subscribers.create(true); private final Optional blockImportMetrics; - private final boolean isNotifyWhenImported; - private final boolean isNotifyWhenValidated; public BlockManager( final RecentChainData recentChainData, @@ -79,9 +77,7 @@ public BlockManager( final BlockValidator blockValidator, final TimeProvider timeProvider, final EventLogger eventLogger, - final Optional blockImportMetrics, - final boolean isNotifyWhenImported, - final boolean isNotifyWhenValidated) { + final Optional blockImportMetrics) { this.recentChainData = recentChainData; this.blockImporter = blockImporter; this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool; @@ -92,8 +88,6 @@ public BlockManager( this.timeProvider = timeProvider; this.eventLogger = eventLogger; this.blockImportMetrics = blockImportMetrics; - this.isNotifyWhenImported = isNotifyWhenImported; - this.isNotifyWhenValidated = isNotifyWhenValidated; } @Override @@ -181,14 +175,17 @@ public void onSlot(final UInt64 slot) { futureBlocks.prune(slot).forEach(this::importBlockIgnoringResult); } - public void subscribeToReceivedBlocks(ImportedBlockListener importedBlockListener) { - receivedBlockSubscribers.subscribe(importedBlockListener); + public void subscribeToReceivedBlocks(ReceivedBlockListener receivedBlockListener) { + receivedBlockSubscribers.subscribe(receivedBlockListener); } - private void notifyReceivedBlockSubscribers( - final SignedBeaconBlock signedBeaconBlock, final boolean executionOptimistic) { - receivedBlockSubscribers.forEach( - s -> s.onBlockImported(signedBeaconBlock, executionOptimistic)); + private void notifySubscribersThatBlockIsValidated(final SignedBeaconBlock block) { + receivedBlockSubscribers.forEach(s -> s.onBlockValidated(block)); + } + + private void notifySubscribersThatBlockIsImported( + final SignedBeaconBlock block, final boolean executionOptimistic) { + receivedBlockSubscribers.forEach(s -> s.onBlockImported(block, executionOptimistic)); } public void subscribeFailedPayloadExecution(final FailedPayloadExecutionSubscriber subscriber) { @@ -210,13 +207,6 @@ public void onBlockImported(final SignedBeaconBlock block) { children.forEach(this::importBlockIgnoringResult); } - @Override - public void onBlockValidated(SignedBeaconBlock block) { - if (isNotifyWhenValidated) { - notifyReceivedBlockSubscribers(block, recentChainData.isChainHeadOptimistic()); - } - } - private void importBlockIgnoringResult(final SignedBeaconBlock block) { // we don't care about origin here because flow calls this function for retries only doImportBlock(block, Optional.empty(), BlockBroadcastValidator.NOOP, Optional.empty()) @@ -237,8 +227,8 @@ private SafeFuture doImportBlock( result -> lateBlockImportCheck(blockImportPerformance, block, result))) .thenPeek( result -> { - if (result.isSuccessful() && isNotifyWhenImported) { - notifyReceivedBlockSubscribers(block, result.isImportedOptimistically()); + if (result.isSuccessful()) { + notifySubscribersThatBlockIsImported(block, result.isImportedOptimistically()); } }); } @@ -282,7 +272,7 @@ private SafeFuture handleBlockImport( final BlockBroadcastValidator blockBroadcastValidator, final Optional origin) { - onBlockValidated(block); + notifySubscribersThatBlockIsValidated(block); blockBlobSidecarsTrackersPool.onNewBlock(block, origin); return blockImporter diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java index e35236ba518..95af6001aca 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java @@ -71,7 +71,7 @@ import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock; -import tech.pegasys.teku.spec.datastructures.blocks.ImportedBlockListener; +import tech.pegasys.teku.spec.datastructures.blocks.ReceivedBlockListener; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; @@ -200,9 +200,7 @@ private void setupWithSpec(final Spec spec) { blockValidator, timeProvider, eventLogger, - Optional.of(mock(BlockImportMetrics.class)), - true, - false); + Optional.of(mock(BlockImportMetrics.class))); forwardBlockImportedNotificationsTo(blockManager); localChain .chainUpdater() @@ -244,7 +242,7 @@ public void shouldImport() { @Test public void shouldNotifySubscribersOnImport() { - final ImportedBlockListener subscriber = mock(ImportedBlockListener.class); + final ReceivedBlockListener subscriber = mock(ReceivedBlockListener.class); final RecentChainData localRecentChainData = mock(RecentChainData.class); blockManager = setupBlockManagerWithMockRecentChainData(localRecentChainData, false); @@ -255,13 +253,14 @@ public void shouldNotifySubscribersOnImport() { incrementSlot(); safeJoinBlockImport(nextBlock); + verify(subscriber).onBlockValidated(nextBlock); verify(subscriber).onBlockImported(nextBlock, false); verify(blockBlobSidecarsTrackersPool).removeAllForBlock(nextBlock.getRoot()); } @Test public void shouldNotifySubscribersOnKnownBlock() { - final ImportedBlockListener subscriber = mock(ImportedBlockListener.class); + final ReceivedBlockListener subscriber = mock(ReceivedBlockListener.class); final RecentChainData localRecentChainData = mock(RecentChainData.class); blockManager = setupBlockManagerWithMockRecentChainData(localRecentChainData, false); @@ -272,16 +271,18 @@ public void shouldNotifySubscribersOnKnownBlock() { incrementSlot(); safeJoinBlockImport(nextBlock); + verify(subscriber).onBlockValidated(nextBlock); verify(subscriber).onBlockImported(nextBlock, false); assertThatBlockImport(nextBlock) .isCompletedWithValue(BlockImportResult.knownBlock(nextBlock, false)); + verify(subscriber, times(2)).onBlockValidated(nextBlock); verify(subscriber, times(2)).onBlockImported(nextBlock, false); } @Test public void shouldNotifySubscribersOnKnownOptimisticBlock() { - final ImportedBlockListener subscriber = mock(ImportedBlockListener.class); + final ReceivedBlockListener subscriber = mock(ReceivedBlockListener.class); executionLayer.setPayloadStatus(PayloadStatus.SYNCING); final RecentChainData localRecentChainData = mock(RecentChainData.class); blockManager = setupBlockManagerWithMockRecentChainData(localRecentChainData, true); @@ -292,16 +293,18 @@ public void shouldNotifySubscribersOnKnownOptimisticBlock() { incrementSlot(); safeJoinBlockImport(nextBlock); + verify(subscriber).onBlockValidated(nextBlock); verify(subscriber).onBlockImported(nextBlock, true); assertThatBlockImport(nextBlock) .isCompletedWithValue(BlockImportResult.knownBlock(nextBlock, true)); + verify(subscriber, times(2)).onBlockValidated(nextBlock); verify(subscriber, times(2)).onBlockImported(nextBlock, true); } @Test public void shouldNotNotifySubscribersOnInvalidBlock() { - final ImportedBlockListener subscriber = mock(ImportedBlockListener.class); + final ReceivedBlockListener subscriber = mock(ReceivedBlockListener.class); blockManager.subscribeToReceivedBlocks(subscriber); final UInt64 nextSlot = GENESIS_SLOT.plus(UInt64.ONE); final SignedBeaconBlock validBlock = @@ -349,9 +352,7 @@ public void onGossipedBlock_retryIfParentWasUnknownButIsNowAvailable() { blockValidator, timeProvider, eventLogger, - Optional.empty(), - true, - false); + Optional.empty()); forwardBlockImportedNotificationsTo(blockManager); assertThat(blockManager.start()).isCompleted(); @@ -1189,9 +1190,7 @@ private BlockManager setupBlockManagerWithMockRecentChainData( blockValidator, timeProvider, eventLogger, - Optional.empty(), - true, - false); + Optional.empty()); } private SafeFutureAssert assertThatBlockImport(final SignedBeaconBlock block) { diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index b1b9bc288d6..30fccc2eebb 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -93,6 +93,7 @@ import tech.pegasys.teku.spec.SpecMilestone; import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.ReceivedBlockListener; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.BeaconBlockBodySchema; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.capella.BeaconBlockBodySchemaCapella; @@ -340,7 +341,18 @@ protected void startServices() { .thenCompose(BlockImportAndBroadcastValidationResults::blockImportResult) .finish(err -> LOG.error("Failed to process recently fetched block.", err))); blockManager.subscribeToReceivedBlocks( - (block, __) -> recentBlocksFetcher.cancelRecentBlockRequest(block.getRoot())); + new ReceivedBlockListener() { + @Override + public void onBlockValidated(final SignedBeaconBlock block) { + // NOOP + } + + @Override + public void onBlockImported( + final SignedBeaconBlock block, final boolean executionOptimistic) { + recentBlocksFetcher.cancelRecentBlockRequest(block.getRoot()); + } + }); final RecentBlobSidecarsFetcher recentBlobSidecarsFetcher = syncService.getRecentBlobSidecarsFetcher(); recentBlobSidecarsFetcher.subscribeBlobSidecarFetched( @@ -1191,9 +1203,7 @@ public void initBlockManager() { blockValidator, timeProvider, EVENT_LOG, - importMetrics, - beaconConfig.beaconRestApiConfig().isBeaconEventsBlockNotifyWhenImportedEnabled(), - beaconConfig.beaconRestApiConfig().isBeaconEventsBlockNotifyWhenValidatedEnabled()); + importMetrics); if (spec.isMilestoneSupported(SpecMilestone.BELLATRIX)) { final FailedExecutionPool failedExecutionPool = new FailedExecutionPool(blockManager, beaconAsyncRunner); diff --git a/teku/src/main/java/tech/pegasys/teku/cli/options/BeaconRestApiOptions.java b/teku/src/main/java/tech/pegasys/teku/cli/options/BeaconRestApiOptions.java index eed6c0af074..75a0cc8fc6e 100644 --- a/teku/src/main/java/tech/pegasys/teku/cli/options/BeaconRestApiOptions.java +++ b/teku/src/main/java/tech/pegasys/teku/cli/options/BeaconRestApiOptions.java @@ -123,28 +123,6 @@ public void setMaxUrlLength(int maxUrlLength) { private Boolean beaconLivenessTrackingEnabled = BeaconRestApiConfig.DEFAULT_BEACON_LIVENESS_TRACKING_ENABLED; - @Option( - names = {"--beacon-events-block-notify-when-validated-enabled"}, - paramLabel = "", - showDefaultValue = Visibility.ALWAYS, - description = - "Block notification events will be sent once the block has been validated, prior to attempting to import the block. The optimistic flag will be determined based on if chain head is optimistic.", - arity = "0..1", - fallbackValue = "true") - private Boolean beaconEventsBlockNotifyWhenValidated = - BeaconRestApiConfig.DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_VALIDATED_ENABLED; - - @Option( - names = {"--beacon-events-block-notify-when-imported-enabled"}, - paramLabel = "", - showDefaultValue = Visibility.ALWAYS, - description = - "Block notification events will be sent once the block has been imported, at which point we can accurately set the optimistic flag.", - arity = "0..1", - fallbackValue = "true") - private Boolean beaconEventsBlockNotifyWhenImported = - BeaconRestApiConfig.DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_IMPORTED_ENABLED; - @Option( names = {"--Xrest-api-validator-threads"}, description = "Set the number of threads used to handle validator api requests", @@ -175,8 +153,6 @@ public void configure(final TekuConfiguration.Builder builder) { .restApiCorsAllowedOrigins(restApiCorsAllowedOrigins) .maxUrlLength(maxUrlLength) .beaconLivenessTrackingEnabled(beaconLivenessTrackingEnabled) - .beaconEventsBlockNotifyWhenValidatedEnabled(beaconEventsBlockNotifyWhenValidated) - .beaconEventsBlockNotifyWhenImportedEnabled(beaconEventsBlockNotifyWhenImported) .maxPendingEvents(maxPendingEvents) .validatorThreads(Optional.ofNullable(validatorThreads))); }