Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into duty-time-metric
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyeh committed Oct 10, 2023
2 parents 079c04c + 7732c22 commit dc3b248
Show file tree
Hide file tree
Showing 16 changed files with 467 additions and 128 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ executors:
machine_executor_amd64:
machine:
image: ubuntu-2204:2023.04.2 # https://circleci.com/developer/machine/image/ubuntu-2204
docker_layer_caching: true
docker_layer_caching: false
working_directory: ~/project
environment:
architecture: "amd64"
Expand Down
1 change: 1 addition & 0 deletions beacon/validator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ dependencies {

jmhImplementation testFixtures(project(':ethereum:spec'))
jmhImplementation 'org.mockito:mockito-core'
jmhImplementation testFixtures(project(':eth-benchmark-tests'))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright Consensys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.validator.coordinator.duties;

import static tech.pegasys.teku.ethereum.json.types.EthereumTypes.PUBLIC_KEY_TYPE;
import static tech.pegasys.teku.infrastructure.http.RestApiConstants.EXECUTION_OPTIMISTIC;
import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.BOOLEAN_TYPE;
import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.BYTES32_TYPE;
import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.INTEGER_TYPE;
import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.UINT64_TYPE;

import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import tech.pegasys.teku.benchmarks.gen.KeyFileGenerator;
import tech.pegasys.teku.bls.BLSKeyPair;
import tech.pegasys.teku.infrastructure.json.types.SerializableTypeDefinition;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.interop.GenesisStateBuilder;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.versions.bellatrix.BeaconStateBellatrix;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.versions.bellatrix.MutableBeaconStateBellatrix;
import tech.pegasys.teku.validator.api.AttesterDuties;
import tech.pegasys.teku.validator.api.AttesterDuty;

@Fork(1)
@State(Scope.Thread)
public class AttesterDutiesGeneraterBenchmark {
private static final SerializableTypeDefinition<AttesterDuty> ATTESTER_DUTY_TYPE =
SerializableTypeDefinition.object(AttesterDuty.class)
.name("AttesterDuty")
.withField("pubkey", PUBLIC_KEY_TYPE, AttesterDuty::getPublicKey)
.withField("validator_index", INTEGER_TYPE, AttesterDuty::getValidatorIndex)
.withField("committee_index", INTEGER_TYPE, AttesterDuty::getCommitteeIndex)
.withField("committee_length", INTEGER_TYPE, AttesterDuty::getCommitteeLength)
.withField("committees_at_slot", INTEGER_TYPE, AttesterDuty::getCommitteesAtSlot)
.withField(
"validator_committee_index", INTEGER_TYPE, AttesterDuty::getValidatorCommitteeIndex)
.withField("slot", UINT64_TYPE, AttesterDuty::getSlot)
.build();
public static final SerializableTypeDefinition<AttesterDuties> RESPONSE_TYPE =
SerializableTypeDefinition.object(AttesterDuties.class)
.name("GetAttesterDutiesResponse")
.withField("dependent_root", BYTES32_TYPE, AttesterDuties::getDependentRoot)
.withField(EXECUTION_OPTIMISTIC, BOOLEAN_TYPE, AttesterDuties::isExecutionOptimistic)
.withField(
"data",
SerializableTypeDefinition.listOf(ATTESTER_DUTY_TYPE),
AttesterDuties::getDuties)
.build();
private final Spec spec = TestSpecFactory.createMinimalBellatrix();
private BeaconStateBellatrix state;
private AttesterDutiesGenerator attesterDutiesGenerator;

private UInt64 epoch;

IntList validatorIndices = new IntArrayList();

@Param({"800000"})
int validatorsCount = 800_000;

@Param({"500000"})
int querySize = 50_000;

@Setup(Level.Trial)
public void init() {
List<BLSKeyPair> validatorKeys = KeyFileGenerator.readValidatorKeys(validatorsCount);
state =
BeaconStateBellatrix.required(
new GenesisStateBuilder()
.spec(spec)
.signDeposits(false)
.addValidators(validatorKeys)
.build());
final MutableBeaconStateBellatrix mutableState = state.createWritableCopy();
mutableState.setSlot(UInt64.ONE);
state = mutableState.commitChanges();

for (int i = 0; i < querySize; i++) {
validatorIndices.add(i);
}

attesterDutiesGenerator = new AttesterDutiesGenerator(spec);
System.out.println("Done!");
epoch = spec.computeEpochAtSlot(state.getSlot()).increment();
}

@Benchmark
@Warmup(iterations = 5, time = 2000, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 10)
public void computeAttesterDuties(Blackhole bh) {
AttesterDuties attesterDutiesFromIndicesAndState =
attesterDutiesGenerator.getAttesterDutiesFromIndicesAndState(
state, epoch, validatorIndices, false);

bh.consume(attesterDutiesFromIndicesAndState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
Expand Down Expand Up @@ -81,7 +80,6 @@
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;
import tech.pegasys.teku.validator.api.AttesterDuties;
import tech.pegasys.teku.validator.api.AttesterDuty;
import tech.pegasys.teku.validator.api.CommitteeSubscriptionRequest;
import tech.pegasys.teku.validator.api.NodeSyncingException;
import tech.pegasys.teku.validator.api.ProposerDuties;
Expand All @@ -92,6 +90,7 @@
import tech.pegasys.teku.validator.api.SyncCommitteeDuty;
import tech.pegasys.teku.validator.api.SyncCommitteeSubnetSubscription;
import tech.pegasys.teku.validator.api.ValidatorApiChannel;
import tech.pegasys.teku.validator.coordinator.duties.AttesterDutiesGenerator;
import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker;
import tech.pegasys.teku.validator.coordinator.publisher.BlockPublisher;
import tech.pegasys.teku.validator.coordinator.publisher.MilestoneBasedBlockPublisher;
Expand Down Expand Up @@ -125,6 +124,8 @@ public class ValidatorApiHandler implements ValidatorApiChannel {
private final ProposersDataManager proposersDataManager;
private final BlockPublisher blockPublisher;

private final AttesterDutiesGenerator attesterDutiesGenerator;

public ValidatorApiHandler(
final ChainDataProvider chainDataProvider,
final NodeDataProvider nodeDataProvider,
Expand Down Expand Up @@ -174,6 +175,7 @@ public ValidatorApiHandler(
blobSidecarGossipChannel,
performanceTracker,
dutyMetrics);
this.attesterDutiesGenerator = new AttesterDutiesGenerator(spec);
}

@Override
Expand Down Expand Up @@ -205,6 +207,7 @@ public SafeFuture<Map<BLSPublicKey, Integer>> getValidatorIndices(
@Override
public SafeFuture<Optional<AttesterDuties>> getAttestationDuties(
final UInt64 epoch, final IntCollection validatorIndices) {

if (isSyncActive()) {
return NodeSyncingException.failedFuture();
}
Expand All @@ -218,14 +221,22 @@ public SafeFuture<Optional<AttesterDuties>> getAttestationDuties(
"Attestation duties were requested %s epochs ahead, only 1 epoch in future is supported.",
epoch.minus(combinedChainDataClient.getCurrentEpoch()).toString())));
}
// what state can we use? If the current or next epoch, we can use the best state,
// which would guarantee no state regeneration
final UInt64 slot = spec.getEarliestQueryableSlotForBeaconCommitteeInTargetEpoch(epoch);

LOG.trace("Retrieving attestation duties from epoch {} using state at slot {}", epoch, slot);
return combinedChainDataClient
.getStateAtSlotExact(slot)
.thenApply(
optionalState ->
optionalState.map(
state -> getAttesterDutiesFromIndicesAndState(state, epoch, validatorIndices)));
state ->
attesterDutiesGenerator.getAttesterDutiesFromIndicesAndState(
state,
epoch,
validatorIndices,
combinedChainDataClient.isChainHeadOptimistic())));
}

@Override
Expand Down Expand Up @@ -684,42 +695,6 @@ private ProposerDuties getProposerDutiesFromIndicesAndState(
combinedChainDataClient.isChainHeadOptimistic());
}

