Skip to content

Commit

Permalink
spin up agents monitoring in all agents
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-savu committed Feb 8, 2024
1 parent 37478ef commit 5f42f98
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 40 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 8 additions & 19 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use hyperlane_base::{
SequencedDataContractSync, WatermarkContractSync,
};
use hyperlane_core::{
metrics::agent::METRICS_SCRAPE_INTERVAL, HyperlaneDomain, HyperlaneMessage,
InterchainGasPayment, MerkleTreeInsertion, U256,
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256,
};
use tokio::{
sync::{
Expand Down Expand Up @@ -281,26 +280,16 @@ impl BaseAgent for Relayer {

tasks.push(self.run_destination_submitter(dest_domain, receive_channel));

let agent_metrics_conf = dest_conf
.agent_metrics_conf(Self::AGENT_NAME.to_string())
.await
.unwrap();
let provider = dest_conf.build_provider(&self.core_metrics).await.unwrap();
let metrics_updater = MetricsUpdater::new(
dest_conf,
self.core_metrics.clone(),
self.agent_metrics.clone(),
self.chain_metrics.clone(),
agent_metrics_conf,
provider,
);

let metrics_updater_task = tokio::spawn(async move {
metrics_updater
.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
.await;
Ok(())
})
.instrument(info_span!("MetricsUpdater"));
tasks.push(metrics_updater_task);
Self::AGENT_NAME.to_string(),
)
.await
.unwrap();
tasks.push(metrics_updater.spawn());
}

for origin in &self.origin_chains {
Expand Down
1 change: 1 addition & 0 deletions rust/agents/scraper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ eyre.workspace = true
futures.workspace = true
itertools.workspace = true
num-bigint.workspace = true
num-traits.workspace = true
prometheus.workspace = true
sea-orm = { workspace = true }
serde.workspace = true
Expand Down
38 changes: 29 additions & 9 deletions rust/agents/scraper/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use async_trait::async_trait;
use derive_more::AsRef;
use hyperlane_base::{
metrics::AgentMetrics, run_all, settings::IndexSettings, BaseAgent, ChainMetrics,
ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, MetricsUpdater,
};
use hyperlane_core::HyperlaneDomain;
use hyperlane_core::{HyperlaneDomain, KnownHyperlaneDomain};
use num_traits::cast::FromPrimitive;
use tokio::task::JoinHandle;
use tracing::{info_span, instrument::Instrumented, trace, Instrument};

Expand All @@ -19,8 +20,11 @@ pub struct Scraper {
#[as_ref]
core: HyperlaneAgentCore,
contract_sync_metrics: Arc<ContractSyncMetrics>,
metrics: Arc<CoreMetrics>,
scrapers: HashMap<u32, ChainScraper>,
settings: ScraperSettings,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
}

#[derive(Debug)]
Expand All @@ -38,8 +42,8 @@ impl BaseAgent for Scraper {
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
_chain_metrics: ChainMetrics,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
) -> eyre::Result<Self>
where
Self: Sized,
Expand Down Expand Up @@ -77,9 +81,12 @@ impl BaseAgent for Scraper {

Ok(Self {
core,
metrics,
contract_sync_metrics,
scrapers,
settings,
core_metrics: metrics,
agent_metrics,
chain_metrics,
})
}

Expand All @@ -88,6 +95,19 @@ impl BaseAgent for Scraper {
let mut tasks = Vec::with_capacity(self.scrapers.len());
for domain in self.scrapers.keys() {
tasks.push(self.scrape(*domain).await);

let domain = KnownHyperlaneDomain::from_u32(*domain).unwrap();
let chain_conf = self.settings.chain_setup(&domain.into()).unwrap();
let metrics_updater = MetricsUpdater::new(
chain_conf,
self.core_metrics.clone(),
self.agent_metrics.clone(),
self.chain_metrics.clone(),
Self::AGENT_NAME.to_string(),
)
.await
.unwrap();
tasks.push(metrics_updater.spawn());
}
run_all(tasks)
}
Expand All @@ -106,7 +126,7 @@ impl Scraper {
tasks.push(
self.build_message_indexer(
domain.clone(),
self.metrics.clone(),
self.core_metrics.clone(),
self.contract_sync_metrics.clone(),
db.clone(),
index_settings.clone(),
Expand All @@ -116,7 +136,7 @@ impl Scraper {
tasks.push(
self.build_delivery_indexer(
domain.clone(),
self.metrics.clone(),
self.core_metrics.clone(),
self.contract_sync_metrics.clone(),
db.clone(),
index_settings.clone(),
Expand All @@ -126,7 +146,7 @@ impl Scraper {
tasks.push(
self.build_interchain_gas_payment_indexer(
domain,
self.metrics.clone(),
self.core_metrics.clone(),
self.contract_sync_metrics.clone(),
db,
index_settings.clone(),
Expand Down
35 changes: 31 additions & 4 deletions rust/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::AgentMetrics,
run_all, BaseAgent, ChainMetrics, CheckpointSyncer, ContractSyncMetrics, CoreMetrics,
HyperlaneAgentCore, SequencedDataContractSync,
run_all,
settings::ChainConf,
BaseAgent, ChainMetrics, CheckpointSyncer, ContractSyncMetrics, CoreMetrics,
HyperlaneAgentCore, MetricsUpdater, SequencedDataContractSync,
};

use hyperlane_core::{
Expand All @@ -31,6 +33,7 @@ use crate::{
#[derive(Debug, AsRef)]
pub struct Validator {
origin_chain: HyperlaneDomain,
origin_chain_conf: ChainConf,
#[as_ref]
core: HyperlaneAgentCore,
db: HyperlaneRocksDB,
Expand All @@ -44,6 +47,9 @@ pub struct Validator {
reorg_period: u64,
interval: Duration,
checkpoint_syncer: Arc<dyn CheckpointSyncer>,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
}

#[async_trait]
Expand All @@ -55,8 +61,8 @@ impl BaseAgent for Validator {
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
_chain_metrics: ChainMetrics,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
) -> Result<Self>
where
Self: Sized,
Expand All @@ -82,6 +88,12 @@ impl BaseAgent for Validator {
.build_validator_announce(&settings.origin_chain, &metrics)
.await?;

let origin_chain_conf = core
.settings
.chain_setup(&settings.origin_chain)
.unwrap()
.clone();

let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&metrics));

let merkle_tree_hook_sync = settings
Expand All @@ -96,6 +108,7 @@ impl BaseAgent for Validator {

Ok(Self {
origin_chain: settings.origin_chain,
origin_chain_conf,
core,
db: msg_db,
mailbox: mailbox.into(),
Expand All @@ -107,6 +120,9 @@ impl BaseAgent for Validator {
reorg_period: settings.reorg_period,
interval: settings.interval,
checkpoint_syncer,
agent_metrics,
chain_metrics,
core_metrics: metrics,
})
}

Expand All @@ -124,6 +140,17 @@ impl BaseAgent for Validator {
);
}

let metrics_updater = MetricsUpdater::new(
&self.origin_chain_conf,
self.core_metrics.clone(),
self.agent_metrics.clone(),
self.chain_metrics.clone(),
Self::AGENT_NAME.to_string(),
)
.await
.unwrap();
tasks.push(metrics_updater.spawn());

// announce the validator after spawning the signer task
self.announce().await.expect("Failed to announce validator");

Expand Down
46 changes: 38 additions & 8 deletions rust/hyperlane-base/src/metrics/agent_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
//! Metrics either related to the agents, or observed by them
use std::sync::Arc;
use std::time::Duration;

use derive_builder::Builder;
use derive_new::new;
use eyre::Result;
use eyre::{Report, Result};
use hyperlane_core::metrics::agent::decimals_by_protocol;
use hyperlane_core::metrics::agent::u256_as_scaled_f64;
use hyperlane_core::metrics::agent::METRICS_SCRAPE_INTERVAL;
use hyperlane_core::HyperlaneDomain;
use hyperlane_core::HyperlaneProvider;
use maplit::hashmap;
use prometheus::GaugeVec;
use prometheus::IntGaugeVec;
use tokio::time::MissedTickBehavior;
use tracing::debug;
use tracing::{trace, warn};
use tokio::{task::JoinHandle, time::MissedTickBehavior};
use tracing::info_span;
use tracing::{debug, instrument::Instrumented, trace, warn, Instrument};

use crate::settings::ChainConf;
use crate::CoreMetrics;

/// Expected label names for the `wallet_balance` metric.
Expand Down Expand Up @@ -43,7 +45,7 @@ pub const GAS_PRICE_HELP: &str =
"Tracks the current gas price of the chain, in the lowest denomination (e.g. gwei)";

/// Agent-specific metrics
#[derive(Clone, Builder)]
#[derive(Clone, Builder, Debug)]
pub struct AgentMetrics {
/// Current balance of native tokens for the
/// wallet address.
Expand All @@ -69,7 +71,7 @@ pub(crate) fn create_agent_metrics(metrics: &CoreMetrics) -> Result<AgentMetrics
}

/// Chain-specific metrics
#[derive(Clone, Builder)]
#[derive(Clone, Builder, Debug)]
pub struct ChainMetrics {
/// Tracks the current block height of the chain.
/// - `chain`: the chain name (or ID if the name is unknown) of the chain
Expand Down Expand Up @@ -114,7 +116,6 @@ pub struct AgentMetricsConf {
}

/// Utility struct to update various metrics using a standalone tokio task
#[derive(new)]
pub struct MetricsUpdater {
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
Expand All @@ -123,6 +124,25 @@ pub struct MetricsUpdater {
}

impl MetricsUpdater {
/// Creates a new instance of the `MetricsUpdater`
pub async fn new(
chain_conf: &ChainConf,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
agent_name: String,
) -> Result<Self> {
let agent_metrics_conf = chain_conf.agent_metrics_conf(agent_name).await?;
let provider = chain_conf.build_provider(&core_metrics).await?;

Ok(Self {
agent_metrics,
chain_metrics,
conf: agent_metrics_conf,
provider,
})
}

async fn update_agent_metrics(&self) {
let Some(wallet_addr) = self.conf.address.clone() else {
return;
Expand Down Expand Up @@ -202,4 +222,14 @@ impl MetricsUpdater {
interval.tick().await;
}
}

/// Spawns a tokio task to update the metrics
pub fn spawn(self) -> Instrumented<JoinHandle<Result<(), Report>>> {
tokio::spawn(async move {
self.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
.await;
Ok(())
})
.instrument(info_span!("MetricsUpdater"))
}
}
6 changes: 6 additions & 0 deletions rust/hyperlane-core/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ impl Debug for HyperlaneDomain {
}
}

impl From<KnownHyperlaneDomain> for HyperlaneDomain {
fn from(domain: KnownHyperlaneDomain) -> Self {
HyperlaneDomain::Known(domain)
}
}

#[derive(thiserror::Error, Debug)]
pub enum HyperlaneDomainConfigError {
#[error("Domain name (`{0}`) does not match the name of a known domain id; the name is probably misspelled.")]
Expand Down

0 comments on commit 5f42f98

Please sign in to comment.