Skip to content

Commit

Permalink
feat: fetch chain-specific metrics in standalone task (#3214)
Browse files Browse the repository at this point in the history
### Description

Spawning a tokio task to fetch chain-specific metrics in provider
middleware was wasteful because we ended up with 15 tasks doing the same
job. Now there is a single task that is spawned inside the relayer,
which I added in the struct that used to only fetch relayer balance.

Some questions:
- I'm realising in the current state I'm probably removing these metrics
from validators. Lmk if yes and I'll add back
- the chain-specific metrics were moved out of `MiddlewareMetrics`
because they're no longer part of middleware, but I wasn't sure where to
place them. Maybe `CoreMetrics` is a better place than the current
`custom_metrics.rs` file?

`tokio-metrics` turned out not to be useful because we'd need to
instrument every call site of `tokio::spawn` with it. We should still do
it but as a separate task imo.

### Drive-by changes

- This PR also makes it very easy to add the metrics tasks for new
chains, by extending the `HyperlaneProvider` trait with a
`get_chain_metrics` call which seems reasonably general to me to have
implemented by all providers. (Bc we currently only track these for evm
chains)
- the description, naming and logic of the `gas_price` metric was also
changed to support new chains

### Related issues

- Fixes #3047

### Backward compatibility

Yes

### Testing

Still need to manually test
  • Loading branch information
daniel-savu authored and yorhodes committed Feb 12, 2024
1 parent be0b323 commit a76694c
Show file tree
Hide file tree
Showing 19 changed files with 302 additions and 217 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 18 additions & 23 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ use derive_more::AsRef;
use eyre::Result;
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::{AgentMetrics, AgentMetricsUpdater},
metrics::{AgentMetrics, MetricsUpdater},
run_all,
settings::ChainConf,
BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, SequencedDataContractSync,
WatermarkContractSync,
BaseAgent, ChainMetrics, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync, WatermarkContractSync,
};
use hyperlane_core::{
metrics::agent::METRICS_SCRAPE_INTERVAL, HyperlaneDomain, HyperlaneMessage,
InterchainGasPayment, MerkleTreeInsertion, U256,
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256,
};
use tokio::{
sync::{
Expand Down Expand Up @@ -72,7 +71,10 @@ pub struct Relayer {
skip_transaction_gas_limit_for: HashSet<u32>,
allow_local_checkpoint_syncers: bool,
core_metrics: Arc<CoreMetrics>,
// TODO: decide whether to consolidate `agent_metrics` and `chain_metrics` into a single struct
// or move them in `core_metrics`, like the validator metrics
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
}

impl Debug for Relayer {
Expand Down Expand Up @@ -102,6 +104,7 @@ impl BaseAgent for Relayer {
settings: Self::Settings,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
) -> Result<Self>
where
Self: Sized,
Expand Down Expand Up @@ -260,6 +263,7 @@ impl BaseAgent for Relayer {
allow_local_checkpoint_syncers: settings.allow_local_checkpoint_syncers,
core_metrics,
agent_metrics,
chain_metrics,
})
}

Expand All @@ -276,25 +280,16 @@ impl BaseAgent for Relayer {

tasks.push(self.run_destination_submitter(dest_domain, receive_channel));

let agent_metrics_conf = dest_conf
.agent_metrics_conf(Self::AGENT_NAME.to_string())
.await
.unwrap();
let agent_metrics_fetcher = dest_conf.build_provider(&self.core_metrics).await.unwrap();
let agent_metrics = AgentMetricsUpdater::new(
let metrics_updater = MetricsUpdater::new(
dest_conf,
self.core_metrics.clone(),
self.agent_metrics.clone(),
agent_metrics_conf,
agent_metrics_fetcher,
);

let fetcher_task = tokio::spawn(async move {
agent_metrics
.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
.await;
Ok(())
})
.instrument(info_span!("AgentMetrics"));
tasks.push(fetcher_task);
self.chain_metrics.clone(),
Self::AGENT_NAME.to_string(),
)
.await
.unwrap();
tasks.push(metrics_updater.spawn());
}

for origin in &self.origin_chains {
Expand Down
1 change: 1 addition & 0 deletions rust/agents/scraper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ eyre.workspace = true
futures.workspace = true
itertools.workspace = true
num-bigint.workspace = true
num-traits.workspace = true
prometheus.workspace = true
sea-orm = { workspace = true }
serde.workspace = true
Expand Down
39 changes: 30 additions & 9 deletions rust/agents/scraper/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use derive_more::AsRef;
use hyperlane_base::{
metrics::AgentMetrics, run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics,
CoreMetrics, HyperlaneAgentCore,
metrics::AgentMetrics, run_all, settings::IndexSettings, BaseAgent, ChainMetrics,
ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, MetricsUpdater,
};
use hyperlane_core::HyperlaneDomain;
use hyperlane_core::{HyperlaneDomain, KnownHyperlaneDomain};
use num_traits::cast::FromPrimitive;
use tokio::task::JoinHandle;
use tracing::{info_span, instrument::Instrumented, trace, Instrument};

Expand All @@ -19,8 +20,11 @@ pub struct Scraper {
#[as_ref]
core: HyperlaneAgentCore,
contract_sync_metrics: Arc<ContractSyncMetrics>,
metrics: Arc<CoreMetrics>,
scrapers: HashMap<u32, ChainScraper>,
settings: ScraperSettings,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
}

#[derive(Debug)]
Expand All @@ -38,7 +42,8 @@ impl BaseAgent for Scraper {
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
) -> eyre::Result<Self>
where
Self: Sized,
Expand Down Expand Up @@ -76,9 +81,12 @@ impl BaseAgent for Scraper {

Ok(Self {
core,
metrics,
contract_sync_metrics,
scrapers,
settings,
core_metrics: metrics,
agent_metrics,
chain_metrics,
})
}

Expand All @@ -87,6 +95,19 @@ impl BaseAgent for Scraper {
let mut tasks = Vec::with_capacity(self.scrapers.len());
for domain in self.scrapers.keys() {
tasks.push(self.scrape(*domain).await);

let domain = KnownHyperlaneDomain::from_u32(*domain).unwrap();
let chain_conf = self.settings.chain_setup(&domain.into()).unwrap();
let metrics_updater = MetricsUpdater::new(
chain_conf,
self.core_metrics.clone(),
self.agent_metrics.clone(),
self.chain_metrics.clone(),
Self::AGENT_NAME.to_string(),
)
.await
.unwrap();
tasks.push(metrics_updater.spawn());
}
run_all(tasks)
}
Expand All @@ -105,7 +126,7 @@ impl Scraper {
tasks.push(
self.build_message_indexer(
domain.clone(),
self.metrics.clone(),
self.core_metrics.clone(),
self.contract_sync_metrics.clone(),
db.clone(),
index_settings.clone(),
Expand All @@ -115,7 +136,7 @@ impl Scraper {
tasks.push(
self.build_delivery_indexer(
domain.clone(),
self.metrics.clone(),
self.core_metrics.clone(),
self.contract_sync_metrics.clone(),
db.clone(),
index_settings.clone(),
Expand All @@ -125,7 +146,7 @@ impl Scraper {
tasks.push(
self.build_interchain_gas_payment_indexer(
domain,
self.metrics.clone(),
self.core_metrics.clone(),
self.contract_sync_metrics.clone(),
db,
index_settings.clone(),
Expand Down
34 changes: 31 additions & 3 deletions rust/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::AgentMetrics,
run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync,
run_all,
settings::ChainConf,
BaseAgent, ChainMetrics, CheckpointSyncer, ContractSyncMetrics, CoreMetrics,
HyperlaneAgentCore, MetricsUpdater, SequencedDataContractSync,
};

use hyperlane_core::{
Expand All @@ -31,6 +33,7 @@ use crate::{
#[derive(Debug, AsRef)]
pub struct Validator {
origin_chain: HyperlaneDomain,
origin_chain_conf: ChainConf,
#[as_ref]
core: HyperlaneAgentCore,
db: HyperlaneRocksDB,
Expand All @@ -44,6 +47,9 @@ pub struct Validator {
reorg_period: u64,
interval: Duration,
checkpoint_syncer: Arc<dyn CheckpointSyncer>,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
}

#[async_trait]
Expand All @@ -55,7 +61,8 @@ impl BaseAgent for Validator {
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
) -> Result<Self>
where
Self: Sized,
Expand All @@ -81,6 +88,12 @@ impl BaseAgent for Validator {
.build_validator_announce(&settings.origin_chain, &metrics)
.await?;

let origin_chain_conf = core
.settings
.chain_setup(&settings.origin_chain)
.unwrap()
.clone();

let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&metrics));

let merkle_tree_hook_sync = settings
Expand All @@ -95,6 +108,7 @@ impl BaseAgent for Validator {

Ok(Self {
origin_chain: settings.origin_chain,
origin_chain_conf,
core,
db: msg_db,
mailbox: mailbox.into(),
Expand All @@ -106,6 +120,9 @@ impl BaseAgent for Validator {
reorg_period: settings.reorg_period,
interval: settings.interval,
checkpoint_syncer,
agent_metrics,
chain_metrics,
core_metrics: metrics,
})
}

Expand All @@ -123,6 +140,17 @@ impl BaseAgent for Validator {
);
}

let metrics_updater = MetricsUpdater::new(
&self.origin_chain_conf,
self.core_metrics.clone(),
self.agent_metrics.clone(),
self.chain_metrics.clone(),
Self::AGENT_NAME.to_string(),
)
.await
.unwrap();
tasks.push(metrics_updater.spawn());

// announce the validator after spawning the signer task
self.announce().await.expect("Failed to announce validator");

Expand Down
8 changes: 6 additions & 2 deletions rust/chains/hyperlane-cosmos/src/providers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use hyperlane_core::{
BlockInfo, ChainResult, ContractLocator, HyperlaneChain, HyperlaneDomain, HyperlaneProvider,
TxnInfo, H256, U256,
BlockInfo, ChainInfo, ChainResult, ContractLocator, HyperlaneChain, HyperlaneDomain,
HyperlaneProvider, TxnInfo, H256, U256,
};
use tendermint_rpc::{client::CompatMode, HttpClient};

Expand Down Expand Up @@ -99,4 +99,8 @@ impl HyperlaneProvider for CosmosProvider {
.get_balance(address, self.canonical_asset.clone())
.await?)
}

async fn get_chain_metrics(&self) -> ChainResult<Option<ChainInfo>> {
Ok(None)
}
}
4 changes: 4 additions & 0 deletions rust/chains/hyperlane-ethereum/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ pub enum HyperlaneEthereumError {
/// provider Error
#[error("{0}")]
ProviderError(#[from] ProviderError),

/// Some details from a queried block are missing
#[error("Some details from a queried block are missing")]
MissingBlockDetails,
}

impl From<HyperlaneEthereumError> for ChainCommunicationError {
Expand Down
36 changes: 30 additions & 6 deletions rust/chains/hyperlane-ethereum/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::time::Duration;
use async_trait::async_trait;
use derive_new::new;
use ethers::prelude::Middleware;
use ethers_core::abi::Address;
use hyperlane_core::{ethers_core_types, U256};
use ethers_core::{abi::Address, types::BlockNumber};
use hyperlane_core::{ethers_core_types, ChainInfo, HyperlaneCustomErrorWrapper, U256};
use tokio::time::sleep;
use tracing::instrument;

Expand All @@ -21,10 +21,7 @@ use crate::BuildableWithProvider;
/// Connection to an ethereum provider. Useful for querying information about
/// the blockchain.
#[derive(Debug, Clone, new)]
pub struct EthereumProvider<M>
where
M: Middleware,
{
pub struct EthereumProvider<M> {
provider: Arc<M>,
domain: HyperlaneDomain,
}
Expand Down Expand Up @@ -119,6 +116,33 @@ where
.map_err(ChainCommunicationError::from_other)?;
Ok(balance.into())
}

async fn get_chain_metrics(&self) -> ChainResult<Option<ChainInfo>> {
let Some(block) = self
.provider
.get_block(BlockNumber::Latest)
.await
.map_err(|e| {
ChainCommunicationError::Other(HyperlaneCustomErrorWrapper::new(Box::new(e)))
})?
else {
return Ok(None);
};

// Given the block is queried with `BlockNumber::Latest` rather than `BlockNumber::Pending`,
// if `block` is Some at this point, we're guaranteed to have its `hash` and `number` defined,
// so it's safe to unwrap below
// more info at <https://docs.rs/ethers/latest/ethers/core/types/struct.Block.html#structfield.number>
let chain_metrics = ChainInfo::new(
BlockInfo {
hash: block.hash.unwrap().into(),
timestamp: block.timestamp.as_u64(),
number: block.number.unwrap().as_u64(),
},
block.base_fee_per_gas.map(Into::into),
);
Ok(Some(chain_metrics))
}
}

impl<M> EthereumProvider<M>
Expand Down
Loading

0 comments on commit a76694c

Please sign in to comment.