From abf9a9b7122a6a178b6c5d484854723102ac9912 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Wed, 4 Dec 2024 08:01:02 -0800 Subject: [PATCH] Add L1 client metrics (#2346) * Add L1 client metrics * Log L1 RPC errors at RPC client layer This ensures all errors get logged straight from the RPC, even if they are ultimately swallowed at a higher level (as in the Ethers `FilterWatcher` stream). This will give us much better insight into future problems with the L1. --- sequencer/src/lib.rs | 6 ++- types/src/v0/impls/l1.rs | 87 +++++++++++++++++++++++++++++++++------- types/src/v0/mod.rs | 4 +- types/src/v0/v0_1/l1.rs | 18 ++++++++- types/src/v0/v0_3/mod.rs | 4 +- 5 files changed, 101 insertions(+), 18 deletions(-) diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index 143e229b0..3bec4535c 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -495,7 +495,11 @@ pub async fn init_node( genesis_state.prefund_account(address, amount); } - let l1_client = l1_params.options.connect(l1_params.url).await?; + let l1_client = l1_params + .options + .with_metrics(metrics) + .connect(l1_params.url) + .await?; l1_client.spawn_tasks().await; let l1_genesis = match genesis.l1_finalized { L1Finalized::Block(b) => b, diff --git a/types/src/v0/impls/l1.rs b/types/src/v0/impls/l1.rs index 958d4b3ad..aac8404d6 100644 --- a/types/src/v0/impls/l1.rs +++ b/types/src/v0/impls/l1.rs @@ -19,6 +19,7 @@ use futures::{ future::Future, stream::{self, StreamExt}, }; +use hotshot_types::traits::metrics::Metrics; use lru::LruCache; use serde::{de::DeserializeOwned, Serialize}; use tokio::{ @@ -29,7 +30,7 @@ use tokio::{ use tracing::Instrument; use url::Url; -use super::{L1BlockInfo, L1State, L1UpdateTask, RpcClient}; +use super::{L1BlockInfo, L1ClientMetrics, L1State, L1UpdateTask, RpcClient}; use crate::{FeeInfo, L1Client, L1ClientOptions, L1Event, L1ReconnectTask, L1Snapshot}; impl PartialOrd for L1BlockInfo { @@ -79,16 +80,24 @@ impl L1BlockInfo { } impl RpcClient { - fn http(url: 
Url) -> Self { - Self::Http(Http::new(url)) + fn http(url: Url, metrics: Arc<L1ClientMetrics>) -> Self { + Self::Http { + conn: Http::new(url), + metrics, + } } - async fn ws(url: Url, retry_delay: Duration) -> anyhow::Result<Self> { + async fn ws( + url: Url, + metrics: Arc<L1ClientMetrics>, + retry_delay: Duration, + ) -> anyhow::Result<Self> { Ok(Self::Ws { conn: Arc::new(RwLock::new(Ws::connect(url.clone()).await?)), reconnect: Default::default(), retry_delay, url, + metrics, }) } @@ -97,6 +106,13 @@ impl RpcClient { *reconnect.lock().await = L1ReconnectTask::Cancelled; } } + + fn metrics(&self) -> &Arc<L1ClientMetrics> { + match self { + Self::Http { metrics, .. } => metrics, + Self::Ws { metrics, .. } => metrics, + } + } } #[async_trait] @@ -109,12 +125,16 @@ impl JsonRpcClient for RpcClient { R: DeserializeOwned + Send, { let res = match self { - Self::Http(client) => client.request(method, params).await?, + Self::Http { conn, .. } => conn + .request(method, params) + .await + .inspect_err(|err| tracing::warn!(method, "L1 RPC error: {err:#}"))?, Self::Ws { conn, reconnect, url, retry_delay, + metrics, } => { let conn_guard = conn .try_read() @@ -131,6 +151,7 @@ impl JsonRpcClient for RpcClient { if let Ok(mut reconnect_guard) = reconnect.try_lock() { if matches!(*reconnect_guard, L1ReconnectTask::Idle) { // No one is currently resetting this connection, so it's up to us. + metrics.ws_reconnects.add(1); let conn = conn.clone(); let reconnect = reconnect.clone(); let url = url.clone(); @@ -174,7 +195,10 @@ impl JsonRpcClient for RpcClient { } Err(err)? } - Err(err) => Err(err)?, + Err(err) => { + tracing::warn!(method, "L1 RPC error: {err:#}"); + Err(err)? + } } } }; @@ -190,7 +214,7 @@ impl PubsubClient for RpcClient { T: Into<U256>, { match self { - Self::Http(_) => Err(ProviderError::CustomError( + Self::Http { .. } => Err(ProviderError::CustomError( "subscriptions not supported with HTTP client".into(), )), Self::Ws { conn, ..
} => Ok(conn @@ -210,7 +234,7 @@ impl PubsubClient for RpcClient { T: Into<U256>, { match self { - Self::Http(_) => Err(ProviderError::CustomError( + Self::Http { .. } => Err(ProviderError::CustomError( "subscriptions not supported with HTTP client".into(), )), Self::Ws { conn, .. } => Ok(conn @@ -250,6 +274,12 @@ impl Default for L1ClientOptions { } impl L1ClientOptions { + /// Use the given metrics collector to publish metrics related to the L1 client. + pub fn with_metrics(mut self, metrics: &(impl Metrics + ?Sized)) -> Self { + self.metrics = Arc::new(metrics.subgroup("l1".into())); + self + } + /// Instantiate an `L1Client` for a given `Url`. /// /// The type of the JSON-RPC client is inferred from the scheme of the URL. Supported schemes @@ -266,19 +296,36 @@ impl L1ClientOptions { /// /// `url` must have a scheme `http` or `https`. pub fn http(self, url: Url) -> L1Client { - L1Client::with_provider(self, Provider::new(RpcClient::http(url))) + let metrics = self.create_metrics(); + L1Client::with_provider(self, Provider::new(RpcClient::http(url, metrics))) } /// Construct a new WebSockets client. /// /// `url` must have a scheme `ws` or `wss`.
pub async fn ws(self, url: Url) -> anyhow::Result<L1Client> { + let metrics = self.create_metrics(); let retry_delay = self.l1_retry_delay; Ok(L1Client::with_provider( self, - Provider::new(RpcClient::ws(url, retry_delay).await?), + Provider::new(RpcClient::ws(url, metrics, retry_delay).await?), )) } + + fn create_metrics(&self) -> Arc<L1ClientMetrics> { + Arc::new(L1ClientMetrics::new(&**self.metrics)) + } +} + +impl L1ClientMetrics { + fn new(metrics: &(impl Metrics + ?Sized)) -> Self { + Self { + head: metrics.create_gauge("head".into(), None), + finalized: metrics.create_gauge("finalized".into(), None), + ws_reconnects: metrics.create_counter("ws_reconnects".into(), None), + stream_reconnects: metrics.create_counter("stream_reconnects".into(), None), + } + } } impl L1Client { @@ -346,6 +393,7 @@ impl L1Client { let retry_delay = self.retry_delay; let state = self.state.clone(); let sender = self.sender.clone(); + let metrics = (*rpc).as_ref().metrics().clone(); let span = tracing::warn_span!("L1 client update"); async move { @@ -354,7 +402,7 @@ impl L1Client { let mut block_stream = loop { let res = match (*rpc).as_ref() { RpcClient::Ws { .. } => rpc.subscribe_blocks().await.map(StreamExt::boxed), - RpcClient::Http(_) => rpc + RpcClient::Http { .. } => rpc .watch_blocks() .await .map(|stream| { @@ -427,6 +475,7 @@ impl L1Client { let mut state = state.lock().await; if head > state.snapshot.head { tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated"); + metrics.head.set(head as usize); state.snapshot.head = head; // Emit an event about the new L1 head. Ignore send errors; it just means no // one is listening to events right now.
@@ -441,6 +490,9 @@ impl L1Client { old_finalized = ?state.snapshot.finalized, "L1 finalized updated", ); + if let Some(finalized) = finalized { + metrics.finalized.set(finalized.number as usize); + } state.snapshot.finalized = finalized; if let Some(finalized) = finalized { sender @@ -463,6 +515,8 @@ impl L1Client { } } } + + metrics.stream_reconnects.add(1); } }.instrument(span) } @@ -787,6 +841,7 @@ mod test { prelude::{LocalWallet, Signer, SignerMiddleware, H160, U64}, utils::{hex, parse_ether, Anvil, AnvilInstance}, }; + use hotshot_types::traits::metrics::NoMetrics; use portpicker::pick_unused_port; use sequencer_utils::test_utils::setup_test; use std::time::Duration; @@ -1088,9 +1143,13 @@ mod test { let port = pick_unused_port().unwrap(); let mut anvil = Anvil::new().block_time(1u32).port(port).spawn(); let provider = Provider::new( - RpcClient::ws(anvil.ws_endpoint().parse().unwrap(), Duration::from_secs(1)) - .await - .unwrap(), + RpcClient::ws( + anvil.ws_endpoint().parse().unwrap(), + Arc::new(L1ClientMetrics::new(&NoMetrics)), + Duration::from_secs(1), + ) + .await + .unwrap(), ); // Check the provider is working. 
diff --git a/types/src/v0/mod.rs b/types/src/v0/mod.rs index 59a44cf65..c5d67031f 100644 --- a/types/src/v0/mod.rs +++ b/types/src/v0/mod.rs @@ -123,7 +123,9 @@ reexport_unchanged_types!( ViewBasedUpgrade, BlockSize, ); -pub(crate) use v0_3::{L1Event, L1ReconnectTask, L1State, L1UpdateTask, RpcClient}; +pub(crate) use v0_3::{ + L1ClientMetrics, L1Event, L1ReconnectTask, L1State, L1UpdateTask, RpcClient, +}; #[derive( Clone, Copy, Debug, Default, Hash, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, diff --git a/types/src/v0/v0_1/l1.rs b/types/src/v0/v0_1/l1.rs index 5259aaa9a..6aa3f3e9d 100644 --- a/types/src/v0/v0_1/l1.rs +++ b/types/src/v0/v0_1/l1.rs @@ -5,6 +5,7 @@ use ethers::{ prelude::{H256, U256}, providers::{Http, Provider, Ws}, }; +use hotshot_types::traits::metrics::{Counter, Gauge, Metrics, NoMetrics}; use lru::LruCache; use serde::{Deserialize, Serialize}; use std::{num::NonZeroUsize, sync::Arc, time::Duration}; @@ -86,6 +87,9 @@ pub struct L1ClientOptions { default_value = "10000" )] pub l1_events_max_block_range: u64, + + #[clap(skip = Arc::<Box<dyn Metrics>>::new(Box::new(NoMetrics)))] + pub metrics: Arc<Box<dyn Metrics>>, } #[derive(Clone, Debug)] @@ -115,12 +119,16 @@ pub struct L1Client { /// An Ethereum RPC client over HTTP or WebSockets.
#[derive(Clone, Debug)] pub(crate) enum RpcClient { - Http(Http), + Http { + conn: Http, + metrics: Arc<L1ClientMetrics>, + }, Ws { conn: Arc<RwLock<Ws>>, reconnect: Arc<Mutex<L1ReconnectTask>>, url: Url, retry_delay: Duration, + metrics: Arc<L1ClientMetrics>, }, } @@ -147,3 +155,11 @@ pub(crate) enum L1ReconnectTask { Idle, Cancelled, } + +#[derive(Debug)] +pub(crate) struct L1ClientMetrics { + pub(crate) head: Box<dyn Gauge>, + pub(crate) finalized: Box<dyn Gauge>, + pub(crate) ws_reconnects: Box<dyn Counter>, + pub(crate) stream_reconnects: Box<dyn Counter>, +} diff --git a/types/src/v0/v0_3/mod.rs b/types/src/v0/v0_3/mod.rs index 86a0150a3..838696a6c 100644 --- a/types/src/v0/v0_3/mod.rs +++ b/types/src/v0/v0_3/mod.rs @@ -12,7 +12,9 @@ pub use super::v0_1::{ UpgradeType, ViewBasedUpgrade, BLOCK_MERKLE_TREE_HEIGHT, FEE_MERKLE_TREE_HEIGHT, NS_ID_BYTE_LEN, NS_OFFSET_BYTE_LEN, NUM_NSS_BYTE_LEN, NUM_TXS_BYTE_LEN, TX_OFFSET_BYTE_LEN, }; -pub(crate) use super::v0_1::{L1Event, L1ReconnectTask, L1State, L1UpdateTask, RpcClient}; +pub(crate) use super::v0_1::{ + L1ClientMetrics, L1Event, L1ReconnectTask, L1State, L1UpdateTask, RpcClient, +}; pub const VERSION: Version = Version { major: 0, minor: 3 };