Skip to content

Commit

Permalink
More metrics for L1 client
Browse files Browse the repository at this point in the history
  • Loading branch information
jbearer committed Jan 3, 2025
1 parent 03b2c6a commit 42c4056
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 11 deletions.
41 changes: 35 additions & 6 deletions types/src/v0/impls/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::{
future::{Future, FutureExt},
stream::{self, StreamExt},
};
use hotshot_types::traits::metrics::Metrics;
use hotshot_types::traits::metrics::{CounterFamily, Metrics};
use lru::LruCache;
use reqwest::StatusCode;
use serde::{de::DeserializeOwned, Serialize};
Expand All @@ -34,7 +34,7 @@ use url::Url;
use super::{
L1BlockInfo, L1ClientMetrics, L1State, L1UpdateTask, MultiRpcClient, MultiRpcClientStatus,
};
use crate::{FeeInfo, L1Client, L1ClientOptions, L1Event, L1Snapshot};
use crate::{FeeInfo, L1Client, L1ClientOptions, L1Event, L1Provider, L1Snapshot};

impl PartialOrd for L1BlockInfo {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Expand Down Expand Up @@ -121,6 +121,16 @@ impl L1ClientMetrics {
reconnects: metrics
.create_counter("stream_reconnects".into(), None)
.into(),
failovers: metrics.create_counter("failovers".into(), None).into(),
}
}
}

impl L1Provider {
fn new(url: Url, failures: &dyn CounterFamily) -> Self {
Self {
failures: failures.create(vec![url.to_string()]),
inner: Http::new(url),
}
}
}
Expand All @@ -134,12 +144,23 @@ impl MultiRpcClient {
failover_send.set_await_active(false);
failover_send.set_overflow(true);

let metrics = L1ClientMetrics::new(&**opt.metrics);
let failures = opt
.metrics
.counter_family("failed_requests".into(), vec!["provider".into()]);

Self {
clients: Arc::new(clients.into_iter().map(Http::new).collect()),
clients: Arc::new(
clients
.into_iter()
.map(|url| L1Provider::new(url, &*failures))
.collect(),
),
status: Default::default(),
failover_send,
failover_recv: failover_recv.deactivate(),
opt,
metrics,
}
}

Expand All @@ -156,6 +177,7 @@ impl MultiRpcClient {
status.rate_limited_until = None;
status.last_failure = None;
status.consecutive_failures = 0;
self.metrics.failovers.add(1);
self.failover_send.broadcast_direct(()).await.ok();
}

Expand All @@ -167,6 +189,10 @@ impl MultiRpcClient {
fn options(&self) -> &L1ClientOptions {
&self.opt
}

fn metrics(&self) -> &L1ClientMetrics {
&self.metrics
}
}

