diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierImpl.java index 91743baf77f..717ab0e3e99 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierImpl.java @@ -23,15 +23,14 @@ import tech.pegasys.teku.infrastructure.time.TimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; -import tech.pegasys.teku.spec.config.SpecConfig; import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadContext; import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel; import tech.pegasys.teku.spec.executionlayer.ForkChoiceState; +import tech.pegasys.teku.spec.executionlayer.PayloadBuildingAttributes; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceUpdatedResultSubscriber.ForkChoiceUpdatedResultNotification; -import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager.ProposersDataManagerSubscriber; import tech.pegasys.teku.storage.client.RecentChainData; -public class ForkChoiceNotifierImpl implements ForkChoiceNotifier, ProposersDataManagerSubscriber { +public class ForkChoiceNotifierImpl implements ForkChoiceNotifier { private static final Logger LOG = LogManager.getLogger(); private final EventThread eventThread; @@ -64,7 +63,6 @@ public ForkChoiceNotifierImpl( this.recentChainData = recentChainData; this.proposersDataManager = proposersDataManager; this.timeProvider = timeProvider; - proposersDataManager.subscribeToProposersDataChanges(this); } @Override @@ -107,14 +105,6 @@ public boolean validatorIsConnected(UInt64 validatorIndex, UInt64 currentSlot) { return proposersDataManager.validatorIsConnected(validatorIndex, currentSlot); } - @Override - public void onPreparedProposersUpdated() { - eventThread.execute(this::internalUpdatePreparableProposers); - } - - @Override - public void onValidatorRegistrationsUpdated() {} - private void internalTerminalBlockReached(Bytes32 executionBlockHash) { eventThread.checkOnEventThread(); LOG.debug("internalTerminalBlockReached executionBlockHash {}", executionBlockHash); @@ -198,18 +188,6 @@ private SafeFuture> internalGetPayloadId( } } - private void internalUpdatePreparableProposers() { - eventThread.checkOnEventThread(); - - LOG.debug("internalUpdatePreparableProposers"); - - // Default to the genesis slot if we're pre-genesis. - final UInt64 currentSlot = recentChainData.getCurrentSlot().orElse(SpecConfig.GENESIS_SLOT); - - // Update payload attributes in case we now need to propose the next block - updatePayloadAttributes(currentSlot.plus(1)); - } - private void internalForkChoiceUpdated( final ForkChoiceState forkChoiceState, final Optional proposingSlot) { eventThread.checkOnEventThread(); @@ -220,14 +198,60 @@ private void internalForkChoiceUpdated( LOG.debug("internalForkChoiceUpdated forkChoiceUpdateData {}", forkChoiceUpdateData); - final Optional attributesSlot = - proposingSlot.or(() -> recentChainData.getCurrentSlot().map(UInt64::increment)); - - attributesSlot.ifPresent(this::updatePayloadAttributes); + calculatePayloadAttributesSlot(forkChoiceState, proposingSlot) + .ifPresent(this::updatePayloadAttributes); sendForkChoiceUpdated(); } + /** + * Determine for which slot we should calculate payload attributes (block proposal) + * + *
+   * this will guarantee that whenever we calculate a payload attributes for a slot, it will remain stable until:
+   * 1. next slot attestation due is reached (internalAttestationsDue forcing attributes calculation for next slot)
+   * OR
+   * 2. we imported the block for current slot and has become the head
+   * 
+ */ + private Optional calculatePayloadAttributesSlot( + final ForkChoiceState forkChoiceState, final Optional proposingSlot) { + if (proposingSlot.isPresent()) { + // We are in the context of a block production, so we should use the proposing slot + return proposingSlot; + } + + final Optional currentSlot = recentChainData.getCurrentSlot(); + if (currentSlot.isEmpty()) { + // We are pre-genesis, so we don't care about proposing slots + return Optional.empty(); + } + + final Optional maybeCurrentPayloadAttributesSlot = + forkChoiceUpdateData + .getPayloadBuildingAttributes() + .map(PayloadBuildingAttributes::getProposalSlot); + + if (maybeCurrentPayloadAttributesSlot.isPresent() + // we are still in the same slot as the last proposing slot + && currentSlot.get().equals(maybeCurrentPayloadAttributesSlot.get()) + // we have not yet imported our own produced block + && forkChoiceState.getHeadBlockSlot().isLessThan(maybeCurrentPayloadAttributesSlot.get())) { + + LOG.debug( + "current payload attributes slot has been chosen for payload attributes calculation: {}", + currentSlot.get()); + + // in case we propose two blocks in a row and we fail producing the first block, + // we won't keep using the same first slot because internalAttestationsDue will + // update the payload attributes for the second block slot + return currentSlot; + } + + // chain advanced since last proposing slot, we should consider attributes for the next slot + return currentSlot.map(UInt64::increment); + } + private void internalAttestationsDue(final UInt64 slot) { eventThread.checkOnEventThread(); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ProposersDataManager.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ProposersDataManager.java index fc214e27b45..9459df281a8 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ProposersDataManager.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ProposersDataManager.java @@ -30,7 +30,6 @@ import tech.pegasys.teku.infrastructure.async.eventthread.EventThread; import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; import tech.pegasys.teku.infrastructure.ssz.SszList; -import tech.pegasys.teku.infrastructure.subscribers.Subscribers; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; @@ -59,9 +58,6 @@ public class ProposersDataManager implements SlotEventsChannel { private final Optional proposerDefaultFeeRecipient; private final boolean forkChoiceUpdatedAlwaysSendPayloadAttribute; - private final Subscribers proposersDataChangesSubscribers = - Subscribers.create(true); - public ProposersDataManager( final EventThread eventThread, final Spec spec, @@ -88,10 +84,6 @@ public ProposersDataManager( this.forkChoiceUpdatedAlwaysSendPayloadAttribute = forkChoiceUpdatedAlwaysSendPayloadAttribute; } - public void subscribeToProposersDataChanges(final ProposersDataManagerSubscriber subscriber) { - proposersDataChangesSubscribers.subscribe(subscriber); - } - @Override public void onSlot(UInt64 slot) { // do clean up in the middle of the epoch @@ -125,8 +117,6 @@ public void onSlot(UInt64 slot) { public void updatePreparedProposers( final Collection preparedProposers, final UInt64 currentSlot) { updatePreparedProposerCache(preparedProposers, currentSlot); - proposersDataChangesSubscribers.deliver( - ProposersDataManagerSubscriber::onPreparedProposersUpdated); } public SafeFuture updateValidatorRegistrations( @@ -180,9 +170,6 @@ private void updateValidatorRegistrationCache( LOG.warn( "validator index not found for public key {}", signedValidatorRegistration.getMessage().getPublicKey()))); - - proposersDataChangesSubscribers.deliver( - ProposersDataManagerSubscriber::onValidatorRegistrationsUpdated); } public SafeFuture> calculatePayloadBuildingAttributes( @@ -302,10 +289,4 @@ public Map getValidatorRegistrationInfo() { public boolean isProposerDefaultFeeRecipientDefined() { return proposerDefaultFeeRecipient.isPresent(); } - - public interface ProposersDataManagerSubscriber { - void onPreparedProposersUpdated(); - - void onValidatorRegistrationsUpdated(); - } } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java index a7b36a914b8..ba3d39bbffa 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java @@ -258,6 +258,50 @@ void onForkChoiceUpdated_shouldSendNotificationWithPayloadBuildingAttributesForN verify(executionLayerChannel).engineForkChoiceUpdated(forkChoiceState, Optional.empty()); } + @Test + void onForkChoiceUpdated_shouldSendNotificationWithStableSlot() { + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + final BeaconState headState = getHeadState(); + + // setup two block productions in a row + final UInt64 blockSlot = headState.getSlot().plus(1); + final UInt64 nextBlockSlot = headState.getSlot().plus(2); + + final List payloadBuildingAttributes = + withProposerForTwoSlots(forkChoiceState, headState, blockSlot, nextBlockSlot); + + storageSystem.chainUpdater().setCurrentSlot(blockSlot); + + notifyForkChoiceUpdated(forkChoiceState, Optional.of(blockSlot)); + verify(executionLayerChannel) + .engineForkChoiceUpdated(forkChoiceState, Optional.of(payloadBuildingAttributes.get(0))); + + assertThat(recentChainData.getCurrentSlot()).contains(blockSlot); + + // fcu call with same state but no proposerSlot should not cause a new call to the EL + notifyForkChoiceUpdatedVerifyNoNotification(forkChoiceState); + verifyNoMoreInteractions(executionLayerChannel); + + // when attestation due arrives, the next proposer attributes should be sent + forkChoiceUpdatedResultNotification = null; + notifier.onAttestationsDue(blockSlot); + assertThat(forkChoiceUpdatedResultNotification).isNotNull(); + assertThat(forkChoiceUpdatedResultNotification.payloadAttributes()) + .contains(payloadBuildingAttributes.get(1)); + verify(executionLayerChannel) + .engineForkChoiceUpdated(forkChoiceState, Optional.of(payloadBuildingAttributes.get(1))); + + // we are still on blockSlot, with EL already notified to build nextBlockSlot + notifyForkChoiceUpdatedVerifyNoNotification(forkChoiceState); + verifyNoMoreInteractions(executionLayerChannel); + + storageSystem.chainUpdater().setCurrentSlot(nextBlockSlot); + + // we are asked to build nextBlockSlot, EL is already notified for that + notifyForkChoiceUpdatedVerifyNoNotification(forkChoiceState, Optional.of(nextBlockSlot)); + verifyNoMoreInteractions(executionLayerChannel); + } + @Test void onForkChoiceUpdated_shouldSendNotificationWithPayloadBuildingAttributesIfConfiguredToAlwaysSendThem() { @@ -378,11 +422,7 @@ void onForkChoiceUpdated_shouldSendNotificationOfOrderedPayloadBuildingAttribute verify(executionLayerChannel) .engineForkChoiceUpdated(forkChoiceState, Optional.of(payloadBuildingAttributes.get(0))); - storageSystem - .chainUpdater() - .setCurrentSlot(headState.getSlot().plus(1)); // set current slot to 2 - - notifyForkChoiceUpdated(forkChoiceState); // calculate attributes for slot 3 + notifier.onAttestationsDue(blockSlot); // calculate attributes for slot 3 // expect attributes for slot 3 verify(executionLayerChannel) @@ -569,7 +609,7 @@ void onPreparedProposersUpdated_shouldNotIncludePayloadBuildingAttributesWhileSy } @Test - void onPreparedProposersUpdated_shouldSendNewNotificationWhenProposerAdded() { + void onPreparedProposersUpdated_shouldNotCallForkChoiceUpdated() { final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); final BeaconState headState = getHeadState(); final UInt64 blockSlot = headState.getSlot().plus(1); @@ -577,10 +617,8 @@ void onPreparedProposersUpdated_shouldSendNewNotificationWhenProposerAdded() { notifyForkChoiceUpdated(forkChoiceState); verify(executionLayerChannel).engineForkChoiceUpdated(forkChoiceState, Optional.empty()); - final PayloadBuildingAttributes payloadBuildingAttributes = - withProposerForSlot(forkChoiceState, headState, blockSlot); - verify(executionLayerChannel) - .engineForkChoiceUpdated(forkChoiceState, Optional.of(payloadBuildingAttributes)); + withProposerForSlot(forkChoiceState, headState, blockSlot); + verifyNoMoreInteractions(executionLayerChannel); } @Test @@ -882,6 +920,17 @@ private void notifyForkChoiceUpdated(final ForkChoiceState forkChoiceState) { notifyForkChoiceUpdated(forkChoiceState, Optional.empty()); } + private void notifyForkChoiceUpdatedVerifyNoNotification(final ForkChoiceState forkChoiceState) { + notifyForkChoiceUpdated( + forkChoiceState, Optional.empty(), notification -> assertThat(notification).isNull()); + } + + private void notifyForkChoiceUpdatedVerifyNoNotification( + final ForkChoiceState forkChoiceState, final Optional proposingSlot) { + notifyForkChoiceUpdated( + forkChoiceState, proposingSlot, notification -> assertThat(notification).isNull()); + } + private void notifyForkChoiceUpdated( final ForkChoiceState forkChoiceState, final Optional proposingSlot) { notifyForkChoiceUpdated( diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ProposerDataManagerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ProposerDataManagerTest.java index ca66dcdf485..ec6b352c472 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ProposerDataManagerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ProposerDataManagerTest.java @@ -38,10 +38,9 @@ import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel; import tech.pegasys.teku.spec.util.DataStructureUtil; -import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager.ProposersDataManagerSubscriber; import tech.pegasys.teku.storage.client.RecentChainData; -public class ProposerDataManagerTest implements ProposersDataManagerSubscriber { +public class ProposerDataManagerTest { private final InlineEventThread eventThread = new InlineEventThread(); private final Spec spec = TestSpecFactory.createMinimalBellatrix(); private final Spec specMock = mock(Spec.class); @@ -65,9 +64,6 @@ public class ProposerDataManagerTest implements ProposersDataManagerSubscriber { private final BeaconState state = dataStructureUtil.randomBeaconState(); - private boolean onValidatorRegistrationsUpdatedCalled = false; - private boolean onPreparedProposerUpdatedCalled = false; - private final UInt64 slot = UInt64.ONE; private SszList registrations; private final SafeFuture response = new SafeFuture<>(); @@ -80,7 +76,6 @@ void shouldCallRegisterValidator() { final SafeFuture updateCall = proposersDataManager.updateValidatorRegistrations(registrations, slot); - assertThat(onValidatorRegistrationsUpdatedCalled).isFalse(); verify(executionLayerChannel).builderRegisterValidators(registrations, slot); verifyNoMoreInteractions(executionLayerChannel); @@ -89,7 +84,6 @@ void shouldCallRegisterValidator() { assertThat(updateCall).isCompleted(); // final update - assertThat(onValidatorRegistrationsUpdatedCalled).isTrue(); assertRegisteredValidatorsCount(2); } @@ -101,14 +95,12 @@ void shouldNotSignalValidatorRegistrationUpdatedOnError() { final SafeFuture updateCall = proposersDataManager.updateValidatorRegistrations(registrations, slot); - assertThat(onValidatorRegistrationsUpdatedCalled).isFalse(); verify(executionLayerChannel).builderRegisterValidators(registrations, slot); response.completeExceptionally(new RuntimeException("generic error")); assertThat(updateCall).isCompletedExceptionally(); - assertThat(onValidatorRegistrationsUpdatedCalled).isFalse(); verifyNoMoreInteractions(executionLayerChannel); assertRegisteredValidatorsCount(0); } @@ -128,8 +120,6 @@ void shouldSignalAllDataUpdatedAndShouldExpireData() { dataStructureUtil.randomUInt64(), dataStructureUtil.randomEth1Address())), slot); - assertThat(onValidatorRegistrationsUpdatedCalled).isTrue(); - assertThat(onPreparedProposerUpdatedCalled).isTrue(); assertRegisteredValidatorsCount(2); assertPreparedProposersCount(1); @@ -160,8 +150,6 @@ private void prepareRegistrations() { when(specMock.getValidatorIndex(state, registrations.get(1).getMessage().getPublicKey())) .thenReturn(Optional.of(1)); when(specMock.getSlotsPerEpoch(any())).thenReturn(spec.getSlotsPerEpoch(slot)); - - proposersDataManager.subscribeToProposersDataChanges(this); } private void assertPreparedProposersCount(final int expectedCount) { @@ -179,14 +167,4 @@ private void assertRegisteredValidatorsCount(final int expectedCount) { .getValue("registered_validators"); assertThat(optionalValue).hasValue(expectedCount); } - - @Override - public void onPreparedProposersUpdated() { - onPreparedProposerUpdatedCalled = true; - } - - @Override - public void onValidatorRegistrationsUpdated() { - onValidatorRegistrationsUpdatedCalled = true; - } }