diff --git a/.circleci/config.yml b/.circleci/config.yml index 71deb8e67cc..ef312107955 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -62,7 +62,7 @@ executors: machine_executor_amd64: machine: image: ubuntu-2204:2023.04.2 # https://circleci.com/developer/machine/image/ubuntu-2204 - docker_layer_caching: true + docker_layer_caching: false working_directory: ~/project environment: architecture: "amd64" diff --git a/beacon/validator/build.gradle b/beacon/validator/build.gradle index 34e3e559db3..fac75cae751 100644 --- a/beacon/validator/build.gradle +++ b/beacon/validator/build.gradle @@ -46,4 +46,5 @@ dependencies { jmhImplementation testFixtures(project(':ethereum:spec')) jmhImplementation 'org.mockito:mockito-core' + jmhImplementation testFixtures(project(':eth-benchmark-tests')) } diff --git a/beacon/validator/src/jmh/java/tech/pegasys/teku/validator/coordinator/duties/AttesterDutiesGeneraterBenchmark.java b/beacon/validator/src/jmh/java/tech/pegasys/teku/validator/coordinator/duties/AttesterDutiesGeneraterBenchmark.java new file mode 100644 index 00000000000..54bc365ca1b --- /dev/null +++ b/beacon/validator/src/jmh/java/tech/pegasys/teku/validator/coordinator/duties/AttesterDutiesGeneraterBenchmark.java @@ -0,0 +1,121 @@ +/* + * Copyright Consensys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.coordinator.duties; + +import static tech.pegasys.teku.ethereum.json.types.EthereumTypes.PUBLIC_KEY_TYPE; +import static tech.pegasys.teku.infrastructure.http.RestApiConstants.EXECUTION_OPTIMISTIC; +import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.BOOLEAN_TYPE; +import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.BYTES32_TYPE; +import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.INTEGER_TYPE; +import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.UINT64_TYPE; + +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import tech.pegasys.teku.benchmarks.gen.KeyFileGenerator; +import tech.pegasys.teku.bls.BLSKeyPair; +import tech.pegasys.teku.infrastructure.json.types.SerializableTypeDefinition; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.interop.GenesisStateBuilder; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.versions.bellatrix.BeaconStateBellatrix; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.versions.bellatrix.MutableBeaconStateBellatrix; +import tech.pegasys.teku.validator.api.AttesterDuties; +import tech.pegasys.teku.validator.api.AttesterDuty; + +@Fork(1) +@State(Scope.Thread) +public class AttesterDutiesGeneraterBenchmark { + private static final SerializableTypeDefinition ATTESTER_DUTY_TYPE = + SerializableTypeDefinition.object(AttesterDuty.class) + .name("AttesterDuty") + .withField("pubkey", PUBLIC_KEY_TYPE, AttesterDuty::getPublicKey) + .withField("validator_index", INTEGER_TYPE, AttesterDuty::getValidatorIndex) + .withField("committee_index", INTEGER_TYPE, AttesterDuty::getCommitteeIndex) + .withField("committee_length", INTEGER_TYPE, AttesterDuty::getCommitteeLength) + .withField("committees_at_slot", INTEGER_TYPE, AttesterDuty::getCommitteesAtSlot) + .withField( + "validator_committee_index", INTEGER_TYPE, AttesterDuty::getValidatorCommitteeIndex) + .withField("slot", UINT64_TYPE, AttesterDuty::getSlot) + .build(); + public static final SerializableTypeDefinition RESPONSE_TYPE = + SerializableTypeDefinition.object(AttesterDuties.class) + .name("GetAttesterDutiesResponse") + .withField("dependent_root", BYTES32_TYPE, AttesterDuties::getDependentRoot) + .withField(EXECUTION_OPTIMISTIC, BOOLEAN_TYPE, AttesterDuties::isExecutionOptimistic) + .withField( + "data", + SerializableTypeDefinition.listOf(ATTESTER_DUTY_TYPE), + AttesterDuties::getDuties) + .build(); + private final Spec spec = TestSpecFactory.createMinimalBellatrix(); + private BeaconStateBellatrix state; + private AttesterDutiesGenerator attesterDutiesGenerator; + + private UInt64 epoch; + + IntList validatorIndices = new IntArrayList(); + + @Param({"800000"}) + int validatorsCount = 800_000; + + @Param({"500000"}) + int querySize = 50_000; + + @Setup(Level.Trial) + public void init() { + List validatorKeys = KeyFileGenerator.readValidatorKeys(validatorsCount); + state = + BeaconStateBellatrix.required( + new GenesisStateBuilder() + .spec(spec) + .signDeposits(false) + .addValidators(validatorKeys) + .build()); + final MutableBeaconStateBellatrix mutableState = state.createWritableCopy(); + mutableState.setSlot(UInt64.ONE); + state = mutableState.commitChanges(); + + for (int i = 0; i < querySize; i++) { + validatorIndices.add(i); + } + + attesterDutiesGenerator = new AttesterDutiesGenerator(spec); + System.out.println("Done!"); + epoch = spec.computeEpochAtSlot(state.getSlot()).increment(); + } + + @Benchmark + @Warmup(iterations = 5, time = 2000, timeUnit = TimeUnit.MILLISECONDS) + @Measurement(iterations = 10) + public void computeAttesterDuties(Blackhole bh) { + AttesterDuties attesterDutiesFromIndicesAndState = + attesterDutiesGenerator.getAttesterDutiesFromIndicesAndState( + state, epoch, validatorIndices, false); + + bh.consume(attesterDutiesFromIndicesAndState); + } +} diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java index d6b81cb1584..0f4cd61e7ef 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.function.BiFunction; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes32; @@ -81,7 +80,6 @@ import tech.pegasys.teku.statetransition.validation.InternalValidationResult; import tech.pegasys.teku.storage.client.CombinedChainDataClient; import tech.pegasys.teku.validator.api.AttesterDuties; -import tech.pegasys.teku.validator.api.AttesterDuty; import tech.pegasys.teku.validator.api.CommitteeSubscriptionRequest; import tech.pegasys.teku.validator.api.NodeSyncingException; import tech.pegasys.teku.validator.api.ProposerDuties; @@ -92,6 +90,7 @@ import tech.pegasys.teku.validator.api.SyncCommitteeDuty; import tech.pegasys.teku.validator.api.SyncCommitteeSubnetSubscription; import tech.pegasys.teku.validator.api.ValidatorApiChannel; +import tech.pegasys.teku.validator.coordinator.duties.AttesterDutiesGenerator; import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker; import tech.pegasys.teku.validator.coordinator.publisher.BlockPublisher; import tech.pegasys.teku.validator.coordinator.publisher.MilestoneBasedBlockPublisher; @@ -125,6 +124,8 @@ public class ValidatorApiHandler implements ValidatorApiChannel { private final ProposersDataManager proposersDataManager; private final BlockPublisher blockPublisher; + private final AttesterDutiesGenerator attesterDutiesGenerator; + public ValidatorApiHandler( final ChainDataProvider chainDataProvider, final NodeDataProvider nodeDataProvider, @@ -174,6 +175,7 @@ public ValidatorApiHandler( blobSidecarGossipChannel, performanceTracker, dutyMetrics); + this.attesterDutiesGenerator = new AttesterDutiesGenerator(spec); } @Override @@ -205,6 +207,7 @@ public SafeFuture> getValidatorIndices( @Override public SafeFuture> getAttestationDuties( final UInt64 epoch, final IntCollection validatorIndices) { + if (isSyncActive()) { return NodeSyncingException.failedFuture(); } @@ -218,14 +221,22 @@ public SafeFuture> getAttestationDuties( "Attestation duties were requested %s epochs ahead, only 1 epoch in future is supported.", epoch.minus(combinedChainDataClient.getCurrentEpoch()).toString()))); } + // what state can we use? If the current or next epoch, we can use the best state, + // which would guarantee no state regeneration final UInt64 slot = spec.getEarliestQueryableSlotForBeaconCommitteeInTargetEpoch(epoch); + LOG.trace("Retrieving attestation duties from epoch {} using state at slot {}", epoch, slot); return combinedChainDataClient .getStateAtSlotExact(slot) .thenApply( optionalState -> optionalState.map( - state -> getAttesterDutiesFromIndicesAndState(state, epoch, validatorIndices))); + state -> + attesterDutiesGenerator.getAttesterDutiesFromIndicesAndState( + state, + epoch, + validatorIndices, + combinedChainDataClient.isChainHeadOptimistic()))); } @Override @@ -684,42 +695,6 @@ private ProposerDuties getProposerDutiesFromIndicesAndState( combinedChainDataClient.isChainHeadOptimistic()); } - private AttesterDuties getAttesterDutiesFromIndicesAndState( - final BeaconState state, final UInt64 epoch, final IntCollection validatorIndices) { - final Bytes32 dependentRoot = - epoch.isGreaterThan(spec.getCurrentEpoch(state)) - ? spec.atEpoch(epoch).getBeaconStateUtil().getCurrentDutyDependentRoot(state) - : spec.atEpoch(epoch).getBeaconStateUtil().getPreviousDutyDependentRoot(state); - return new AttesterDuties( - combinedChainDataClient.isChainHeadOptimistic(), - dependentRoot, - validatorIndices - .intStream() - .mapToObj(index -> createAttesterDuties(state, epoch, index)) - .filter(Optional::isPresent) - .map(Optional::get) - .toList()); - } - - private Optional createAttesterDuties( - final BeaconState state, final UInt64 epoch, final int validatorIndex) { - - return combine( - spec.getValidatorPubKey(state, UInt64.valueOf(validatorIndex)), - spec.getCommitteeAssignment(state, epoch, validatorIndex), - (pkey, committeeAssignment) -> { - final UInt64 committeeCountPerSlot = spec.getCommitteeCountPerSlot(state, epoch); - return new AttesterDuty( - pkey, - validatorIndex, - committeeAssignment.getCommittee().size(), - committeeAssignment.getCommitteeIndex().intValue(), - committeeCountPerSlot.intValue(), - committeeAssignment.getCommittee().indexOf(validatorIndex), - committeeAssignment.getSlot()); - }); - } - private SafeFuture> getStateForCommitteeDuties( final SpecVersion specVersion, final UInt64 epoch) { final Optional maybeSyncCommitteeUtil = specVersion.getSyncCommitteeUtil(); @@ -827,14 +802,6 @@ private SszList getApplicableValidatorRegistrations return validatorRegistrations.getSchema().createFromElements(applicableValidatorRegistrations); } - private static Optional combine( - Optional a, Optional b, BiFunction fun) { - if (a.isEmpty() || b.isEmpty()) { - return Optional.empty(); - } - return Optional.ofNullable(fun.apply(a.get(), b.get())); - } - private List getProposalSlotsForEpoch(final BeaconState state, final UInt64 epoch) { final UInt64 epochStartSlot = spec.computeStartSlotAtEpoch(epoch); final UInt64 startSlot = epochStartSlot.max(GENESIS_SLOT.increment()); diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/duties/AttesterDutiesGenerator.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/duties/AttesterDutiesGenerator.java new file mode 100644 index 00000000000..6ba587db260 --- /dev/null +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/duties/AttesterDutiesGenerator.java @@ -0,0 +1,87 @@ +/* + * Copyright Consensys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.coordinator.duties; + +import it.unimi.dsi.fastutil.ints.IntCollection; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.state.CommitteeAssignment; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.spec.logic.common.helpers.BeaconStateAccessors; +import tech.pegasys.teku.validator.api.AttesterDuties; +import tech.pegasys.teku.validator.api.AttesterDuty; + +public class AttesterDutiesGenerator { + private final Spec spec; + + public AttesterDutiesGenerator(Spec spec) { + this.spec = spec; + } + + public AttesterDuties getAttesterDutiesFromIndicesAndState( + final BeaconState state, + final UInt64 epoch, + final IntCollection validatorIndices, + final boolean isChainHeadOptimistic) { + final Bytes32 dependentRoot = + epoch.isGreaterThan(spec.getCurrentEpoch(state)) + ? spec.atEpoch(epoch).getBeaconStateUtil().getCurrentDutyDependentRoot(state) + : spec.atEpoch(epoch).getBeaconStateUtil().getPreviousDutyDependentRoot(state); + final List duties = createAttesterDuties(state, epoch, validatorIndices); + return new AttesterDuties(isChainHeadOptimistic, dependentRoot, duties); + } + + private List createAttesterDuties( + final BeaconState state, final UInt64 epoch, final IntCollection validatorIndices) { + final List> maybeAttesterDutyList = new ArrayList<>(); + final BeaconStateAccessors beaconStateAccessors = spec.atEpoch(epoch).beaconStateAccessors(); + final UInt64 committeeCountPerSlot = + beaconStateAccessors.getCommitteeCountPerSlot(state, epoch); + final Map validatorIndexToCommitteeAssignmentMap = + spec.getValidatorIndexToCommitteeAssignmentMap(state, epoch); + for (final Integer validatorIndex : validatorIndices) { + final CommitteeAssignment committeeAssignment = + validatorIndexToCommitteeAssignmentMap.get(validatorIndex); + if (committeeAssignment != null) { + maybeAttesterDutyList.add( + attesterDutyFromCommitteeAssignment( + committeeAssignment, validatorIndex, committeeCountPerSlot, state)); + } + } + return maybeAttesterDutyList.stream().filter(Optional::isPresent).map(Optional::get).toList(); + } + + private Optional attesterDutyFromCommitteeAssignment( + final CommitteeAssignment committeeAssignment, + final int validatorIndex, + final UInt64 committeeCountPerSlot, + final BeaconState state) { + return spec.getValidatorPubKey(state, UInt64.valueOf(validatorIndex)) + .map( + publicKey -> + new AttesterDuty( + publicKey, + validatorIndex, + committeeAssignment.getCommittee().size(), + committeeAssignment.getCommitteeIndex().intValue(), + committeeCountPerSlot.intValue(), + committeeAssignment.getCommittee().indexOf(validatorIndex), + committeeAssignment.getSlot())); + } +} diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java index d5734016749..d7fb744acaf 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java @@ -870,6 +870,13 @@ public Optional getCommitteeAssignment( return atEpoch(epoch).getValidatorsUtil().getCommitteeAssignment(state, epoch, validatorIndex); } + public Map getValidatorIndexToCommitteeAssignmentMap( + final BeaconState state, final UInt64 epoch) { + return atEpoch(epoch) + .getValidatorsUtil() + .getValidatorIndexToCommitteeAssignmentMap(state, epoch); + } + // Attestation helpers public IntList getAttestingIndices(BeaconState state, AttestationData data, SszBitlist bits) { return atState(state).getAttestationUtil().getAttestingIndices(state, data, bits); diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/util/ValidatorsUtil.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/util/ValidatorsUtil.java index 9ea6f1c52e9..f030d7d99b8 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/util/ValidatorsUtil.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/util/ValidatorsUtil.java @@ -17,6 +17,8 @@ import static tech.pegasys.teku.spec.logic.common.helpers.MathHelpers.bytesToUInt64; import it.unimi.dsi.fastutil.ints.IntList; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import tech.pegasys.teku.bls.BLSPublicKey; import tech.pegasys.teku.bls.BLSSignature; @@ -74,6 +76,27 @@ public Optional getCommitteeAssignment( state, epoch, validatorIndex, beaconStateAccessors.getCommitteeCountPerSlot(state, epoch)); } + public Map getValidatorIndexToCommitteeAssignmentMap( + final BeaconState state, final UInt64 epoch) { + final Map assignmentMap = new HashMap<>(); + + final int slotsPerEpoch = specConfig.getSlotsPerEpoch(); + final int committeeCountPerSlot = + beaconStateAccessors.getCommitteeCountPerSlot(state, epoch).intValue(); + final UInt64 startSlot = miscHelpers.computeStartSlotAtEpoch(epoch); + for (int slotOffset = 0; slotOffset < slotsPerEpoch; slotOffset++) { + final UInt64 slot = startSlot.plus(slotOffset); + for (int i = 0; i < committeeCountPerSlot; i++) { + final UInt64 committeeIndex = UInt64.valueOf(i); + final IntList committee = + beaconStateAccessors.getBeaconCommittee(state, slot, committeeIndex); + committee.forEach( + j -> assignmentMap.put(j, new CommitteeAssignment(committee, committeeIndex, slot))); + } + } + return assignmentMap; + } + /** * Return the committee assignment in the ``epoch`` for ``validator_index``. ``assignment`` * returned is a tuple of the following form: ``assignment[0]`` is the list of validators in the diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/altair/helpers/BeaconStateAccessorsAltair.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/altair/helpers/BeaconStateAccessorsAltair.java index fecb5bbcd56..21333cf8115 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/altair/helpers/BeaconStateAccessorsAltair.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/altair/helpers/BeaconStateAccessorsAltair.java @@ -101,6 +101,8 @@ public IntList getNextSyncCommitteeIndices(final BeaconState state) { final UInt64 epoch = getCurrentEpoch(state).plus(1); final IntList activeValidatorIndices = getActiveValidatorIndices(state, epoch); final int activeValidatorCount = activeValidatorIndices.size(); + checkArgument(activeValidatorCount > 0, "Provided state has no active validators"); + final Bytes32 seed = getSeed(state, epoch, Domain.SYNC_COMMITTEE); int i = 0; final SszList validators = state.getValidators(); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/store/CacheableStore.java b/storage/src/main/java/tech/pegasys/teku/storage/store/CacheableStore.java new file mode 100644 index 00000000000..b43a41c9451 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/store/CacheableStore.java @@ -0,0 +1,49 @@ +/* + * Copyright Consensys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.store; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.blocks.BlockAndCheckpoints; +import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; +import tech.pegasys.teku.spec.datastructures.blocks.StateAndBlockSummary; +import tech.pegasys.teku.spec.datastructures.execution.SlotAndExecutionPayloadSummary; +import tech.pegasys.teku.spec.datastructures.forkchoice.VoteTracker; + +/** Store extension dedicated to keep unsafe updates package-private */ +public abstract class CacheableStore implements UpdatableStore { + + abstract void cacheTimeMillis(UInt64 timeMillis); + + abstract void cacheGenesisTime(UInt64 genesisTime); + + abstract void cacheProposerBoostRoot(Optional proposerBoostRoot); + + abstract void cacheBlocks(Collection blockAndCheckpoints); + + abstract void cacheStates(Map stateAndBlockSummaries); + + abstract void cacheFinalizedOptimisticTransitionPayload( + Optional finalizedOptimisticTransitionPayload); + + abstract void cleanupCheckpointStates(Predicate removalCondition); + + abstract void setHighestVotedValidatorIndex(UInt64 highestVotedValidatorIndex); + + abstract void setVote(int index, VoteTracker voteTracker); +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/store/Store.java b/storage/src/main/java/tech/pegasys/teku/storage/store/Store.java index 1466338e024..3e728f405d4 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/store/Store.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/store/Store.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -31,6 +32,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -52,6 +54,7 @@ import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.BlockAndCheckpoints; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; @@ -71,7 +74,7 @@ import tech.pegasys.teku.storage.protoarray.ForkChoiceStrategy; import tech.pegasys.teku.storage.protoarray.ProtoArray; -class Store implements UpdatableStore { +class Store extends CacheableStore { private static final Logger LOG = LogManager.getLogger(); public static final int VOTE_TRACKER_SPARE_CAPACITY = 1000; @@ -87,28 +90,28 @@ class Store implements UpdatableStore { private Optional epochStatesCountGauge = Optional.empty(); - final Optional> maybeEpochStates; + private final Optional> maybeEpochStates; private final Spec spec; private final StateAndBlockSummaryProvider stateProvider; private final BlockProvider blockProvider; private final BlobSidecarsProvider blobSidecarsProvider; private final EarliestBlobSidecarSlotProvider earliestBlobSidecarSlotProvider; - final ForkChoiceStrategy forkChoiceStrategy; + private final ForkChoiceStrategy forkChoiceStrategy; private final Optional initialCheckpoint; - UInt64 timeMillis; - UInt64 genesisTime; - AnchorPoint finalizedAnchor; - Checkpoint justifiedCheckpoint; - Checkpoint bestJustifiedCheckpoint; - Optional finalizedOptimisticTransitionPayload; - Optional proposerBoostRoot = Optional.empty(); - final CachingTaskQueue states; - final Map blocks; - final CachingTaskQueue checkpointStates; - VoteTracker[] votes; - UInt64 highestVotedValidatorIndex; + private final CachingTaskQueue states; + private final Map blocks; + private final CachingTaskQueue checkpointStates; + private UInt64 timeMillis; + private UInt64 genesisTime; + private AnchorPoint finalizedAnchor; + private Checkpoint justifiedCheckpoint; + private Checkpoint bestJustifiedCheckpoint; + private Optional finalizedOptimisticTransitionPayload; + private Optional proposerBoostRoot = Optional.empty(); + private VoteTracker[] votes; + private UInt64 highestVotedValidatorIndex; private Store( final MetricsSystem metricsSystem, @@ -495,13 +498,7 @@ public SafeFuture> retrieveSignedBlock(final Bytes32 } // Retrieve and cache block - return blockProvider - .getBlock(blockRoot) - .thenApply( - block -> { - block.ifPresent(this::putBlock); - return block; - }); + return blockProvider.getBlock(blockRoot); } @Override @@ -581,6 +578,74 @@ public SafeFuture> retrieveEarliestBlobSidecarSlot() { return earliestBlobSidecarSlotProvider.getEarliestBlobSidecarSlot(); } + /** Non-synchronized, no lock, unsafe if Store is not locked externally */ + @Override + void cacheBlocks(final Collection blockAndCheckpoints) { + blockAndCheckpoints.stream() + .sorted(Comparator.comparing(BlockAndCheckpoints::getSlot)) + .map(BlockAndCheckpoints::getBlock) + .forEach(block -> blocks.put(block.getRoot(), block)); + blockCountGauge.ifPresent(gauge -> gauge.set(blocks.size())); + } + + /** Non-synchronized, no lock, unsafe if Store is not locked externally */ + @Override + void cacheTimeMillis(final UInt64 timeMillis) { + if (timeMillis.isGreaterThanOrEqualTo(this.timeMillis)) { + this.timeMillis = timeMillis; + } + } + + /** Non-synchronized, no lock, unsafe if Store is not locked externally */ + @Override + void cacheGenesisTime(final UInt64 genesisTime) { + this.genesisTime = genesisTime; + } + + /** Non-synchronized, no lock, unsafe if Store is not locked externally */ + @Override + void cacheProposerBoostRoot(final Optional proposerBoostRoot) { + this.proposerBoostRoot = proposerBoostRoot; + } + + /** Non-synchronized, no lock, unsafe if Store is not locked externally */ + @Override + void cacheStates(final Map stateAndBlockSummaries) { + states.cacheAll(stateAndBlockSummaries); + } + + /** Non-synchronized, no lock, unsafe if Store is not locked externally */ + @Override + void cacheFinalizedOptimisticTransitionPayload( + final Optional finalizedOptimisticTransitionPayload) { + this.finalizedOptimisticTransitionPayload = finalizedOptimisticTransitionPayload; + } + + /** Non-synchronized, no lock, unsafe if Store is not locked externally */ + @Override + void cleanupCheckpointStates(final Predicate removalCondition) { + checkpointStates.removeIf(removalCondition); + } + + /** Non-synchronized, no lock, unsafe if Store is not locked externally */ + @Override + void setHighestVotedValidatorIndex(final UInt64 highestVotedValidatorIndex) { + this.highestVotedValidatorIndex = highestVotedValidatorIndex; + + // Expand votes array if needed + if (highestVotedValidatorIndex.isGreaterThanOrEqualTo(votes.length)) { + this.votes = + Arrays.copyOf( + votes, highestVotedValidatorIndex.plus(VOTE_TRACKER_SPARE_CAPACITY).intValue()); + } + } + + /** Non-synchronized, no lock, unsafe if Store is not locked externally */ + @Override + void setVote(final int index, final VoteTracker voteTracker) { + votes[index] = voteTracker; + } + UInt64 getHighestVotedValidatorIndex() { readVotesLock.lock(); try { @@ -617,7 +682,6 @@ private SafeFuture> getAndCacheBlockAndState( signedBeaconBlock -> SafeFuture.completedFuture(Optional.of(signedBeaconBlock))) .orElseGet(() -> blockProvider.getBlock(blockRoot)) - .thenPeek(block -> block.ifPresent(this::putBlock)) .thenApply( block -> block.map(b -> new SignedBlockAndState(b, res.get().getState()))); }); @@ -815,19 +879,6 @@ private boolean isSlotAtNthEpochBoundary( .orElse(false); } - private void putBlock(final SignedBeaconBlock block) { - final Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - if (containsBlock(block.getRoot())) { - blocks.put(block.getRoot(), block); - blockCountGauge.ifPresent(gauge -> gauge.set(blocks.size())); - } - } finally { - writeLock.unlock(); - } - } - @VisibleForTesting Optional> getEpochStates() { return maybeEpochStates; diff --git a/storage/src/main/java/tech/pegasys/teku/storage/store/StoreTransaction.java b/storage/src/main/java/tech/pegasys/teku/storage/store/StoreTransaction.java index 51e1512e7f0..875f562568e 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/store/StoreTransaction.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/store/StoreTransaction.java @@ -332,8 +332,9 @@ public Collection getOrderedBlockRoots() { lock.readLock().lock(); try { final NavigableMap blockRootsBySlot = new TreeMap<>(); - store.forkChoiceStrategy.processAllInOrder( - (root, slot, parent) -> blockRootsBySlot.put(slot, root)); + store + .getForkChoiceStrategy() + .processAllInOrder((root, slot, parent) -> blockRootsBySlot.put(slot, root)); this.blockData .values() .forEach( diff --git a/storage/src/main/java/tech/pegasys/teku/storage/store/StoreTransactionUpdates.java b/storage/src/main/java/tech/pegasys/teku/storage/store/StoreTransactionUpdates.java index 69c0a9aaef9..71aa9fe8636 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/store/StoreTransactionUpdates.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/store/StoreTransactionUpdates.java @@ -103,17 +103,15 @@ public StorageUpdate createStorageUpdate() { public void applyToStore(final Store store, final UpdateResult updateResult) { // Add new data - tx.timeMillis - .filter(t -> t.isGreaterThan(store.getTimeMillis())) - .ifPresent(value -> store.timeMillis = value); - tx.genesisTime.ifPresent(value -> store.genesisTime = value); + tx.timeMillis.ifPresent(store::cacheTimeMillis); + tx.genesisTime.ifPresent(store::cacheGenesisTime); tx.justifiedCheckpoint.ifPresent(store::updateJustifiedCheckpoint); tx.bestJustifiedCheckpoint.ifPresent(store::updateBestJustifiedCheckpoint); - hotBlocks.forEach((root, value) -> store.blocks.put(root, value.getBlock())); - store.states.cacheAll(Maps.transformValues(hotBlockAndStates, this::blockAndStateAsSummary)); + store.cacheBlocks(hotBlocks.values()); + store.cacheStates(Maps.transformValues(hotBlockAndStates, this::blockAndStateAsSummary)); if (optimisticTransitionBlockRootSet) { - store.finalizedOptimisticTransitionPayload = - updateResult.getFinalizedOptimisticTransitionPayload(); + store.cacheFinalizedOptimisticTransitionPayload( + updateResult.getFinalizedOptimisticTransitionPayload()); } // Update finalized data @@ -123,18 +121,20 @@ public void applyToStore(final Store store, final UpdateResult updateResult) { // Prune blocks and states prunedHotBlockRoots.keySet().forEach(store::removeStateAndBlock); - store.checkpointStates.removeIf( + store.cleanupCheckpointStates( slotAndBlockRoot -> prunedHotBlockRoots.containsKey(slotAndBlockRoot.getBlockRoot())); if (tx.proposerBoostRootSet) { - store.proposerBoostRoot = tx.proposerBoostRoot; + store.cacheProposerBoostRoot(tx.proposerBoostRoot); } - store.forkChoiceStrategy.applyUpdate( - hotBlocks.values(), - tx.pulledUpBlockCheckpoints, - prunedHotBlockRoots, - store.getFinalizedCheckpoint()); + store + .getForkChoiceStrategy() + .applyUpdate( + hotBlocks.values(), + tx.pulledUpBlockCheckpoints, + prunedHotBlockRoots, + store.getFinalizedCheckpoint()); } private StateAndBlockSummary blockAndStateAsSummary(final SignedBlockAndState blockAndState) { diff --git a/storage/src/main/java/tech/pegasys/teku/storage/store/StoreTransactionUpdatesFactory.java b/storage/src/main/java/tech/pegasys/teku/storage/store/StoreTransactionUpdatesFactory.java index d2e59177824..c1e8d90832f 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/store/StoreTransactionUpdatesFactory.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/store/StoreTransactionUpdatesFactory.java @@ -118,8 +118,9 @@ private StoreTransactionUpdates buildFinalizedUpdates(final Checkpoint finalized // Transition block was finalized by this transaction optimisticTransitionBlockRootSet = true; optimisticTransitionBlockRoot = - baseStore.forkChoiceStrategy.getOptimisticallySyncedTransitionBlockRoot( - latestFinalized.getRoot()); + baseStore + .getForkChoiceStrategy() + .getOptimisticallySyncedTransitionBlockRoot(latestFinalized.getRoot()); } else { optimisticTransitionBlockRootSet = false; optimisticTransitionBlockRoot = Optional.empty(); @@ -161,7 +162,7 @@ private Map getHotStatesToPersist() { private Optional blockSlot(final Bytes32 root) { return Optional.ofNullable(hotBlockAndStates.get(root)) .map(SignedBlockAndState::getSlot) - .or(() -> baseStore.forkChoiceStrategy.blockSlot(root)); + .or(() -> baseStore.getForkChoiceStrategy().blockSlot(root)); } private Map collectFinalizedRoots(final Bytes32 newlyFinalizedBlockRoot) { @@ -176,10 +177,12 @@ private Map collectFinalizedRoots(final Bytes32 newlyFinalized } // Add existing hot blocks that are now finalized - if (baseStore.forkChoiceStrategy.contains(finalizedChainHeadRoot)) { - baseStore.forkChoiceStrategy.processHashesInChain( - finalizedChainHeadRoot, - (blockRoot, slot, parentRoot) -> childToParent.put(blockRoot, parentRoot)); + if (baseStore.getForkChoiceStrategy().contains(finalizedChainHeadRoot)) { + baseStore + .getForkChoiceStrategy() + .processHashesInChain( + finalizedChainHeadRoot, + (blockRoot, slot, parentRoot) -> childToParent.put(blockRoot, parentRoot)); } return childToParent; } @@ -214,12 +217,14 @@ private boolean shouldPrune( private void calculatePrunedHotBlockRoots() { final BeaconBlockSummary finalizedBlock = tx.getLatestFinalized().getBlockSummary(); - baseStore.forkChoiceStrategy.processAllInOrder( - (blockRoot, slot, parentRoot) -> { - if (shouldPrune(finalizedBlock, blockRoot, slot, parentRoot)) { - prunedHotBlockRoots.put(blockRoot, slot); - } - }); + baseStore + .getForkChoiceStrategy() + .processAllInOrder( + (blockRoot, slot, parentRoot) -> { + if (shouldPrune(finalizedBlock, blockRoot, slot, parentRoot)) { + prunedHotBlockRoots.put(blockRoot, slot); + } + }); tx.blockData.values().stream() // Iterate new blocks in slot order to guarantee we see parents first diff --git a/storage/src/main/java/tech/pegasys/teku/storage/store/StoreVoteUpdater.java b/storage/src/main/java/tech/pegasys/teku/storage/store/StoreVoteUpdater.java index 68de06fb711..58eec61c5fb 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/store/StoreVoteUpdater.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/store/StoreVoteUpdater.java @@ -13,7 +13,6 @@ package tech.pegasys.teku.storage.store; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -96,17 +95,8 @@ public Bytes32 applyForkChoiceScoreChanges( public void commit() { // Votes are applied to the store immediately since the changes to the in-memory ProtoArray // can't be rolled back. - - store.highestVotedValidatorIndex = getHighestVotedValidatorIndex(); - - if (store.highestVotedValidatorIndex.intValue() >= store.votes.length) { - store.votes = - Arrays.copyOf( - store.votes, - store.highestVotedValidatorIndex.intValue() + Store.VOTE_TRACKER_SPARE_CAPACITY); - } - - votes.forEach((key, value) -> store.votes[key.intValue()] = value); + store.setHighestVotedValidatorIndex(getHighestVotedValidatorIndex()); + votes.forEach((key, value) -> store.setVote(key.intValue(), value)); voteUpdateChannel.onVotesUpdated(votes); } diff --git a/storage/src/test/java/tech/pegasys/teku/storage/store/AbstractStoreTest.java b/storage/src/test/java/tech/pegasys/teku/storage/store/AbstractStoreTest.java index b845116cd5c..fc687d3750b 100644 --- a/storage/src/test/java/tech/pegasys/teku/storage/store/AbstractStoreTest.java +++ b/storage/src/test/java/tech/pegasys/teku/storage/store/AbstractStoreTest.java @@ -47,6 +47,7 @@ public abstract class AbstractStoreTest { protected final Spec spec = TestSpecFactory.createMinimalDeneb(); protected final StorageUpdateChannel storageUpdateChannel = new StubStorageUpdateChannel(); protected final ChainBuilder chainBuilder = ChainBuilder.create(spec); + protected final StoreConfig defaultStoreConfig = StoreConfig.createDefault(); protected void processChainWithLimitedCache( BiConsumer chainProcessor) { @@ -131,7 +132,7 @@ protected void addBlocks(final UpdatableStore store, final List + tx.putBlockAndState( + blockAndState, spec.calculateBlockCheckpoints(blockAndState.getState()))); + safeJoin(tx.commit()); + final List last32 = + chainBuilder + .streamBlocksAndStates() + .dropWhile( + signedBlockAndState -> + signedBlockAndState + .getSlot() + .isLessThanOrEqualTo( + chainBuilder + .getLatestBlockAndState() + .getSlot() + .minus(defaultStoreConfig.getBlockCacheSize()))) + .toList(); + for (final SignedBlockAndState signedBlockAndState : last32) { + assertThat(store.getBlockIfAvailable(signedBlockAndState.getRoot())).isPresent(); + } + } + private void testApplyChangesWhenTransactionCommits(final boolean withInterleavedTransaction) { final UpdatableStore store = createGenesisStore(); final UInt64 epoch3 = UInt64.valueOf(4);