Skip to content

Commit

Permalink
Add metric to report the number of peers subscribed to each attestati…
Browse files Browse the repository at this point in the history
…on and sync committee subnet (#6550)
  • Loading branch information
ajsutton authored Dec 5, 2022
1 parent d1e8afd commit 1a249bc
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 37 deletions.
1 change: 1 addition & 0 deletions networking/eth2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {
testFixturesImplementation testFixtures(project(':infrastructure:async'))
testFixturesImplementation project(':infrastructure:bytes')
testFixturesImplementation testFixtures(project(':infrastructure:events'))
testFixturesImplementation testFixtures(project(':infrastructure:metrics'))
testFixturesImplementation testFixtures(project(':infrastructure:time'))
testFixturesImplementation testFixtures(project(':storage'))
testFixturesImplementation project(':infrastructure:subscribers')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.hyperledger.besu.plugin.services.MetricsSystem;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.events.EventChannels;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.forks.GossipForkManager;
Expand Down Expand Up @@ -327,6 +329,13 @@ protected DiscoveryNetwork<?> buildNetwork(
discoConfig.getMinRandomlySelectedPeers());
final SchemaDefinitionsSupplier currentSchemaDefinitions =
() -> recentChainData.getCurrentSpec().getSchemaDefinitions();
final SettableLabelledGauge subnetPeerCountGauge =
SettableLabelledGauge.create(
metricsSystem,
TekuMetricCategory.NETWORK,
"subnet_peer_count",
"Number of currently connected peers subscribed to each subnet",
"subnet");
return createDiscoveryNetworkBuilder()
.metricsSystem(metricsSystem)
.asyncRunner(asyncRunner)
Expand All @@ -342,7 +351,8 @@ protected DiscoveryNetwork<?> buildNetwork(
attestationSubnetTopicProvider,
syncCommitteeSubnetTopicProvider,
syncCommitteeSubnetService,
config.getTargetSubnetSubscriberCount()),
config.getTargetSubnetSubscriberCount(),
subnetPeerCountGauge),
reputationManager,
Collections::shuffle))
.discoveryConfig(discoConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.function.Consumer;
import java.util.stream.IntStream;
import tech.pegasys.teku.infrastructure.exceptions.InvalidConfigurationException;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector;
import tech.pegasys.teku.infrastructure.ssz.schema.collections.SszBitvectorSchema;
import tech.pegasys.teku.networking.eth2.SubnetSubscriptionService;
Expand Down Expand Up @@ -55,49 +56,79 @@ public static PeerSubnetSubscriptions create(
final AttestationSubnetTopicProvider attestationTopicProvider,
final SyncCommitteeSubnetTopicProvider syncCommitteeSubnetTopicProvider,
final SubnetSubscriptionService syncCommitteeSubnetService,
final int targetSubnetSubscriberCount) {
final int targetSubnetSubscriberCount,
final SettableLabelledGauge subnetPeerCountGauge) {
final Map<String, Collection<NodeId>> subscribersByTopic = network.getSubscribersByTopic();

return builder(currentSchemaDefinitions)
.targetSubnetSubscriberCount(targetSubnetSubscriberCount)
.attestationSubnetSubscriptions(
b ->
// Track all attestation subnets
streamAllAttestationSubnetIds(currentSchemaDefinitions)
.forEach(
attestationSubnet -> {
b.addRelevantSubnet(attestationSubnet);
subscribersByTopic
.getOrDefault(
attestationTopicProvider.getTopicForSubnet(attestationSubnet),
Collections.emptySet())
.forEach(
subscriber -> b.addSubscriber(attestationSubnet, subscriber));
}))
.syncCommitteeSubnetSubscriptions(
b ->
// Only track sync committee subnets that we're subscribed to
syncCommitteeSubnetService
.getSubnets()
.forEach(
syncCommitteeSubnet -> {
b.addRelevantSubnet(syncCommitteeSubnet);
subscribersByTopic
.getOrDefault(
syncCommitteeSubnetTopicProvider.getTopicForSubnet(
syncCommitteeSubnet),
Collections.emptySet())
.forEach(
subscriber -> b.addSubscriber(syncCommitteeSubnet, subscriber));
}))
.build();
final PeerSubnetSubscriptions subscriptions =
builder(currentSchemaDefinitions)
.targetSubnetSubscriberCount(targetSubnetSubscriberCount)
.attestationSubnetSubscriptions(
b ->
// Track all attestation subnets
streamAllAttestationSubnetIds(currentSchemaDefinitions)
.forEach(
attestationSubnet -> {
b.addRelevantSubnet(attestationSubnet);
subscribersByTopic
.getOrDefault(
attestationTopicProvider.getTopicForSubnet(attestationSubnet),
Collections.emptySet())
.forEach(
subscriber -> b.addSubscriber(attestationSubnet, subscriber));
}))
.syncCommitteeSubnetSubscriptions(
b ->
// Only track sync committee subnets that we're subscribed to
syncCommitteeSubnetService
.getSubnets()
.forEach(
syncCommitteeSubnet -> {
b.addRelevantSubnet(syncCommitteeSubnet);
subscribersByTopic
.getOrDefault(
syncCommitteeSubnetTopicProvider.getTopicForSubnet(
syncCommitteeSubnet),
Collections.emptySet())
.forEach(
subscriber ->
b.addSubscriber(syncCommitteeSubnet, subscriber));
}))
.build();
updateMetrics(currentSchemaDefinitions, subnetPeerCountGauge, subscriptions);
return subscriptions;
}

private static void updateMetrics(
final SchemaDefinitionsSupplier currentSchemaDefinitions,
final SettableLabelledGauge subnetPeerCountGauge,
final PeerSubnetSubscriptions subscriptions) {
streamAllAttestationSubnetIds(currentSchemaDefinitions)
.forEach(
subnetId ->
subnetPeerCountGauge.set(
subscriptions.attestationSubnetSubscriptions.subscriberCountBySubnetId
.getOrDefault(subnetId, 0),
"attestation_" + subnetId));
streamAllSyncCommitteeSubnetIds(currentSchemaDefinitions)
.forEach(
subnetId ->
subnetPeerCountGauge.set(
subscriptions.syncCommitteeSubnetSubscriptions.subscriberCountBySubnetId
.getOrDefault(subnetId, 0),
"sync_committee_" + subnetId));
}

private static IntStream streamAllAttestationSubnetIds(
final SchemaDefinitionsSupplier currentSchemaDefinitions) {
return IntStream.range(0, currentSchemaDefinitions.getAttnetsENRFieldSchema().getLength());
}

private static IntStream streamAllSyncCommitteeSubnetIds(
final SchemaDefinitionsSupplier currentSchemaDefinitions) {
return IntStream.range(0, currentSchemaDefinitions.getSyncnetsENRFieldSchema().getLength());
}

static Builder builder(final SchemaDefinitionsSupplier currentSchemaDefinitions) {
return new Builder(currentSchemaDefinitions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.stream.IntStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector;
import tech.pegasys.teku.networking.eth2.SubnetSubscriptionService;
import tech.pegasys.teku.networking.p2p.gossip.GossipNetwork;
Expand All @@ -46,6 +47,7 @@ class PeerSubnetSubscriptionsTest {
private static final int TARGET_SUBSCRIBER_COUNT = 2;

private final Spec spec = TestSpecFactory.createMinimalAltair();
private final SettableLabelledGauge subnetPeerCountGauge = mock(SettableLabelledGauge.class);
private final SchemaDefinitionsSupplier currentSchemaDefinitions =
spec::getGenesisSchemaDefinitions;
private final GossipNetwork gossipNetwork = mock(GossipNetwork.class);
Expand Down Expand Up @@ -201,7 +203,8 @@ private PeerSubnetSubscriptions createPeerSubnetSubscriptions() {
attestationTopicProvider,
syncCommitteeTopicProvider,
syncnetSubscriptions,
TARGET_SUBSCRIBER_COUNT);
TARGET_SUBSCRIBER_COUNT,
subnetPeerCountGauge);
}

private void withSubscriberCountForAllSubnets(int subscriberCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.Waiter;
import tech.pegasys.teku.infrastructure.events.EventChannels;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.time.StubTimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
Expand Down Expand Up @@ -228,6 +230,13 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) {
discoConfig.getMinRandomlySelectedPeers());
final SchemaDefinitionsSupplier currentSchemaDefinitions =
() -> config.getSpec().getGenesisSchemaDefinitions();
final SettableLabelledGauge subnetPeerCountGauge =
SettableLabelledGauge.create(
metricsSystem,
TekuMetricCategory.NETWORK,
"subnet_peer_count",
"Number of currently connected peers subscribed to each subnet",
"subnet");
final DiscoveryNetwork<?> network =
DiscoveryNetworkBuilder.create()
.metricsSystem(metricsSystem)
Expand Down Expand Up @@ -257,7 +266,8 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) {
attestationSubnetTopicProvider,
syncCommitteeTopicProvider,
syncCommitteeSubnetService,
config.getTargetSubnetSubscriberCount()),
config.getTargetSubnetSubscriberCount(),
subnetPeerCountGauge),
reputationManager,
Collections::shuffle))
.discoveryConfig(config.getDiscoveryConfig())
Expand Down

0 comments on commit 1a249bc

Please sign in to comment.