private AttesterDuties getAttesterDutiesFromIndicesAndState(
final BeaconState state, final UInt64 epoch, final IntCollection validatorIndices) {
final Bytes32 dependentRoot =
epoch.isGreaterThan(spec.getCurrentEpoch(state))
? spec.atEpoch(epoch).getBeaconStateUtil().getCurrentDutyDependentRoot(state)
: spec.atEpoch(epoch).getBeaconStateUtil().getPreviousDutyDependentRoot(state);
return new AttesterDuties(
combinedChainDataClient.isChainHeadOptimistic(),
dependentRoot,
validatorIndices
.intStream()
.mapToObj(index -> createAttesterDuties(state, epoch, index))
.filter(Optional::isPresent)
.map(Optional::get)
.toList());
}

private Optional<AttesterDuty> createAttesterDuties(
final BeaconState state, final UInt64 epoch, final int validatorIndex) {

return combine(
spec.getValidatorPubKey(state, UInt64.valueOf(validatorIndex)),
spec.getCommitteeAssignment(state, epoch, validatorIndex),
(pkey, committeeAssignment) -> {
final UInt64 committeeCountPerSlot = spec.getCommitteeCountPerSlot(state, epoch);
return new AttesterDuty(
pkey,
validatorIndex,
committeeAssignment.getCommittee().size(),
committeeAssignment.getCommitteeIndex().intValue(),
committeeCountPerSlot.intValue(),
committeeAssignment.getCommittee().indexOf(validatorIndex),
committeeAssignment.getSlot());
});
}

