Skip to content

Commit

Permalink
Refactoring logic into ValidatorDutyMetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyeh committed Oct 13, 2023
1 parent 5bfd50f commit 33b88be
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -53,7 +54,10 @@
import tech.pegasys.teku.validator.client.doppelganger.DoppelgangerDetector;
import tech.pegasys.teku.validator.client.duties.BeaconCommitteeSubscriptions;
import tech.pegasys.teku.validator.client.duties.BlockDutyFactory;
import tech.pegasys.teku.validator.client.duties.Duty;
import tech.pegasys.teku.validator.client.duties.DutyResult;
import tech.pegasys.teku.validator.client.duties.SlotBasedScheduledDuties;
import tech.pegasys.teku.validator.client.duties.ValidatorDutyMetrics;
import tech.pegasys.teku.validator.client.duties.attestations.AttestationDutyFactory;
import tech.pegasys.teku.validator.client.duties.synccommittee.ChainHeadTracker;
import tech.pegasys.teku.validator.client.duties.synccommittee.SyncCommitteeScheduledDuties;
Expand Down Expand Up @@ -419,6 +423,15 @@ private void scheduleValidatorsDuties(
new AttestationDutyFactory(spec, forkProvider, validatorApiChannel);
final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions =
new BeaconCommitteeSubscriptions(validatorApiChannel);

final Function<Duty, SafeFuture<DutyResult>> dutyFunction;
if (metricsOn) {
dutyFunction = Duty::performDuty;
} else {
final ValidatorDutyMetrics metrics = ValidatorDutyMetrics.create(metricsSystem);
dutyFunction = metrics::performDutyWithMetrics;
}

final DutyLoader<?> attestationDutyLoader =
new RetryingDutyLoader<>(
asyncRunner,
Expand All @@ -427,7 +440,7 @@ private void scheduleValidatorsDuties(
forkProvider,
dependentRoot ->
new SlotBasedScheduledDuties<>(
attestationDutyFactory, dependentRoot, metricsSystem, metricsOn),
attestationDutyFactory, dependentRoot, dutyFunction),
validators,
validatorIndexProvider,
beaconCommitteeSubscriptions,
Expand All @@ -438,8 +451,7 @@ private void scheduleValidatorsDuties(
new BlockProductionDutyLoader(
validatorApiChannel,
dependentRoot ->
new SlotBasedScheduledDuties<>(
blockDutyFactory, dependentRoot, metricsSystem, metricsOn),
new SlotBasedScheduledDuties<>(blockDutyFactory, dependentRoot, dutyFunction),
validators,
validatorIndexProvider));
validatorTimingChannels.add(new BlockDutyScheduler(metricsSystem, blockDutyLoader, spec));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
package tech.pegasys.teku.validator.client.duties;

public enum DutyType {
ATTESTATION_AGGREGATION("attestation aggregation"),
ATTESTATION_PRODUCTION("attestation production"),
BLOCK_PRODUCTION("block production");
ATTESTATION_AGGREGATION("attestation_aggregation"),
ATTESTATION_PRODUCTION("attestation_production"),
BLOCK_PRODUCTION("block_production");

private final String type;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.tuweni.bytes.Bytes32;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.exceptions.InvalidConfigurationException;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.validator.client.Validator;

Expand All @@ -34,36 +30,15 @@ public class SlotBasedScheduledDuties<P extends Duty, A extends Duty> implements
private final DutyFactory<P, A> dutyFactory;
private final Bytes32 dependentRoot;

private final OperationTimer attestationProductionDutyTimer;
private final OperationTimer blockProductionDutyTimer;
private final OperationTimer attestationAggregationDutyTimer;

private final boolean metricsOn;
private final Function<Duty, SafeFuture<DutyResult>> dutyFunction;

public SlotBasedScheduledDuties(
final DutyFactory<P, A> dutyFactory,
final Bytes32 dependentRoot,
final MetricsSystem metricsSystem,
final boolean metricsOn) {
final Function<Duty, SafeFuture<DutyResult>> dutyFunction) {
this.dutyFactory = dutyFactory;
this.dependentRoot = dependentRoot;
this.metricsOn = metricsOn;

this.attestationProductionDutyTimer =
metricsSystem.createTimer(
TekuMetricCategory.VALIDATOR,
"attestation_duty_timer",
"Timer recording the time taken to perform an attestation duty");
this.blockProductionDutyTimer =
metricsSystem.createTimer(
TekuMetricCategory.VALIDATOR,
"block_duty_timer",
"Timer recording the time taken to perform a block duty");
this.attestationAggregationDutyTimer =
metricsSystem.createTimer(
TekuMetricCategory.VALIDATOR,
"aggregation_duty_timer",
"Timer recording the time taken to perform an aggregation duty");
this.dutyFunction = dutyFunction;
}

public Bytes32 getDependentRoot() {
Expand Down Expand Up @@ -119,30 +94,7 @@ private SafeFuture<DutyResult> performDutyForSlot(
return SafeFuture.completedFuture(DutyResult.NO_OP);
}

return metricsOn ? performDutyWithMetrics(duty) : duty.performDuty();
}

private OperationTimer getMetricTimer(final DutyType type) {
if (type.equals(DutyType.ATTESTATION_AGGREGATION)) {
return attestationAggregationDutyTimer;
} else if (type.equals(DutyType.ATTESTATION_PRODUCTION)) {
return attestationProductionDutyTimer;
} else if (type.equals(DutyType.BLOCK_PRODUCTION)) {
return blockProductionDutyTimer;
} else {
throw new InvalidConfigurationException(type.getType() + " is an invalid duty type");
}
}

private SafeFuture<DutyResult> performDutyWithMetrics(final Duty duty) {
final OperationTimer timer = getMetricTimer(duty.getType());
final OperationTimer.TimingContext context = timer.startTimer();
return duty.performDuty()
.thenApply(
result -> {
context.stopTimer();
return result;
});
return dutyFunction.apply(duty);
}

private void discardDutiesBeforeSlot(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Consensys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.validator.client.duties;

import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;

public class ValidatorDutyMetrics {
private final LabelledMetric<OperationTimer> dutyMetric;

ValidatorDutyMetrics(final LabelledMetric<OperationTimer> dutyMetric) {
this.dutyMetric = dutyMetric;
}

public static ValidatorDutyMetrics create(final MetricsSystem metricsSystem) {
final LabelledMetric<OperationTimer> dutyMetric =
metricsSystem.createLabelledTimer(
TekuMetricCategory.VALIDATOR,
"duty_timer",
"Timer recording the time taken to perform a duty",
"type",
"step");
return new ValidatorDutyMetrics(dutyMetric);
}

public SafeFuture<DutyResult> performDutyWithMetrics(final Duty duty) {
final String dutyType = duty.getType().getType();
final OperationTimer timer = dutyMetric.labels(dutyType, "total");
final OperationTimer.TimingContext context = timer.startTimer();
return duty.performDuty()
.thenApply(
result -> {
context.stopTimer();
return result;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import tech.pegasys.teku.validator.api.AttesterDuties;
import tech.pegasys.teku.validator.api.AttesterDuty;
import tech.pegasys.teku.validator.client.duties.BeaconCommitteeSubscriptions;
import tech.pegasys.teku.validator.client.duties.Duty;
import tech.pegasys.teku.validator.client.duties.DutyResult;
import tech.pegasys.teku.validator.client.duties.SlotBasedScheduledDuties;
import tech.pegasys.teku.validator.client.duties.attestations.AggregationDuty;
Expand Down Expand Up @@ -748,7 +749,7 @@ private void createDutySchedulerWithRealDuties() {
forkProvider,
dependentRoot ->
new SlotBasedScheduledDuties<>(
attestationDutyFactory, dependentRoot, metricsSystem, false),
attestationDutyFactory, dependentRoot, Duty::performDuty),
new OwnedValidators(Map.of(VALIDATOR1_KEY, validator1, VALIDATOR2_KEY, validator2)),
validatorIndexProvider,
beaconCommitteeSubscriptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ private void createDutySchedulerWithRealDuties() {
validatorApiChannel,
dependentRoot ->
new SlotBasedScheduledDuties<>(
blockDutyFactory, dependentRoot, metricsSystem, false),
blockDutyFactory, dependentRoot, Duty::performDuty),
new OwnedValidators(
Map.of(VALIDATOR1_KEY, validator1, VALIDATOR2_KEY, validator2)),
validatorIndexProvider)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class SlotBasedScheduledDutiesTest {

private final SlotBasedScheduledDuties<ProductionDuty, AggregationDuty> duties =
new SlotBasedScheduledDuties<>(
dutyFactory, Bytes32.fromHexString("0x838382"), metricsSystem, false);
dutyFactory, Bytes32.fromHexString("0x838382"), Duty::performDuty);

@Test
public void shouldDiscardMissedProductionDuties() {
Expand Down

0 comments on commit 33b88be

Please sign in to comment.