Skip to content

Commit

Permalink
Add block_gossip SSE event (Consensys#7941)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov authored Feb 2, 2024
1 parent 9ef2c1b commit e7c7ff5
Show file tree
Hide file tree
Showing 15 changed files with 145 additions and 121 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@ the [releases page](https://github.com/Consensys/teku/releases).

### Breaking Changes

- The CLI options `--beacon-events-block-notify-when-validated-enabled` and
`--beacon-events-block-notify-when-imported-enabled` have been removed. This change was made due
to redundancy, as the functionality of these options is now covered by the new `block_gossip` and
the existing `block` SSE events.

### Additions and Improvements
- Improved compatibility with `/eth/v3/validator/blocks/{slot}` experimental beacon API for block production. It can now respond with blinded and unblinded content based on the block production flow. It also supports the `builder_boost_factor` parameter.

- Add `block_gossip` SSE event as per https://github.com/ethereum/beacon-APIs/pull/405

### Bug Fixes
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import tech.pegasys.teku.networking.p2p.peer.Peer;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.blocks.ReceivedBlockListener;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel;
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
Expand Down Expand Up @@ -158,9 +159,7 @@ public static SyncingNodeManager create(
blockValidator,
timeProvider,
EVENT_LOG,
Optional.empty(),
true,
false);
Optional.empty());

eventChannels
.subscribe(SlotEventsChannel.class, blockManager)
Expand Down Expand Up @@ -207,8 +206,18 @@ public static SyncingNodeManager create(
recentBlocksFetcher.subscribeBlockFetched(
block -> blockManager.importBlock(block, BroadcastValidationLevel.NOT_REQUIRED));
blockManager.subscribeToReceivedBlocks(
(block, executionOptimistic) ->
recentBlocksFetcher.cancelRecentBlockRequest(block.getRoot()));
new ReceivedBlockListener() {
@Override
public void onBlockValidated(final SignedBeaconBlock block) {
// NOOP
}

@Override
public void onBlockImported(
final SignedBeaconBlock block, final boolean executionOptimistic) {
recentBlocksFetcher.cancelRecentBlockRequest(block.getRoot());
}
});

recentBlocksFetcher.start().join();
blockManager.start().join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import tech.pegasys.teku.infrastructure.io.PortAvailability;

public class BeaconRestApiConfig {
public static final Boolean DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_VALIDATED_ENABLED = false;
public static final Boolean DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_IMPORTED_ENABLED = true;
private static final Logger LOG = LogManager.getLogger();

public static final int DEFAULT_REST_API_PORT = 5051;
Expand All @@ -52,8 +50,6 @@ public class BeaconRestApiConfig {
private final int maxUrlLength;
private final int maxPendingEvents;
private final Optional<Integer> validatorThreads;
private final boolean beaconEventsBlockNotifyWhenValidatedEnabled;
private final boolean beaconEventsBlockNotifyWhenImportedEnabled;

private BeaconRestApiConfig(
final int restApiPort,
Expand All @@ -67,9 +63,7 @@ private BeaconRestApiConfig(
final int maxUrlLength,
final int maxPendingEvents,
final Optional<Integer> validatorThreads,
final boolean beaconLivenessTrackingEnabled,
final boolean beaconEventsBlockNotifyWhenValidatedEnabled,
final boolean beaconEventsBlockNotifyWhenImportedEnabled) {
final boolean beaconLivenessTrackingEnabled) {
this.restApiPort = restApiPort;
this.restApiDocsEnabled = restApiDocsEnabled;
this.restApiEnabled = restApiEnabled;
Expand All @@ -82,8 +76,6 @@ private BeaconRestApiConfig(
this.maxPendingEvents = maxPendingEvents;
this.validatorThreads = validatorThreads;
this.beaconLivenessTrackingEnabled = beaconLivenessTrackingEnabled;
this.beaconEventsBlockNotifyWhenValidatedEnabled = beaconEventsBlockNotifyWhenValidatedEnabled;
this.beaconEventsBlockNotifyWhenImportedEnabled = beaconEventsBlockNotifyWhenImportedEnabled;
}

public int getRestApiPort() {
Expand All @@ -106,14 +98,6 @@ public boolean isBeaconLivenessTrackingEnabled() {
return beaconLivenessTrackingEnabled;
}

public boolean isBeaconEventsBlockNotifyWhenValidatedEnabled() {
return beaconEventsBlockNotifyWhenValidatedEnabled;
}

public boolean isBeaconEventsBlockNotifyWhenImportedEnabled() {
return beaconEventsBlockNotifyWhenImportedEnabled;
}

public String getRestApiInterface() {
return restApiInterface;
}
Expand Down Expand Up @@ -171,10 +155,6 @@ public static final class BeaconRestApiConfigBuilder {
private int maxUrlLength = DEFAULT_MAX_URL_LENGTH;
private Optional<Integer> validatorThreads = Optional.empty();
private Eth1Address eth1DepositContractAddress;
private boolean beaconEventsBlockNotifyWhenValidatedEnabled =
DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_VALIDATED_ENABLED;
private boolean defaultBeaconEventsBlockNotifyWhenImportedEnabled =
DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_IMPORTED_ENABLED;

private BeaconRestApiConfigBuilder() {}

Expand Down Expand Up @@ -281,28 +261,12 @@ public BeaconRestApiConfig build() {
maxUrlLength,
maxPendingEvents,
validatorThreads,
beaconLivenessTrackingEnabled,
beaconEventsBlockNotifyWhenValidatedEnabled,
defaultBeaconEventsBlockNotifyWhenImportedEnabled);
beaconLivenessTrackingEnabled);
}

public BeaconRestApiConfigBuilder maxUrlLength(final int maxUrlLength) {
this.maxUrlLength = maxUrlLength;
return this;
}

public BeaconRestApiConfigBuilder beaconEventsBlockNotifyWhenValidatedEnabled(
final boolean beaconEventsBlockNotifyWhenValidatedEnabled) {
this.beaconEventsBlockNotifyWhenValidatedEnabled =
beaconEventsBlockNotifyWhenValidatedEnabled;
return this;
}

public BeaconRestApiConfigBuilder beaconEventsBlockNotifyWhenImportedEnabled(
final boolean defaultBeaconEventsBlockNotifyWhenImportedEnabled) {
this.defaultBeaconEventsBlockNotifyWhenImportedEnabled =
defaultBeaconEventsBlockNotifyWhenImportedEnabled;
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beaconrestapi.handlers.v1.events;

import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.BYTES32_TYPE;
import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.UINT64_TYPE;

import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.json.types.SerializableTypeDefinition;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

public class BlockGossipEvent extends Event<BlockGossipEvent.BlockData> {

private static final SerializableTypeDefinition<BlockGossipEvent.BlockData>
BLOCK_GOSSIP_EVENT_TYPE =
SerializableTypeDefinition.object(BlockGossipEvent.BlockData.class)
.name("BlockGossipEvent")
.withField("slot", UINT64_TYPE, BlockGossipEvent.BlockData::slot)
.withField("block", BYTES32_TYPE, BlockGossipEvent.BlockData::block)
.build();

BlockGossipEvent(final SignedBeaconBlock block) {
super(BLOCK_GOSSIP_EVENT_TYPE, new BlockData(block.getSlot(), block.getRoot()));
}

public record BlockData(UInt64 slot, Bytes32 block) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.ReceivedBlockListener;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing;
import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing;
Expand Down Expand Up @@ -87,7 +88,19 @@ public EventSubscriptionManager(
eventChannels.subscribe(ChainHeadChannel.class, this);
eventChannels.subscribe(FinalizedCheckpointChannel.class, this);
syncDataProvider.subscribeToSyncStateChanges(this::onSyncStateChange);
nodeDataProvider.subscribeToReceivedBlocks(this::onNewBlock);
nodeDataProvider.subscribeToReceivedBlocks(
new ReceivedBlockListener() {
@Override
public void onBlockValidated(final SignedBeaconBlock block) {
onNewBlockGossip(block);
}

@Override
public void onBlockImported(
final SignedBeaconBlock block, final boolean executionOptimistic) {
onNewBlock(block, executionOptimistic);
}
});
nodeDataProvider.subscribeToReceivedBlobSidecar(this::onNewBlobSidecar);
nodeDataProvider.subscribeToAttesterSlashing(this::onNewAttesterSlashing);
nodeDataProvider.subscribeToProposerSlashing(this::onNewProposerSlashing);
Expand Down Expand Up @@ -195,6 +208,11 @@ protected void onNewBlock(final SignedBeaconBlock block, final boolean execution
notifySubscribersOfEvent(EventType.block, blockEvent);
}

protected void onNewBlockGossip(final SignedBeaconBlock block) {
final BlockGossipEvent blockGossipEvent = new BlockGossipEvent(block);
notifySubscribersOfEvent(EventType.block_gossip, blockGossipEvent);
}

protected void onNewBlobSidecar(final BlobSidecar blobSidecar) {
final BlobSidecarEvent blobSidecarEvent = BlobSidecarEvent.create(spec, blobSidecar);
notifySubscribersOfEvent(EventType.blob_sidecar, blobSidecarEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ void shouldPropagateBlock() throws IOException {
checkEvent("block", new BlockEvent(sampleBlock.asInternalSignedBeaconBlock(spec), false));
}

@Test
void shouldPropagateBlockGossip() throws IOException {
when(req.getQueryString()).thenReturn("&topics=block_gossip");
manager.registerClient(client1);

triggerBlockGossipEvent();
checkEvent("block_gossip", new BlockGossipEvent(sampleBlock.asInternalSignedBeaconBlock(spec)));
}

@Test
void shouldPropagateBlobSidecar() throws IOException {
when(req.getQueryString()).thenReturn("&topics=blob_sidecar");
Expand Down Expand Up @@ -444,6 +453,11 @@ private void triggerBlockEvent() {
asyncRunner.executeQueuedActions();
}

private void triggerBlockGossipEvent() {
manager.onNewBlockGossip(sampleBlock.asInternalSignedBeaconBlock(spec));
asyncRunner.executeQueuedActions();
}

private void triggerBlobSidecarEvent() {
manager.onNewBlobSidecar(sampleBlobSidecar);
asyncRunner.executeQueuedActions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.attestation.ProcessedAttestationListener;
import tech.pegasys.teku.spec.datastructures.blocks.ImportedBlockListener;
import tech.pegasys.teku.spec.datastructures.blocks.ReceivedBlockListener;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing;
import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing;
Expand Down Expand Up @@ -168,7 +168,7 @@ private SafeFuture<Optional<SubmitDataError>> addBlsToExecutionChange(
});
}

public void subscribeToReceivedBlocks(ImportedBlockListener listener) {
public void subscribeToReceivedBlocks(ReceivedBlockListener listener) {
blockManager.subscribeToReceivedBlocks(listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public enum EventType {
blob_sidecar,
attester_slashing,
proposer_slashing,
payload_attributes;
payload_attributes,
block_gossip;

public static List<EventType> getTopics(List<String> topics) {
return topics.stream().map(EventType::valueOf).toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

package tech.pegasys.teku.spec.datastructures.blocks;

@FunctionalInterface
public interface ImportedBlockListener {
public interface ReceivedBlockListener {
void onBlockValidated(SignedBeaconBlock block);

void onBlockImported(SignedBeaconBlock block, boolean executionOptimistic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,6 @@ public void onBlockImported(final SignedBeaconBlock block) {
});
}

@Override
public void onBlockValidated(SignedBeaconBlock block) {}

public SafeFuture<AttestationProcessingResult> onAttestation(
final ValidatableAttestation attestation) {
if (pendingAttestations.contains(attestation)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
import tech.pegasys.teku.infrastructure.events.VoidReturningChannelInterface;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

@FunctionalInterface
public interface BlockImportNotifications extends VoidReturningChannelInterface {
void onBlockImported(SignedBeaconBlock block);

void onBlockValidated(SignedBeaconBlock block);
}
Loading

0 comments on commit e7c7ff5

Please sign in to comment.