private SafeFuture<Optional<BeaconState>> getStateForCommitteeDuties(
final SpecVersion specVersion, final UInt64 epoch) {
final Optional<SyncCommitteeUtil> maybeSyncCommitteeUtil = specVersion.getSyncCommitteeUtil();
Expand Down Expand Up @@ -827,14 +802,6 @@ private SszList<SignedValidatorRegistration> getApplicableValidatorRegistrations
return validatorRegistrations.getSchema().createFromElements(applicableValidatorRegistrations);
}

private static <A, B, R> Optional<R> combine(
Optional<A> a, Optional<B> b, BiFunction<A, B, R> fun) {
if (a.isEmpty() || b.isEmpty()) {
return Optional.empty();
}
return Optional.ofNullable(fun.apply(a.get(), b.get()));
}

private List<ProposerDuty> getProposalSlotsForEpoch(final BeaconState state, final UInt64 epoch) {
final UInt64 epochStartSlot = spec.computeStartSlotAtEpoch(epoch);
final UInt64 startSlot = epochStartSlot.max(GENESIS_SLOT.increment());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright Consensys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.validator.coordinator.duties;

import it.unimi.dsi.fastutil.ints.IntCollection;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.state.CommitteeAssignment;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.spec.logic.common.helpers.BeaconStateAccessors;
import tech.pegasys.teku.validator.api.AttesterDuties;
import tech.pegasys.teku.validator.api.AttesterDuty;

public class AttesterDutiesGenerator {
private final Spec spec;

public AttesterDutiesGenerator(Spec spec) {
this.spec = spec;
}

public AttesterDuties getAttesterDutiesFromIndicesAndState(
final BeaconState state,
final UInt64 epoch,
final IntCollection validatorIndices,
final boolean isChainHeadOptimistic) {
final Bytes32 dependentRoot =
epoch.isGreaterThan(spec.getCurrentEpoch(state))
? spec.atEpoch(epoch).getBeaconStateUtil().getCurrentDutyDependentRoot(state)
: spec.atEpoch(epoch).getBeaconStateUtil().getPreviousDutyDependentRoot(state);
final List<AttesterDuty> duties = createAttesterDuties(state, epoch, validatorIndices);
return new AttesterDuties(isChainHeadOptimistic, dependentRoot, duties);
}

private List<AttesterDuty> createAttesterDuties(
final BeaconState state, final UInt64 epoch, final IntCollection validatorIndices) {
final List<Optional<AttesterDuty>> maybeAttesterDutyList = new ArrayList<>();
final BeaconStateAccessors beaconStateAccessors = spec.atEpoch(epoch).beaconStateAccessors();
final UInt64 committeeCountPerSlot =
beaconStateAccessors.getCommitteeCountPerSlot(state, epoch);
final Map<Integer, CommitteeAssignment> validatorIndexToCommitteeAssignmentMap =
spec.getValidatorIndexToCommitteeAssignmentMap(state, epoch);
for (final Integer validatorIndex : validatorIndices) {
final CommitteeAssignment committeeAssignment =
validatorIndexToCommitteeAssignmentMap.get(validatorIndex);
if (committeeAssignment != null) {
maybeAttesterDutyList.add(
attesterDutyFromCommitteeAssignment(
committeeAssignment, validatorIndex, committeeCountPerSlot, state));
}
}
return maybeAttesterDutyList.stream().filter(Optional::isPresent).map(Optional::get).toList();
}

private Optional<AttesterDuty> attesterDutyFromCommitteeAssignment(
final CommitteeAssignment committeeAssignment,
final int validatorIndex,
final UInt64 committeeCountPerSlot,
final BeaconState state) {
return spec.getValidatorPubKey(state, UInt64.valueOf(validatorIndex))
.map(
publicKey ->
new AttesterDuty(
publicKey,
validatorIndex,
committeeAssignment.getCommittee().size(),
committeeAssignment.getCommitteeIndex().intValue(),
committeeCountPerSlot.intValue(),
committeeAssignment.getCommittee().indexOf(validatorIndex),
committeeAssignment.getSlot()));
}
}
7 changes: 7 additions & 0 deletions ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,13 @@ public Optional<CommitteeAssignment> getCommitteeAssignment(
return atEpoch(epoch).getValidatorsUtil().getCommitteeAssignment(state, epoch, validatorIndex);
}

public Map<Integer, CommitteeAssignment> getValidatorIndexToCommitteeAssignmentMap(
final BeaconState state, final UInt64 epoch) {
return atEpoch(epoch)
.getValidatorsUtil()
.getValidatorIndexToCommitteeAssignmentMap(state, epoch);
}

// Attestation helpers
public IntList getAttestingIndices(BeaconState state, AttestationData data, SszBitlist bits) {
return atState(state).getAttestationUtil().getAttestingIndices(state, data, bits);
Expand Down
Loading

0 comments on commit dc3b248

Please sign in to comment.