diff --git a/Cargo.lock b/Cargo.lock index 6bd36512..08e58737 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1354,6 +1354,7 @@ dependencies = [ "tokio", "tx-sender", "vlog", + "withdrawals-meterer", ] [[package]] @@ -4610,6 +4611,7 @@ dependencies = [ "thiserror", "tokio", "vlog", + "withdrawals-meterer", ] [[package]] @@ -4821,6 +4823,18 @@ dependencies = [ "watcher", ] +[[package]] +name = "withdrawals-meterer" +version = "0.1.11" +dependencies = [ + "client", + "ethers", + "metrics", + "sqlx", + "storage", + "vlog", +] + [[package]] name = "ws_stream_wasm" version = "0.7.4" diff --git a/Cargo.toml b/Cargo.toml index 221eef27..4de13e85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,4 +13,5 @@ members = [ "tx-sender", "vlog", "watcher", + "withdrawals-meterer" ] diff --git a/finalizer/Cargo.toml b/finalizer/Cargo.toml index b6fcd56c..5dadf151 100644 --- a/finalizer/Cargo.toml +++ b/finalizer/Cargo.toml @@ -18,3 +18,4 @@ client = { path = "../client" } storage = { path = "../storage" } vlog = { path = "../vlog" } tx-sender = { path = "../tx-sender" } +withdrawals-meterer = { path = "../withdrawals-meterer" } diff --git a/finalizer/src/lib.rs b/finalizer/src/lib.rs index 3931abae..f00ce708 100644 --- a/finalizer/src/lib.rs +++ b/finalizer/src/lib.rs @@ -24,6 +24,7 @@ use client::{ l1bridge::codegen::IL1Bridge, withdrawal_finalizer::codegen::WithdrawalFinalizer, zksync_contract::codegen::IZkSync, WithdrawalParams, ZksyncMiddleware, }; +use withdrawals_meterer::WithdrawalsMeter; use crate::error::{Error, Result}; @@ -54,6 +55,7 @@ pub struct Finalizer { tx_fee_limit: U256, tx_retry_timeout: Duration, account_address: Address, + withdrawals_meterer: WithdrawalsMeter, } const NO_NEW_WITHDRAWALS_BACKOFF: Duration = Duration::from_secs(5); @@ -82,6 +84,10 @@ where tx_retry_timeout: usize, account_address: Address, ) -> Self { + let withdrawals_meterer = withdrawals_meterer::WithdrawalsMeter::new( + pgpool.clone(), + "era_withdrawal_finalizer_meter", + ); let tx_fee_limit = ethers::utils::parse_ether(TX_FEE_LIMIT) .expect("{TX_FEE_LIMIT} ether is a parsable amount; qed"); @@ -98,6 +104,7 @@ where tx_fee_limit, tx_retry_timeout: Duration::from_secs(tx_retry_timeout as u64), account_address, + withdrawals_meterer, } } @@ -183,6 +190,8 @@ where ) .await; + let ids: Vec<_> = withdrawals.iter().map(|w| w.id as i64).collect(); + // Turn actual withdrawals into info to update db with. let withdrawals = withdrawals.into_iter().map(|w| w.key()).collect::>(); @@ -204,6 +213,14 @@ where "finalizer.highest_finalized_batch_number", highest_batch_number.as_u64() as f64, ); + + if let Err(e) = self + .withdrawals_meterer + .meter_withdrawals_storage(&ids) + .await + { + vlog::error!("Failed to meter the withdrawals: {e}"); + } } // TODO: why would a pending tx resolve to `None`? Ok(None) => { diff --git a/storage/.sqlx/query-0fa6ebcd275b249c6bbc46a40824b366527f5c69f958ae499ce198838d7e899a.json b/storage/.sqlx/query-05c4906bd809f5d6f90e41b9130004c9aa432464493a331f37fcb171bdedb271.json similarity index 79% rename from storage/.sqlx/query-0fa6ebcd275b249c6bbc46a40824b366527f5c69f958ae499ce198838d7e899a.json rename to storage/.sqlx/query-05c4906bd809f5d6f90e41b9130004c9aa432464493a331f37fcb171bdedb271.json index 4d6c97ad..0560be4c 100644 --- a/storage/.sqlx/query-0fa6ebcd275b249c6bbc46a40824b366527f5c69f958ae499ce198838d7e899a.json +++ b/storage/.sqlx/query-05c4906bd809f5d6f90e41b9130004c9aa432464493a331f37fcb171bdedb271.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n DELETE FROM\n withdrawals\n WHERE\n id in (\n SELECT\n id\n from\n withdrawals\n LIMIT\n $1\n )\n RETURNING id\n ", + "query": "\n DELETE FROM\n withdrawals\n WHERE\n id in (\n SELECT\n id\n from\n withdrawals\n LIMIT\n $1\n ) RETURNING id\n ", "describe": { "columns": [ { @@ -18,5 +18,5 @@ false ] }, - "hash": "0fa6ebcd275b249c6bbc46a40824b366527f5c69f958ae499ce198838d7e899a" + "hash": "05c4906bd809f5d6f90e41b9130004c9aa432464493a331f37fcb171bdedb271" } diff --git a/storage/.sqlx/query-51fd4f0c9564d4e133caaba641fbd7d1b36cb68cf93a385ea1f0934c92f034ed.json b/storage/.sqlx/query-51fd4f0c9564d4e133caaba641fbd7d1b36cb68cf93a385ea1f0934c92f034ed.json new file mode 100644 index 00000000..d4318603 --- /dev/null +++ b/storage/.sqlx/query-51fd4f0c9564d4e133caaba641fbd7d1b36cb68cf93a385ea1f0934c92f034ed.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT decimals FROM tokens WHERE l2_token_address = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "decimals", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Bytea" + ] + }, + "nullable": [ + false + ] + }, + "hash": "51fd4f0c9564d4e133caaba641fbd7d1b36cb68cf93a385ea1f0934c92f034ed" +} diff --git a/storage/.sqlx/query-f4ae16616cf636fc5eac046fc68e9d6ead70c3f048cb6103e65c5cedef95a202.json b/storage/.sqlx/query-f4ae16616cf636fc5eac046fc68e9d6ead70c3f048cb6103e65c5cedef95a202.json new file mode 100644 index 00000000..c7afe10e --- /dev/null +++ b/storage/.sqlx/query-f4ae16616cf636fc5eac046fc68e9d6ead70c3f048cb6103e65c5cedef95a202.json @@ -0,0 +1,52 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT * FROM\n withdrawals\n WHERE id in (SELECT id FROM unnest( $1 :: bigint[] ))\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_hash", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "l2_block_number", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "token", + "type_info": "Bytea" + }, + { + "ordinal": 3, + "name": "amount", + "type_info": "Numeric" + }, + { + "ordinal": 4, + "name": "event_index_in_tx", + "type_info": "Int4" + }, + { + "ordinal": 5, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false + ] + }, + "hash": "f4ae16616cf636fc5eac046fc68e9d6ead70c3f048cb6103e65c5cedef95a202" +} diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 3142afc5..1c8271e8 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -244,6 +244,38 @@ pub async fn executed_new_batch( Ok(()) } +/// Gets withdrawal events from the db by a set of IDs. +/// +/// # Arguments +/// +/// * `conn`: Connection to the Postgres DB +/// * `ids`: ID fields of the withdrawals to be returned. +pub async fn get_withdrawals(pool: &PgPool, ids: &[i64]) -> Result> { + let events = sqlx::query!( + " + SELECT * FROM + withdrawals + WHERE id in (SELECT id FROM unnest( $1 :: bigint[] )) + ", + ids + ) + .fetch_all(pool) + .await? + .into_iter() + .map(|r| StoredWithdrawal { + event: WithdrawalEvent { + tx_hash: H256::from_slice(&r.tx_hash), + block_number: r.l2_block_number as u64, + token: Address::from_slice(&r.token), + amount: utils::bigdecimal_to_u256(r.amount), + }, + index_in_tx: r.event_index_in_tx as usize, + }) + .collect(); + + Ok(events) +} + /// Adds a withdrawal event to the DB. /// /// # Arguments @@ -838,6 +870,26 @@ pub async fn inc_unsuccessful_finalization_attempts( Ok(()) } +/// Fetch decimals for a token. +/// +/// # Arguments +/// +/// * `pool` - `PgPool` +/// * `token` - L2 token address. +pub async fn token_decimals(pool: &PgPool, token: Address) -> Result> { + let result = sqlx::query!( + " + SELECT decimals FROM tokens WHERE l2_token_address = $1 + ", + token.as_bytes(), + ) + .fetch_optional(pool) + .await? + .map(|r| r.decimals as u32); + + Ok(result) +} + async fn wipe_finalization_data(pool: &PgPool, delete_batch_size: usize) -> Result<()> { loop { let deleted_ids = sqlx::query!( @@ -942,8 +994,7 @@ async fn wipe_withdrawals(pool: &PgPool, delete_batch_size: usize) -> Result<()> withdrawals LIMIT $1 - ) - RETURNING id + ) RETURNING id ", delete_batch_size as i64, ) diff --git a/storage/src/utils.rs b/storage/src/utils.rs index 9653441c..569ad60e 100644 --- a/storage/src/utils.rs +++ b/storage/src/utils.rs @@ -14,6 +14,18 @@ pub(crate) fn u256_to_big_decimal(value: U256) -> BigDecimal { ratio_to_big_decimal(&ratio, 80) } +/// Converts `BigUint` value into the corresponding `U256` value. +fn biguint_to_u256(value: BigUint) -> U256 { + let bytes = value.to_bytes_le(); + U256::from_little_endian(&bytes) +} + +/// Converts `BigDecimal` value into the corresponding `U256` value. +pub(crate) fn bigdecimal_to_u256(value: BigDecimal) -> U256 { + let bigint = value.with_scale(0).into_bigint_and_exponent().0; + biguint_to_u256(bigint.to_biguint().unwrap()) +} + fn ratio_to_big_decimal(num: &Ratio, precision: usize) -> BigDecimal { let bigint = round_precision_raw_no_div(num, precision) .to_bigint() diff --git a/watcher/Cargo.toml b/watcher/Cargo.toml index 0db3a3ca..3a66a386 100644 --- a/watcher/Cargo.toml +++ b/watcher/Cargo.toml @@ -19,3 +19,4 @@ client = { path = "../client" } chain-events = { path = "../chain-events" } storage = { path = "../storage" } vlog = { path = "../vlog" } +withdrawals-meterer = { path = "../withdrawals-meterer" } diff --git a/watcher/src/lib.rs b/watcher/src/lib.rs index cb98f6ae..93044c24 100644 --- a/watcher/src/lib.rs +++ b/watcher/src/lib.rs @@ -11,6 +11,7 @@ use storage::StoredWithdrawal; use tokio::pin; use client::{zksync_contract::L2ToL1Event, BlockEvent, WithdrawalEvent, ZksyncMiddleware}; +use withdrawals_meterer::WithdrawalsMeter; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -29,6 +30,7 @@ pub type Result = std::result::Result; pub struct Watcher { l2_provider: Arc, pgpool: PgPool, + withdrawals_meterer: WithdrawalsMeter, } impl Watcher @@ -38,9 +40,15 @@ where { #[allow(clippy::too_many_arguments)] pub fn new(l2_provider: Arc, pgpool: PgPool) -> Self { + let withdrawals_meterer = withdrawals_meterer::WithdrawalsMeter::new( + pgpool.clone(), + "era_withdrawal_finalizer_watcher_meter", + ); + Self { l2_provider, pgpool, + withdrawals_meterer, } } @@ -57,6 +65,7 @@ where let Watcher { l2_provider, pgpool, + withdrawals_meterer, } = self; // While reading the stream of withdrawal events asyncronously @@ -79,8 +88,15 @@ where block_events, l2_provider, )); - let l2_loop_handler = - tokio::spawn(run_l2_events_loop(pgpool, withdrawal_events, from_l2_block)); + let l2_loop_handler = tokio::spawn(async move { + run_l2_events_loop( + pgpool, + withdrawal_events, + from_l2_block, + withdrawals_meterer, + ) + .await + }); pin!(l1_loop_handler); pin!(l2_loop_handler); @@ -272,7 +288,11 @@ where Ok(()) } -async fn process_withdrawals_in_block(pool: &PgPool, events: Vec) -> Result<()> { +async fn process_withdrawals_in_block( + pool: &PgPool, + events: Vec, + withdrawals_meterer: &mut WithdrawalsMeter, +) -> Result<()> { use itertools::Itertools; let group_by = events.into_iter().group_by(|event| event.tx_hash); let mut withdrawals_vec = vec![]; @@ -295,6 +315,13 @@ async fn process_withdrawals_in_block(pool: &PgPool, events: Vec(pool: PgPool, we: WE, from_l2_block: u64) -> Result<()> +async fn run_l2_events_loop( + pool: PgPool, + we: WE, + from_l2_block: u64, + mut withdrawals_meterer: WithdrawalsMeter, +) -> Result<()> where WE: Stream, { @@ -345,8 +377,12 @@ where L2Event::Withdrawal(event) => { vlog::info!("received withdrawal event {event:?}"); if event.block_number > curr_l2_block_number { - process_withdrawals_in_block(&pool, std::mem::take(&mut in_block_events)) - .await?; + process_withdrawals_in_block( + &pool, + std::mem::take(&mut in_block_events), + &mut withdrawals_meterer, + ) + .await?; curr_l2_block_number = event.block_number; } in_block_events.push(event); diff --git a/withdrawals-meterer/Cargo.toml b/withdrawals-meterer/Cargo.toml new file mode 100644 index 00000000..19e27821 --- /dev/null +++ b/withdrawals-meterer/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "withdrawals-meterer" +version = "0.1.11" +authors = ["The Matter Labs Team "] +homepage = "https://zksync.io/" +license = "MIT OR Apache-2.0" +edition = "2021" + +[dependencies] +ethers = "2.0.10" +metrics = "0.21.1" + +client = { path = "../client" } +sqlx = { version = "0.7", features = ["postgres", "runtime-tokio-rustls"] } +storage = { path = "../storage" } +vlog = { path = "../vlog" } diff --git a/withdrawals-meterer/src/lib.rs b/withdrawals-meterer/src/lib.rs new file mode 100644 index 00000000..380b2312 --- /dev/null +++ b/withdrawals-meterer/src/lib.rs @@ -0,0 +1,100 @@ +#![deny(unused_crate_dependencies)] +#![warn(missing_docs)] +#![warn(unused_extern_crates)] +#![warn(unused_imports)] + +//! A utility crate that meters withdrawals amounts. + +use std::{collections::HashMap, str::FromStr}; + +use client::ETH_TOKEN_ADDRESS; +use ethers::types::Address; +use sqlx::PgPool; +use storage::StoredWithdrawal; + +/// State of withdrawals volumes metering. +pub struct WithdrawalsMeter { + pool: PgPool, + token_decimals: HashMap, + component_name: &'static str, +} + +impl WithdrawalsMeter { + /// Create a new [`WithdrawalsMeter`] + /// + /// # Arguments + /// + /// * `pool`: DB connection pool + /// * `component_name`: Name of the component that does metering, metric names will be + /// derived from it + pub fn new(pool: PgPool, component_name: &'static str) -> Self { + let mut token_decimals = HashMap::new(); + token_decimals.insert(ETH_TOKEN_ADDRESS, 18_u32); + + Self { + pool, + token_decimals, + component_name, + } + } + + /// Given a set of withdrawal ids meter all of them to a metric + /// with a given name. + pub async fn meter_withdrawals_storage(&mut self, ids: &[i64]) -> Result<(), storage::Error> { + let withdrawals = storage::get_withdrawals(&self.pool, ids).await?; + + self.meter_withdrawals(&withdrawals).await?; + + Ok(()) + } + + /// Given a set of [`StoredWithdrawal`], meter all of them to a + /// metric with a given name. + /// + /// This function returns only storage error, all formatting, etc + /// errors will be just logged. + pub async fn meter_withdrawals( + &mut self, + withdrawals: &[StoredWithdrawal], + ) -> Result<(), storage::Error> { + for w in withdrawals { + let decimals = match self.token_decimals.get(&w.event.token) { + None => { + let Some(decimals) = storage::token_decimals(&self.pool, w.event.token).await? + else { + vlog::error!("Received withdrawal from unknown token {:?}", w.event.token); + continue; + }; + + self.token_decimals.insert(w.event.token, decimals); + decimals + } + Some(decimals) => *decimals, + }; + + let formatted = match ethers::utils::format_units(w.event.amount, decimals) { + Ok(f) => f, + Err(e) => { + vlog::error!("failed to format units: {e}"); + continue; + } + }; + + let formatted_f64 = match f64::from_str(&formatted) { + Ok(f) => f, + Err(e) => { + vlog::error!("failed to format units: {e}"); + continue; + } + }; + + metrics::increment_gauge!( + format!("{}_withdrawals", self.component_name), + formatted_f64, + "token" => format!("{:?}", w.event.token) + ) + } + + Ok(()) + } +}