#[async_trait]
Expand Down Expand Up @@ -202,6 +228,7 @@ impl JsonRpcClient for MultiRpcClient {
Err(err) => {
let t = Instant::now();
tracing::warn!(?t, method, ?params, "L1 client error: {err:#}");
client.failures.add(1);

// Keep track of failures, failing over to the next client if necessary.
let mut status = self.status.write().await;
Expand Down Expand Up @@ -250,7 +277,6 @@ impl JsonRpcClient for MultiRpcClient {
impl L1Client {
fn with_provider(mut provider: Provider<MultiRpcClient>) -> Self {
let opt = provider.as_ref().options().clone();
let metrics = L1ClientMetrics::new(&**opt.metrics);

let (sender, mut receiver) = async_broadcast::broadcast(opt.l1_events_channel_capacity);
receiver.set_await_active(false);
Expand All @@ -263,7 +289,6 @@ impl L1Client {
sender,
receiver: receiver.deactivate(),
update_task: Default::default(),
metrics,
}
}

Expand Down Expand Up @@ -302,7 +327,7 @@ impl L1Client {
let subscription_timeout = opt.subscription_timeout;
let state = self.state.clone();
let sender = self.sender.clone();
let metrics = self.metrics.clone();
let metrics = self.metrics().clone();

let span = tracing::warn_span!("L1 client update");
async move {
Expand Down Expand Up @@ -716,6 +741,10 @@ impl L1Client {
(*self.provider).as_ref().options()
}

fn metrics(&self) -> &L1ClientMetrics {
(*self.provider).as_ref().metrics()
}

async fn retry_delay(&self) {
sleep(self.options().l1_retry_delay).await;
}
Expand Down
3 changes: 2 additions & 1 deletion types/src/v0/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ reexport_unchanged_types!(
);

pub(crate) use v0_3::{
L1ClientMetrics, L1Event, L1State, L1UpdateTask, MultiRpcClient, MultiRpcClientStatus,
L1ClientMetrics, L1Event, L1Provider, L1State, L1UpdateTask, MultiRpcClient,
MultiRpcClientStatus,
};

#[derive(
Expand Down
15 changes: 12 additions & 3 deletions types/src/v0/v0_1/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::parse_duration;
use async_broadcast::{InactiveReceiver, Sender};
use async_lock::RwLock;
use clap::Parser;
use derive_more::Deref;
use ethers::{
prelude::{H256, U256},
providers::{Http, Provider},
Expand Down Expand Up @@ -156,8 +157,6 @@ pub struct L1Client {
pub(crate) receiver: InactiveReceiver<L1Event>,
/// Async task which updates the shared state.
pub(crate) update_task: Arc<L1UpdateTask>,
/// Metrics
pub(crate) metrics: L1ClientMetrics,
}

/// In-memory view of the L1 state, updated asynchronously.
Expand All @@ -181,6 +180,7 @@ pub(crate) struct L1ClientMetrics {
pub(crate) head: Arc<dyn Gauge>,
pub(crate) finalized: Arc<dyn Gauge>,
pub(crate) reconnects: Arc<dyn Counter>,
pub(crate) failovers: Arc<dyn Counter>,
}

/// An RPC client with multiple remote providers.
Expand All @@ -189,11 +189,12 @@ pub(crate) struct L1ClientMetrics {
/// failing state, it will automatically switch to the next provider in its list.
#[derive(Clone, Debug)]
pub(crate) struct MultiRpcClient {
pub(crate) clients: Arc<Vec<Http>>,
pub(crate) clients: Arc<Vec<L1Provider>>,
pub(crate) status: Arc<RwLock<MultiRpcClientStatus>>,
pub(crate) failover_send: Sender<()>,
pub(crate) failover_recv: InactiveReceiver<()>,
pub(crate) opt: L1ClientOptions,
pub(crate) metrics: L1ClientMetrics,
}

/// The state of the current provider being used by a [`MultiRpcClient`].
Expand All @@ -204,3 +205,11 @@ pub(crate) struct MultiRpcClientStatus {
pub(crate) consecutive_failures: usize,
pub(crate) rate_limited_until: Option<Instant>,
}

/// A single provider in a [`MultiRpcClient`].
#[derive(Debug, Deref)]
pub(crate) struct L1Provider {
#[deref]
pub(crate) inner: Http,
pub(crate) failures: Box<dyn Counter>,
}
3 changes: 2 additions & 1 deletion types/src/v0/v0_3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ pub use super::v0_1::{
NS_ID_BYTE_LEN, NS_OFFSET_BYTE_LEN, NUM_NSS_BYTE_LEN, NUM_TXS_BYTE_LEN, TX_OFFSET_BYTE_LEN,
};
pub(crate) use super::v0_1::{
L1ClientMetrics, L1Event, L1State, L1UpdateTask, MultiRpcClient, MultiRpcClientStatus,
L1ClientMetrics, L1Event, L1Provider, L1State, L1UpdateTask, MultiRpcClient,
MultiRpcClientStatus,
};

pub const VERSION: Version = Version { major: 0, minor: 3 };
Expand Down

0 comments on commit 42c4056

Please sign in to comment.