From 70b79806894c06b077e82f6293ea42aca176f91d Mon Sep 17 00:00:00 2001 From: Lucas Saldanha Date: Tue, 5 Mar 2024 10:54:37 +1300 Subject: [PATCH] DVT integration for attestation aggregation (#8032) --- .../BeaconCommitteeSelectionProof.java | 6 + .../cli/options/ValidatorClientOptions.java | 20 +- .../cli/options/ValidatorOptionsTest.java | 8 + .../ValidatorClientCommandTest.java | 17 ++ .../teku/validator/api/ValidatorConfig.java | 21 +- .../client/AttestationDutyLoader.java | 33 ++- .../client/DvtAttestationAggregations.java | 142 +++++++++++ .../client/ValidatorClientService.java | 3 +- .../client/AttestationDutyLoaderTest.java | 3 +- .../client/AttestationDutySchedulerTest.java | 6 +- .../DvtAttestationAggregationsTest.java | 224 ++++++++++++++++++ 11 files changed, 466 insertions(+), 17 deletions(-) create mode 100644 validator/client/src/main/java/tech/pegasys/teku/validator/client/DvtAttestationAggregations.java create mode 100644 validator/client/src/test/java/tech/pegasys/teku/validator/client/DvtAttestationAggregationsTest.java diff --git a/ethereum/json-types/src/main/java/tech/pegasys/teku/ethereum/json/types/validator/BeaconCommitteeSelectionProof.java b/ethereum/json-types/src/main/java/tech/pegasys/teku/ethereum/json/types/validator/BeaconCommitteeSelectionProof.java index 555a37220be..5bf53cdead0 100644 --- a/ethereum/json-types/src/main/java/tech/pegasys/teku/ethereum/json/types/validator/BeaconCommitteeSelectionProof.java +++ b/ethereum/json-types/src/main/java/tech/pegasys/teku/ethereum/json/types/validator/BeaconCommitteeSelectionProof.java @@ -14,6 +14,8 @@ package tech.pegasys.teku.ethereum.json.types.validator; import java.util.Objects; +import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.bls.BLSSignature; import tech.pegasys.teku.infrastructure.json.types.CoreTypes; import tech.pegasys.teku.infrastructure.json.types.DeserializableTypeDefinition; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -67,6 +69,10 @@ public String getSelectionProof() { return selectionProof; } + public BLSSignature getSelectionProofSignature() { + return BLSSignature.fromBytesCompressed(Bytes.fromHexString(getSelectionProof())); + } + public static BeaconCommitteeSelectionProof.Builder builder() { return new BeaconCommitteeSelectionProof.Builder(); } diff --git a/teku/src/main/java/tech/pegasys/teku/cli/options/ValidatorClientOptions.java b/teku/src/main/java/tech/pegasys/teku/cli/options/ValidatorClientOptions.java index a3d38e6f179..ee7e40ff302 100644 --- a/teku/src/main/java/tech/pegasys/teku/cli/options/ValidatorClientOptions.java +++ b/teku/src/main/java/tech/pegasys/teku/cli/options/ValidatorClientOptions.java @@ -13,8 +13,6 @@ package tech.pegasys.teku.cli.options; -import static tech.pegasys.teku.validator.api.ValidatorConfig.DEFAULT_VALIDATOR_CLIENT_SSZ_BLOCKS_ENABLED; - import java.net.URI; import java.net.URISyntaxException; import java.util.List; @@ -62,7 +60,8 @@ public class ValidatorClientOptions { showDefaultValue = CommandLine.Help.Visibility.ALWAYS, arity = "0..1", fallbackValue = "true") - private boolean validatorClientSszBlocksEnabled = DEFAULT_VALIDATOR_CLIENT_SSZ_BLOCKS_ENABLED; + private boolean validatorClientSszBlocksEnabled = + ValidatorConfig.DEFAULT_VALIDATOR_CLIENT_SSZ_BLOCKS_ENABLED; @CommandLine.Option( names = {"--Xuse-post-validators-endpoint-enabled"}, @@ -75,6 +74,18 @@ public class ValidatorClientOptions { private boolean validatorClientUsePostValidatorsEndpointEnabled = ValidatorConfig.DEFAULT_VALIDATOR_CLIENT_USE_POST_VALIDATORS_ENDPOINT_ENABLED; + @Option( + names = {"--Xdvt-integration-enabled"}, + paramLabel = "", + description = + "Use DVT endpoints to determine if a distributed validator has aggregation duties.", + arity = "0..1", + showDefaultValue = CommandLine.Help.Visibility.ALWAYS, + hidden = true, + fallbackValue = "true") + private boolean dvtSelectionsEndpointEnabled = + ValidatorConfig.DEFAULT_DVT_SELECTIONS_ENDPOINT_ENABLED; + public void configure(TekuConfiguration.Builder builder) { configureBeaconNodeApiEndpoints(); @@ -87,7 +98,8 @@ public void configure(TekuConfiguration.Builder builder) { validatorClientUsePostValidatorsEndpointEnabled) .failoversSendSubnetSubscriptionsEnabled(failoversSendSubnetSubscriptionsEnabled) .failoversPublishSignedDutiesEnabled(failoversPublishSignedDutiesEnabled) - .sentryNodeConfigurationFile(exclusiveParams.sentryConfigFile)); + .sentryNodeConfigurationFile(exclusiveParams.sentryConfigFile) + .dvtSelectionsEndpointEnabled(dvtSelectionsEndpointEnabled)); } private void configureBeaconNodeApiEndpoints() { diff --git a/teku/src/test/java/tech/pegasys/teku/cli/options/ValidatorOptionsTest.java b/teku/src/test/java/tech/pegasys/teku/cli/options/ValidatorOptionsTest.java index 2a9721e269a..e73348a741a 100644 --- a/teku/src/test/java/tech/pegasys/teku/cli/options/ValidatorOptionsTest.java +++ b/teku/src/test/java/tech/pegasys/teku/cli/options/ValidatorOptionsTest.java @@ -234,4 +234,12 @@ public void shouldSetShutdownWhenValidatorSlashedEnabled() { .getValidatorConfig(); assertThat(config.isShutdownWhenValidatorSlashedEnabled()).isTrue(); } + + @Test + public void shouldNotUseDvtSelectionsEndpointByDefault() { + final String[] args = {}; + final TekuConfiguration config = getTekuConfigurationFromArguments(args); + assertThat(config.validatorClient().getValidatorConfig().isDvtSelectionsEndpointEnabled()) + .isFalse(); + } } diff --git a/teku/src/test/java/tech/pegasys/teku/cli/subcommand/ValidatorClientCommandTest.java b/teku/src/test/java/tech/pegasys/teku/cli/subcommand/ValidatorClientCommandTest.java index 738c32eaf03..37662df0c7f 100644 --- a/teku/src/test/java/tech/pegasys/teku/cli/subcommand/ValidatorClientCommandTest.java +++ b/teku/src/test/java/tech/pegasys/teku/cli/subcommand/ValidatorClientCommandTest.java @@ -210,6 +210,23 @@ public void clientExecutorThreadsShouldThrowOverLimit() { "--Xvalidator-client-executor-threads must be greater than 0 and less than 5000."); } + @Test + public void shouldSetUseDvtSelectionsEndpoint() { + final String[] args = {"vc", "--network", "minimal", "--Xdvt-integration-enabled"}; + final TekuConfiguration config = getTekuConfigurationFromArguments(args); + + assertThat(config.validatorClient().getValidatorConfig().isDvtSelectionsEndpointEnabled()) + .isTrue(); + } + + @Test + public void shouldNotUseDvtSelectionsEndpointByDefault() { + final String[] args = {"vc", "--network", "minimal"}; + final TekuConfiguration config = getTekuConfigurationFromArguments(args); + assertThat(config.validatorClient().getValidatorConfig().isDvtSelectionsEndpointEnabled()) + .isFalse(); + } + private String pathFor(final String filename) { return Resources.getResource(ValidatorClientCommandTest.class, filename).toString(); } diff --git a/validator/api/src/main/java/tech/pegasys/teku/validator/api/ValidatorConfig.java b/validator/api/src/main/java/tech/pegasys/teku/validator/api/ValidatorConfig.java index fcc59c0e4b2..de0cef86078 100644 --- a/validator/api/src/main/java/tech/pegasys/teku/validator/api/ValidatorConfig.java +++ b/validator/api/src/main/java/tech/pegasys/teku/validator/api/ValidatorConfig.java @@ -65,6 +65,7 @@ public class ValidatorConfig { public static final boolean DEFAULT_VALIDATOR_BLINDED_BLOCKS_ENABLED = false; public static final int DEFAULT_VALIDATOR_REGISTRATION_SENDING_BATCH_SIZE = 100; public static final UInt64 DEFAULT_BUILDER_REGISTRATION_GAS_LIMIT = UInt64.valueOf(30_000_000); + public static final boolean DEFAULT_DVT_SELECTIONS_ENDPOINT_ENABLED = false; private final List validatorKeys; private final List validatorExternalSignerPublicKeySources; @@ -105,6 +106,7 @@ public class ValidatorConfig { private final int executorThreads; private final boolean isLocalSlashingProtectionSynchronizedModeEnabled; + private final boolean dvtSelectionsEndpointEnabled; private ValidatorConfig( final List validatorKeys, @@ -143,7 +145,8 @@ private ValidatorConfig( final int executorMaxQueueSize, final int executorThreads, final Optional sentryNodeConfigurationFile, - boolean isLocalSlashingProtectionSynchronizedModeEnabled) { + boolean isLocalSlashingProtectionSynchronizedModeEnabled, + boolean dvtSelectionsEndpointEnabled) { this.validatorKeys = validatorKeys; this.validatorExternalSignerPublicKeySources = validatorExternalSignerPublicKeySources; this.validatorExternalSignerUrl = validatorExternalSignerUrl; @@ -186,6 +189,7 @@ private ValidatorConfig( this.sentryNodeConfigurationFile = sentryNodeConfigurationFile; this.isLocalSlashingProtectionSynchronizedModeEnabled = isLocalSlashingProtectionSynchronizedModeEnabled; + this.dvtSelectionsEndpointEnabled = dvtSelectionsEndpointEnabled; } public static Builder builder() { @@ -348,6 +352,10 @@ public boolean isLocalSlashingProtectionSynchronizedModeEnabled() { return isLocalSlashingProtectionSynchronizedModeEnabled; } + public boolean isDvtSelectionsEndpointEnabled() { + return dvtSelectionsEndpointEnabled; + } + public static final class Builder { private List validatorKeys = new ArrayList<>(); private List validatorExternalSignerPublicKeySources = new ArrayList<>(); @@ -395,11 +403,10 @@ public static final class Builder { private Optional builderRegistrationPublicKeyOverride = Optional.empty(); private int executorMaxQueueSize = DEFAULT_EXECUTOR_MAX_QUEUE_SIZE; private Optional sentryNodeConfigurationFile = Optional.empty(); - private int executorThreads = DEFAULT_VALIDATOR_EXECUTOR_THREADS; - private boolean isLocalSlashingProtectionSynchronizedModeEnabled = DEFAULT_VALIDATOR_IS_LOCAL_SLASHING_PROTECTION_SYNCHRONIZED_ENABLED; + private boolean dvtSelectionsEndpointEnabled = DEFAULT_DVT_SELECTIONS_ENDPOINT_ENABLED; private Builder() {} @@ -640,6 +647,11 @@ public Builder isLocalSlashingProtectionSynchronizedModeEnabled( return this; } + public Builder dvtSelectionsEndpointEnabled(final boolean dvtSelectionsEndpointEnabled) { + this.dvtSelectionsEndpointEnabled = dvtSelectionsEndpointEnabled; + return this; + } + public ValidatorConfig build() { validateExternalSignerUrlAndPublicKeys(); validateExternalSignerKeystoreAndPasswordFileConfig(); @@ -683,7 +695,8 @@ public ValidatorConfig build() { executorMaxQueueSize, executorThreads, sentryNodeConfigurationFile, - isLocalSlashingProtectionSynchronizedModeEnabled); + isLocalSlashingProtectionSynchronizedModeEnabled, + dvtSelectionsEndpointEnabled); } private void validateExternalSignerUrlAndPublicKeys() { diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyLoader.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyLoader.java index cc3f2f6780a..5098f4a4c08 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyLoader.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyLoader.java @@ -45,6 +45,7 @@ public class AttestationDutyLoader scheduledDutiesFactory; private final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions; private final Spec spec; + private final boolean useDvtEndpoint; public AttestationDutyLoader( final ValidatorApiChannel validatorApiChannel, @@ -54,13 +55,15 @@ public AttestationDutyLoader( final OwnedValidators validators, final ValidatorIndexProvider validatorIndexProvider, final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions, - final Spec spec) { + final Spec spec, + final boolean useDvtEndpoint) { super(validators, validatorIndexProvider); this.validatorApiChannel = validatorApiChannel; this.forkProvider = forkProvider; this.scheduledDutiesFactory = scheduledDutiesFactory; this.beaconCommitteeSubscriptions = beaconCommitteeSubscriptions; this.spec = spec; + this.useDvtEndpoint = useDvtEndpoint; } @Override @@ -77,9 +80,18 @@ protected SafeFuture> requestDuties( final UInt64 epoch, final AttesterDuties duties) { final SlotBasedScheduledDuties scheduledDuties = scheduledDutiesFactory.apply(duties.getDependentRoot()); + + final Optional dvtAttestationAggregationsForEpoch = + useDvtEndpoint + ? Optional.of( + new DvtAttestationAggregations(validatorApiChannel, duties.getDuties().size())) + : Optional.empty(); + return SafeFuture.allOf( duties.getDuties().stream() - .map(duty -> scheduleDuties(scheduledDuties, duty)) + .map( + duty -> + scheduleDuties(scheduledDuties, duty, dvtAttestationAggregationsForEpoch)) .toArray(SafeFuture[]::new)) .>thenApply(__ -> scheduledDuties) .alwaysRun(beaconCommitteeSubscriptions::sendRequests); @@ -87,7 +99,8 @@ protected SafeFuture> requestDuties( private SafeFuture scheduleDuties( final SlotBasedScheduledDuties scheduledDuties, - final AttesterDuty duty) { + final AttesterDuty duty, + final Optional dvtAttestationAggregationLoader) { final Optional maybeValidator = validators.getValidator(duty.getPublicKey()); if (maybeValidator.isEmpty()) { return SafeFuture.COMPLETE; @@ -116,7 +129,8 @@ private SafeFuture scheduleDuties( validator, duty.getSlot(), aggregatorModulo, - unsignedAttestationFuture); + unsignedAttestationFuture, + dvtAttestationAggregationLoader); } private SafeFuture> scheduleAttestationProduction( @@ -147,10 +161,19 @@ private SafeFuture scheduleAggregation( final Validator validator, final UInt64 slot, final int aggregatorModulo, - final SafeFuture> unsignedAttestationFuture) { + final SafeFuture> unsignedAttestationFuture, + final Optional dvtAttestationAggregation) { return forkProvider .getForkInfo(slot) .thenCompose(forkInfo -> validator.getSigner().signAggregationSlot(slot, forkInfo)) + .thenCompose( + slotSignature -> + dvtAttestationAggregation + .map( + dvt -> + dvt.getCombinedSelectionProofFuture( + validatorIndex, slot, slotSignature)) + .orElse(SafeFuture.completedFuture(slotSignature))) .thenAccept( slotSignature -> { final SpecVersion specVersion = spec.atSlot(slot); diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/DvtAttestationAggregations.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/DvtAttestationAggregations.java new file mode 100644 index 00000000000..ef668b388cb --- /dev/null +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/DvtAttestationAggregations.java @@ -0,0 +1,142 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import tech.pegasys.teku.bls.BLSSignature; +import tech.pegasys.teku.ethereum.json.types.validator.BeaconCommitteeSelectionProof; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.validator.api.ValidatorApiChannel; + +public class DvtAttestationAggregations { + + private static final Logger LOG = LogManager.getLogger(); + + private final ValidatorApiChannel validatorApiChannel; + private final Map> pendingRequests = + new ConcurrentHashMap<>(); + private final int expectedDutiesCount; + + public DvtAttestationAggregations( + final ValidatorApiChannel validatorApiChannel, int expectedDutiesCount) { + this.validatorApiChannel = validatorApiChannel; + this.expectedDutiesCount = expectedDutiesCount; + } + + public SafeFuture getCombinedSelectionProofFuture( + final int validatorIndex, final UInt64 slot, final BLSSignature partialProof) { + + LOG.debug( + "Created pending request for DVT attestation aggregation proof (validator_id={}, slot={})", + validatorIndex, + slot); + + final BeaconCommitteeSelectionProof request = + new BeaconCommitteeSelectionProof.Builder() + .validatorIndex(validatorIndex) + .slot(slot) + .selectionProof(partialProof.toBytesCompressed().toHexString()) + .build(); + + final SafeFuture future = new SafeFuture<>(); + pendingRequests.put(request, future); + + if (pendingRequests.size() >= expectedDutiesCount) { + submitBatchRequests(slot); + } + + return future; + } + + private void submitBatchRequests(final UInt64 slot) { + validatorApiChannel + .getBeaconCommitteeSelectionProof(pendingRequests.keySet().stream().toList()) + .thenAccept( + response -> + response.ifPresentOrElse( + this::handleBeaconCommitteeSelectionProofsResponse, + () -> handleEmptyResponse(slot))) + .exceptionally(unexpectedErrorHandler()) + .ifExceptionGetsHereRaiseABug(); + } + + private void handleBeaconCommitteeSelectionProofsResponse( + final List proofsFromMiddleware) { + LOG.debug("Processing response from middleware for {} requests", proofsFromMiddleware.size()); + + pendingRequests.forEach( + (request, future) -> { + final Optional completeProof = + proofsFromMiddleware.stream().filter(matchingRequest(request)).findFirst(); + + completeProof.ifPresentOrElse( + resp -> { + LOG.debug( + "Completing request for DVT attestation aggregation proof (validator_id={}, slot={})", + request.getValidatorIndex(), + request.getSlot()); + + future.complete(resp.getSelectionProofSignature()); + }, + () -> + future.completeExceptionally( + new RuntimeException( + "No matching complete proof from DVT middleware for this request"))); + }); + } + + private static Predicate matchingRequest( + final BeaconCommitteeSelectionProof request) { + return proof -> + request.getValidatorIndex() == proof.getValidatorIndex() + && request.getSlot().equals(proof.getSlot()); + } + + private void handleEmptyResponse(final UInt64 slot) { + LOG.warn( + "Received empty response from DVT middleware for slot {}. This will impact aggregation duties.", + slot); + completeAllPendingFuturesExceptionally("Empty response from DVT middleware for slot " + slot); + } + + private Function unexpectedErrorHandler() { + return (ex) -> { + final String errorMsg = "Error getting DVT attestation aggregation complete proof"; + LOG.warn(errorMsg); + LOG.debug(errorMsg, ex); + + completeAllPendingFuturesExceptionally(errorMsg); + return null; + }; + } + + private void completeAllPendingFuturesExceptionally(final String errorMsg) { + pendingRequests + .values() + .forEach( + future -> { + if (!future.isDone()) { + future.completeExceptionally(new RuntimeException(errorMsg)); + } + }); + } +} diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorClientService.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorClientService.java index 548de4ae84b..34f6938488e 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorClientService.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorClientService.java @@ -464,7 +464,8 @@ private void scheduleValidatorsDuties( validators, validatorIndexProvider, beaconCommitteeSubscriptions, - spec)); + spec, + config.getValidatorConfig().isDvtSelectionsEndpointEnabled())); final DutyLoader blockDutyLoader = new RetryingDutyLoader<>( asyncRunner, diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyLoaderTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyLoaderTest.java index 584175be0bc..c49e37f5a69 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyLoaderTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyLoaderTest.java @@ -76,7 +76,8 @@ class AttestationDutyLoaderTest { new OwnedValidators(validators), validatorIndexProvider, beaconCommitteeSubscriptions, - spec); + spec, + false); @BeforeEach void setUp() { diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutySchedulerTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutySchedulerTest.java index 6e119aedfca..8caa0afce6f 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutySchedulerTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutySchedulerTest.java @@ -753,7 +753,8 @@ private void createDutySchedulerWithRealDuties() { new OwnedValidators(Map.of(VALIDATOR1_KEY, validator1, VALIDATOR2_KEY, validator2)), validatorIndexProvider, beaconCommitteeSubscriptions, - spec); + spec, + false); dutyScheduler = new AttestationDutyScheduler( metricsSystem, new RetryingDutyLoader<>(asyncRunner, attestationDutyLoader), spec); @@ -768,7 +769,8 @@ private void createDutySchedulerWithMockDuties() { new OwnedValidators(Map.of(VALIDATOR1_KEY, validator1, VALIDATOR2_KEY, validator2)), validatorIndexProvider, beaconCommitteeSubscriptions, - spec); + spec, + false); dutyScheduler = new AttestationDutyScheduler( metricsSystem2, new RetryingDutyLoader<>(asyncRunner, attestationDutyLoader), spec); diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/DvtAttestationAggregationsTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/DvtAttestationAggregationsTest.java new file mode 100644 index 00000000000..786f3883f92 --- /dev/null +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/DvtAttestationAggregationsTest.java @@ -0,0 +1,224 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.bls.BLSSignature; +import tech.pegasys.teku.ethereum.json.types.validator.BeaconCommitteeSelectionProof; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.validator.api.ValidatorApiChannel; + +class DvtAttestationAggregationsTest { + + private final DataStructureUtil dataStructureUtil = + new DataStructureUtil(TestSpecFactory.createDefault()); + private DvtAttestationAggregations loader; + private ValidatorApiChannel validatorApiChannel; + + @BeforeEach + public void setUp() { + validatorApiChannel = mock(ValidatorApiChannel.class); + } + + @Test + public void completesAllFuturesWhenMiddlewareReturnsAllSelectionProofs() { + final BeaconCommitteeSelectionProof combinedProofForValidator1 = combinedProof(1); + final BeaconCommitteeSelectionProof combinedProofForValidator2 = combinedProof(2); + when(validatorApiChannel.getBeaconCommitteeSelectionProof(any())) + .thenReturn( + SafeFuture.completedFuture( + Optional.of(List.of(combinedProofForValidator1, combinedProofForValidator2)))); + + loader = new DvtAttestationAggregations(validatorApiChannel, 2); + + final SafeFuture futureSelectionProofValidator1 = + loader.getCombinedSelectionProofFuture(1, UInt64.ONE, dataStructureUtil.randomSignature()); + final SafeFuture futureSelectionProofValidator2 = + loader.getCombinedSelectionProofFuture(2, UInt64.ONE, dataStructureUtil.randomSignature()); + + assertThat(futureSelectionProofValidator1) + .isCompletedWithValue(combinedProofForValidator1.getSelectionProofSignature()); + assertThat(futureSelectionProofValidator2) + .isCompletedWithValue(combinedProofForValidator2.getSelectionProofSignature()); + } + + @Test + public void partiallyCompleteFuturesWhenMiddlewareOnlyReturnsSomeSelectionProofs() { + final BeaconCommitteeSelectionProof combinedProofForValidator1 = combinedProof(1); + when(validatorApiChannel.getBeaconCommitteeSelectionProof(any())) + .thenReturn(SafeFuture.completedFuture(Optional.of(List.of(combinedProofForValidator1)))); + + loader = new DvtAttestationAggregations(validatorApiChannel, 2); + + final SafeFuture futureSelectionProofValidator1 = + loader.getCombinedSelectionProofFuture(1, UInt64.ONE, dataStructureUtil.randomSignature()); + final SafeFuture futureSelectionProofValidator2 = + loader.getCombinedSelectionProofFuture(2, UInt64.ONE, dataStructureUtil.randomSignature()); + + assertThat(futureSelectionProofValidator1) + .isCompletedWithValue(combinedProofForValidator1.getSelectionProofSignature()); + assertThat(futureSelectionProofValidator2).isCompletedExceptionally(); + } + + @Test + public void failAllFuturesIfMiddlewareDoesNotReturnAnyValue() { + when(validatorApiChannel.getBeaconCommitteeSelectionProof(any())) + .thenReturn(SafeFuture.completedFuture(Optional.empty())); + + loader = new DvtAttestationAggregations(validatorApiChannel, 3); + + final SafeFuture futureSelectionProofValidator1 = + loader.getCombinedSelectionProofFuture(1, UInt64.ONE, dataStructureUtil.randomSignature()); + final SafeFuture futureSelectionProofValidator2 = + loader.getCombinedSelectionProofFuture( + 2, UInt64.valueOf(2), dataStructureUtil.randomSignature()); + final SafeFuture futureSelectionProofValidator3 = + loader.getCombinedSelectionProofFuture( + 3, UInt64.valueOf(3), dataStructureUtil.randomSignature()); + + assertThat(futureSelectionProofValidator1).isCompletedExceptionally(); + assertThat(futureSelectionProofValidator2).isCompletedExceptionally(); + assertThat(futureSelectionProofValidator3).isCompletedExceptionally(); + } + + @Test + public void handleDifferentValidatorAggregatingInSameSlot() { + final BeaconCommitteeSelectionProof combinedProofForValidator1 = combinedProofForSlot(1, 1); + final BeaconCommitteeSelectionProof combinedProofForValidator2 = combinedProofForSlot(2, 1); + final BeaconCommitteeSelectionProof combinedProofForValidator3 = combinedProofForSlot(3, 1); + when(validatorApiChannel.getBeaconCommitteeSelectionProof(any())) + .thenReturn( + SafeFuture.completedFuture( + Optional.of( + List.of( + combinedProofForValidator1, + combinedProofForValidator2, + combinedProofForValidator3)))); + + loader = new DvtAttestationAggregations(validatorApiChannel, 3); + + final SafeFuture futureSelectionProofValidator1 = + loader.getCombinedSelectionProofFuture(1, UInt64.ONE, dataStructureUtil.randomSignature()); + final SafeFuture futureSelectionProofValidator2 = + loader.getCombinedSelectionProofFuture(2, UInt64.ONE, dataStructureUtil.randomSignature()); + final SafeFuture futureSelectionProofValidator3 = + loader.getCombinedSelectionProofFuture(3, UInt64.ONE, dataStructureUtil.randomSignature()); + + assertThat(futureSelectionProofValidator1) + .isCompletedWithValue(combinedProofForValidator1.getSelectionProofSignature()); + assertThat(futureSelectionProofValidator2) + .isCompletedWithValue(combinedProofForValidator2.getSelectionProofSignature()); + assertThat(futureSelectionProofValidator3) + .isCompletedWithValue(combinedProofForValidator3.getSelectionProofSignature()); + } + + @Test + public void handleSameValidatorAggregatingInDifferentSlots() { + final BeaconCommitteeSelectionProof combinedProofForSlot1 = combinedProofForSlot(1, 1); + final BeaconCommitteeSelectionProof combinedProofForSlot2 = combinedProofForSlot(1, 2); + when(validatorApiChannel.getBeaconCommitteeSelectionProof(any())) + .thenReturn( + SafeFuture.completedFuture( + Optional.of(List.of(combinedProofForSlot1, combinedProofForSlot2)))); + + loader = new DvtAttestationAggregations(validatorApiChannel, 2); + + final SafeFuture futureSelectionProofValidatorAtSlot1 = + loader.getCombinedSelectionProofFuture(1, UInt64.ONE, dataStructureUtil.randomSignature()); + final SafeFuture futureSelectionProofValidatorAtSlot2 = + loader.getCombinedSelectionProofFuture( + 1, UInt64.valueOf(2), dataStructureUtil.randomSignature()); + + assertThat(futureSelectionProofValidatorAtSlot1) + .isCompletedWithValue(combinedProofForSlot1.getSelectionProofSignature()); + assertThat(futureSelectionProofValidatorAtSlot2) + .isCompletedWithValue(combinedProofForSlot2.getSelectionProofSignature()); + } + + @Test + @SuppressWarnings("unchecked") + public void + unexpectedErrorHandlingResponseMustCompleteExceptionallyPendingRequestsWithUnderlyingCause() { + final List mockList = mock(List.class); + // Forcing an unexpected error when handling response + when(mockList.stream()).thenThrow(new RuntimeException("Unexpected error")); + when(validatorApiChannel.getBeaconCommitteeSelectionProof(any())) + .thenReturn(SafeFuture.completedFuture(Optional.of(mockList))); + + loader = new DvtAttestationAggregations(validatorApiChannel, 1); + + final SafeFuture futureSelectionProofValidator1 = + loader.getCombinedSelectionProofFuture(1, UInt64.ONE, dataStructureUtil.randomSignature()); + + assertThat(futureSelectionProofValidator1) + .isCompletedExceptionally() + .failsWithin(1, TimeUnit.SECONDS) + .withThrowableOfType(ExecutionException.class) + .withCauseInstanceOf(RuntimeException.class) + .withMessageContaining("Error getting DVT attestation aggregation complete proof"); + } + + @Test + public void unexpectedErrorHandlingResponseMustCompleteExceptionallyAllNonCompletedRequests() { + final BeaconCommitteeSelectionProof proofValidator1 = combinedProofForSlot(1, 1); + final BeaconCommitteeSelectionProof proofValidator2 = spy(combinedProofForSlot(2, 1)); + // Forcing an unexpected error while handling the second proof + when(proofValidator2.getValidatorIndex()).thenThrow(new RuntimeException("Unexpected error")); + when(validatorApiChannel.getBeaconCommitteeSelectionProof(any())) + .thenReturn( + SafeFuture.completedFuture(Optional.of(List.of(proofValidator1, proofValidator2)))); + + loader = new DvtAttestationAggregations(validatorApiChannel, 1); + + final SafeFuture futureProofValidator1 = + loader.getCombinedSelectionProofFuture(1, UInt64.ONE, dataStructureUtil.randomSignature()); + final SafeFuture futureProofValidator2 = + loader.getCombinedSelectionProofFuture( + 2, UInt64.valueOf(2), dataStructureUtil.randomSignature()); + + assertThat(futureProofValidator1).isCompleted(); + assertThat(futureProofValidator2).isCompletedExceptionally(); + } + + private BeaconCommitteeSelectionProof combinedProof(final int validatorIndex) { + return new BeaconCommitteeSelectionProof.Builder() + .validatorIndex(validatorIndex) + .slot(UInt64.ONE) + .selectionProof(dataStructureUtil.randomSignature().toBytesCompressed().toHexString()) + .build(); + } + + private BeaconCommitteeSelectionProof combinedProofForSlot( + final int validatorIndex, final int slot) { + return new BeaconCommitteeSelectionProof.Builder() + .validatorIndex(validatorIndex) + .slot(UInt64.valueOf(slot)) + .selectionProof(dataStructureUtil.randomSignature().toBytesCompressed().toHexString()) + .build(); + } +}