Skip to content

Commit

Permalink
Improve fcu attributes calculation stability (Consensys#8009)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr authored Feb 28, 2024
1 parent 25b515c commit adc1dd5
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadContext;
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceState;
import tech.pegasys.teku.spec.executionlayer.PayloadBuildingAttributes;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceUpdatedResultSubscriber.ForkChoiceUpdatedResultNotification;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager.ProposersDataManagerSubscriber;
import tech.pegasys.teku.storage.client.RecentChainData;

public class ForkChoiceNotifierImpl implements ForkChoiceNotifier, ProposersDataManagerSubscriber {
public class ForkChoiceNotifierImpl implements ForkChoiceNotifier {
private static final Logger LOG = LogManager.getLogger();

private final EventThread eventThread;
Expand Down Expand Up @@ -64,7 +63,6 @@ public ForkChoiceNotifierImpl(
this.recentChainData = recentChainData;
this.proposersDataManager = proposersDataManager;
this.timeProvider = timeProvider;
proposersDataManager.subscribeToProposersDataChanges(this);
}

@Override
Expand Down Expand Up @@ -107,14 +105,6 @@ public boolean validatorIsConnected(UInt64 validatorIndex, UInt64 currentSlot) {
return proposersDataManager.validatorIsConnected(validatorIndex, currentSlot);
}

@Override
public void onPreparedProposersUpdated() {
eventThread.execute(this::internalUpdatePreparableProposers);
}

@Override
public void onValidatorRegistrationsUpdated() {}

private void internalTerminalBlockReached(Bytes32 executionBlockHash) {
eventThread.checkOnEventThread();
LOG.debug("internalTerminalBlockReached executionBlockHash {}", executionBlockHash);
Expand Down Expand Up @@ -198,18 +188,6 @@ private SafeFuture<Optional<ExecutionPayloadContext>> internalGetPayloadId(
}
}

private void internalUpdatePreparableProposers() {
eventThread.checkOnEventThread();

LOG.debug("internalUpdatePreparableProposers");

// Default to the genesis slot if we're pre-genesis.
final UInt64 currentSlot = recentChainData.getCurrentSlot().orElse(SpecConfig.GENESIS_SLOT);

// Update payload attributes in case we now need to propose the next block
updatePayloadAttributes(currentSlot.plus(1));
}

private void internalForkChoiceUpdated(
final ForkChoiceState forkChoiceState, final Optional<UInt64> proposingSlot) {
eventThread.checkOnEventThread();
Expand All @@ -220,14 +198,60 @@ private void internalForkChoiceUpdated(

LOG.debug("internalForkChoiceUpdated forkChoiceUpdateData {}", forkChoiceUpdateData);

final Optional<UInt64> attributesSlot =
proposingSlot.or(() -> recentChainData.getCurrentSlot().map(UInt64::increment));

attributesSlot.ifPresent(this::updatePayloadAttributes);
calculatePayloadAttributesSlot(forkChoiceState, proposingSlot)
.ifPresent(this::updatePayloadAttributes);

sendForkChoiceUpdated();
}

/**
* Determine for which slot we should calculate payload attributes (block proposal)
*
* <pre>
* this will guarantee that whenever we calculate a payload attributes for a slot, it will remain stable until:
* 1. next slot attestation due is reached (internalAttestationsDue forcing attributes calculation for next slot)
* OR
* 2. we imported the block for current slot and has become the head
* </pre>
*/
private Optional<UInt64> calculatePayloadAttributesSlot(
final ForkChoiceState forkChoiceState, final Optional<UInt64> proposingSlot) {
if (proposingSlot.isPresent()) {
// We are in the context of a block production, so we should use the proposing slot
return proposingSlot;
}

final Optional<UInt64> currentSlot = recentChainData.getCurrentSlot();
if (currentSlot.isEmpty()) {
// We are pre-genesis, so we don't care about proposing slots
return Optional.empty();
}

final Optional<UInt64> maybeCurrentPayloadAttributesSlot =
forkChoiceUpdateData
.getPayloadBuildingAttributes()
.map(PayloadBuildingAttributes::getProposalSlot);

if (maybeCurrentPayloadAttributesSlot.isPresent()
// we are still in the same slot as the last proposing slot
&& currentSlot.get().equals(maybeCurrentPayloadAttributesSlot.get())
// we have not yet imported our own produced block
&& forkChoiceState.getHeadBlockSlot().isLessThan(maybeCurrentPayloadAttributesSlot.get())) {

LOG.debug(
"current payload attributes slot has been chosen for payload attributes calculation: {}",
currentSlot.get());

// in case we propose two blocks in a row and we fail producing the first block,
// we won't keep using the same first slot because internalAttestationsDue will
// update the payload attributes for the second block slot
return currentSlot;
}

// chain advanced since last proposing slot, we should consider attributes for the next slot
return currentSlot.map(UInt64::increment);
}

private void internalAttestationsDue(final UInt64 slot) {
eventThread.checkOnEventThread();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
Expand Down Expand Up @@ -59,9 +58,6 @@ public class ProposersDataManager implements SlotEventsChannel {
private final Optional<Eth1Address> proposerDefaultFeeRecipient;
private final boolean forkChoiceUpdatedAlwaysSendPayloadAttribute;

private final Subscribers<ProposersDataManagerSubscriber> proposersDataChangesSubscribers =
Subscribers.create(true);

public ProposersDataManager(
final EventThread eventThread,
final Spec spec,
Expand All @@ -88,10 +84,6 @@ public ProposersDataManager(
this.forkChoiceUpdatedAlwaysSendPayloadAttribute = forkChoiceUpdatedAlwaysSendPayloadAttribute;
}

public void subscribeToProposersDataChanges(final ProposersDataManagerSubscriber subscriber) {
proposersDataChangesSubscribers.subscribe(subscriber);
}

@Override
public void onSlot(UInt64 slot) {
// do clean up in the middle of the epoch
Expand Down Expand Up @@ -125,8 +117,6 @@ public void onSlot(UInt64 slot) {
public void updatePreparedProposers(
final Collection<BeaconPreparableProposer> preparedProposers, final UInt64 currentSlot) {
updatePreparedProposerCache(preparedProposers, currentSlot);
proposersDataChangesSubscribers.deliver(
ProposersDataManagerSubscriber::onPreparedProposersUpdated);
}

public SafeFuture<Void> updateValidatorRegistrations(
Expand Down Expand Up @@ -180,9 +170,6 @@ private void updateValidatorRegistrationCache(
LOG.warn(
"validator index not found for public key {}",
signedValidatorRegistration.getMessage().getPublicKey())));

proposersDataChangesSubscribers.deliver(
ProposersDataManagerSubscriber::onValidatorRegistrationsUpdated);
}

public SafeFuture<Optional<PayloadBuildingAttributes>> calculatePayloadBuildingAttributes(
Expand Down Expand Up @@ -302,10 +289,4 @@ public Map<UInt64, RegisteredValidatorInfo> getValidatorRegistrationInfo() {
public boolean isProposerDefaultFeeRecipientDefined() {
return proposerDefaultFeeRecipient.isPresent();
}

public interface ProposersDataManagerSubscriber {
void onPreparedProposersUpdated();

void onValidatorRegistrationsUpdated();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,50 @@ void onForkChoiceUpdated_shouldSendNotificationWithPayloadBuildingAttributesForN
verify(executionLayerChannel).engineForkChoiceUpdated(forkChoiceState, Optional.empty());
}

@Test
void onForkChoiceUpdated_shouldSendNotificationWithStableSlot() {
final ForkChoiceState forkChoiceState = getCurrentForkChoiceState();
final BeaconState headState = getHeadState();

// setup two block productions in a row
final UInt64 blockSlot = headState.getSlot().plus(1);
final UInt64 nextBlockSlot = headState.getSlot().plus(2);

final List<PayloadBuildingAttributes> payloadBuildingAttributes =
withProposerForTwoSlots(forkChoiceState, headState, blockSlot, nextBlockSlot);

storageSystem.chainUpdater().setCurrentSlot(blockSlot);

notifyForkChoiceUpdated(forkChoiceState, Optional.of(blockSlot));
verify(executionLayerChannel)
.engineForkChoiceUpdated(forkChoiceState, Optional.of(payloadBuildingAttributes.get(0)));

assertThat(recentChainData.getCurrentSlot()).contains(blockSlot);

// fcu call with same state but no proposerSlot should not cause a new call to the EL
notifyForkChoiceUpdatedVerifyNoNotification(forkChoiceState);
verifyNoMoreInteractions(executionLayerChannel);

// when attestation due arrives, the next proposer attributes should be sent
forkChoiceUpdatedResultNotification = null;
notifier.onAttestationsDue(blockSlot);
assertThat(forkChoiceUpdatedResultNotification).isNotNull();
assertThat(forkChoiceUpdatedResultNotification.payloadAttributes())
.contains(payloadBuildingAttributes.get(1));
verify(executionLayerChannel)
.engineForkChoiceUpdated(forkChoiceState, Optional.of(payloadBuildingAttributes.get(1)));

// we are still on blockSlot, with EL already notified to build nextBlockSlot
notifyForkChoiceUpdatedVerifyNoNotification(forkChoiceState);
verifyNoMoreInteractions(executionLayerChannel);

storageSystem.chainUpdater().setCurrentSlot(nextBlockSlot);

// we are asked to build nextBlockSlot, EL is already notified for that
notifyForkChoiceUpdatedVerifyNoNotification(forkChoiceState, Optional.of(nextBlockSlot));
verifyNoMoreInteractions(executionLayerChannel);
}

@Test
void
onForkChoiceUpdated_shouldSendNotificationWithPayloadBuildingAttributesIfConfiguredToAlwaysSendThem() {
Expand Down Expand Up @@ -378,11 +422,7 @@ void onForkChoiceUpdated_shouldSendNotificationOfOrderedPayloadBuildingAttribute
verify(executionLayerChannel)
.engineForkChoiceUpdated(forkChoiceState, Optional.of(payloadBuildingAttributes.get(0)));

storageSystem
.chainUpdater()
.setCurrentSlot(headState.getSlot().plus(1)); // set current slot to 2

notifyForkChoiceUpdated(forkChoiceState); // calculate attributes for slot 3
notifier.onAttestationsDue(blockSlot); // calculate attributes for slot 3

// expect attributes for slot 3
verify(executionLayerChannel)
Expand Down Expand Up @@ -569,18 +609,16 @@ void onPreparedProposersUpdated_shouldNotIncludePayloadBuildingAttributesWhileSy
}

@Test
void onPreparedProposersUpdated_shouldSendNewNotificationWhenProposerAdded() {
void onPreparedProposersUpdated_shouldNotCallForkChoiceUpdated() {
final ForkChoiceState forkChoiceState = getCurrentForkChoiceState();
final BeaconState headState = getHeadState();
final UInt64 blockSlot = headState.getSlot().plus(1);

notifyForkChoiceUpdated(forkChoiceState);
verify(executionLayerChannel).engineForkChoiceUpdated(forkChoiceState, Optional.empty());

final PayloadBuildingAttributes payloadBuildingAttributes =
withProposerForSlot(forkChoiceState, headState, blockSlot);
verify(executionLayerChannel)
.engineForkChoiceUpdated(forkChoiceState, Optional.of(payloadBuildingAttributes));
withProposerForSlot(forkChoiceState, headState, blockSlot);
verifyNoMoreInteractions(executionLayerChannel);
}

@Test
Expand Down Expand Up @@ -882,6 +920,17 @@ private void notifyForkChoiceUpdated(final ForkChoiceState forkChoiceState) {
notifyForkChoiceUpdated(forkChoiceState, Optional.empty());
}

private void notifyForkChoiceUpdatedVerifyNoNotification(final ForkChoiceState forkChoiceState) {
notifyForkChoiceUpdated(
forkChoiceState, Optional.empty(), notification -> assertThat(notification).isNull());
}

private void notifyForkChoiceUpdatedVerifyNoNotification(
final ForkChoiceState forkChoiceState, final Optional<UInt64> proposingSlot) {
notifyForkChoiceUpdated(
forkChoiceState, proposingSlot, notification -> assertThat(notification).isNull());
}

private void notifyForkChoiceUpdated(
final ForkChoiceState forkChoiceState, final Optional<UInt64> proposingSlot) {
notifyForkChoiceUpdated(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager.ProposersDataManagerSubscriber;
import tech.pegasys.teku.storage.client.RecentChainData;

public class ProposerDataManagerTest implements ProposersDataManagerSubscriber {
public class ProposerDataManagerTest {
private final InlineEventThread eventThread = new InlineEventThread();
private final Spec spec = TestSpecFactory.createMinimalBellatrix();
private final Spec specMock = mock(Spec.class);
Expand All @@ -65,9 +64,6 @@ public class ProposerDataManagerTest implements ProposersDataManagerSubscriber {

private final BeaconState state = dataStructureUtil.randomBeaconState();

private boolean onValidatorRegistrationsUpdatedCalled = false;
private boolean onPreparedProposerUpdatedCalled = false;

private final UInt64 slot = UInt64.ONE;
private SszList<SignedValidatorRegistration> registrations;
private final SafeFuture<Void> response = new SafeFuture<>();
Expand All @@ -80,7 +76,6 @@ void shouldCallRegisterValidator() {
final SafeFuture<Void> updateCall =
proposersDataManager.updateValidatorRegistrations(registrations, slot);

assertThat(onValidatorRegistrationsUpdatedCalled).isFalse();
verify(executionLayerChannel).builderRegisterValidators(registrations, slot);
verifyNoMoreInteractions(executionLayerChannel);

Expand All @@ -89,7 +84,6 @@ void shouldCallRegisterValidator() {
assertThat(updateCall).isCompleted();

// final update
assertThat(onValidatorRegistrationsUpdatedCalled).isTrue();
assertRegisteredValidatorsCount(2);
}

Expand All @@ -101,14 +95,12 @@ void shouldNotSignalValidatorRegistrationUpdatedOnError() {
final SafeFuture<Void> updateCall =
proposersDataManager.updateValidatorRegistrations(registrations, slot);

assertThat(onValidatorRegistrationsUpdatedCalled).isFalse();
verify(executionLayerChannel).builderRegisterValidators(registrations, slot);

response.completeExceptionally(new RuntimeException("generic error"));

assertThat(updateCall).isCompletedExceptionally();

assertThat(onValidatorRegistrationsUpdatedCalled).isFalse();
verifyNoMoreInteractions(executionLayerChannel);
assertRegisteredValidatorsCount(0);
}
Expand All @@ -128,8 +120,6 @@ void shouldSignalAllDataUpdatedAndShouldExpireData() {
dataStructureUtil.randomUInt64(), dataStructureUtil.randomEth1Address())),
slot);

assertThat(onValidatorRegistrationsUpdatedCalled).isTrue();
assertThat(onPreparedProposerUpdatedCalled).isTrue();
assertRegisteredValidatorsCount(2);
assertPreparedProposersCount(1);

Expand Down Expand Up @@ -160,8 +150,6 @@ private void prepareRegistrations() {
when(specMock.getValidatorIndex(state, registrations.get(1).getMessage().getPublicKey()))
.thenReturn(Optional.of(1));
when(specMock.getSlotsPerEpoch(any())).thenReturn(spec.getSlotsPerEpoch(slot));

proposersDataManager.subscribeToProposersDataChanges(this);
}

private void assertPreparedProposersCount(final int expectedCount) {
Expand All @@ -179,14 +167,4 @@ private void assertRegisteredValidatorsCount(final int expectedCount) {
.getValue("registered_validators");
assertThat(optionalValue).hasValue(expectedCount);
}

@Override
public void onPreparedProposersUpdated() {
onPreparedProposerUpdatedCalled = true;
}

@Override
public void onValidatorRegistrationsUpdated() {
onValidatorRegistrationsUpdatedCalled = true;
}
}

0 comments on commit adc1dd5

Please sign in to comment.