Skip to content

Commit

Permalink
fix review
Browse files Browse the repository at this point in the history
  • Loading branch information
montekki committed Sep 28, 2023
1 parent 4e9a2ee commit 78657fd
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 92 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 11 additions & 6 deletions finalizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use client::{
l1bridge::codegen::IL1Bridge, withdrawal_finalizer::codegen::WithdrawalFinalizer,
zksync_contract::codegen::IZkSync, WithdrawalParams, ZksyncMiddleware,
};
use withdrawals_meterer::WithdrawalsMeter;

use crate::error::{Error, Result};

Expand Down Expand Up @@ -54,6 +55,7 @@ pub struct Finalizer<M1, M2> {
tx_fee_limit: U256,
tx_retry_timeout: Duration,
account_address: Address,
withdrawals_meterer: WithdrawalsMeter,
}

const NO_NEW_WITHDRAWALS_BACKOFF: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -82,6 +84,10 @@ where
tx_retry_timeout: usize,
account_address: Address,
) -> Self {
let withdrawals_meterer = withdrawals_meterer::WithdrawalsMeter::new(
pgpool.clone(),
"era_withdrawal_finalizer_withdrawn_tokens",
);
let tx_fee_limit = ethers::utils::parse_ether(TX_FEE_LIMIT)
.expect("{TX_FEE_LIMIT} ether is a parsable amount; qed");

Expand All @@ -98,6 +104,7 @@ where
tx_fee_limit,
tx_retry_timeout: Duration::from_secs(tx_retry_timeout as u64),
account_address,
withdrawals_meterer,
}
}

Expand Down Expand Up @@ -207,12 +214,10 @@ where
highest_batch_number.as_u64() as f64,
);

if let Err(e) = withdrawals_meterer::meter_finalized_withdrawals_storage(
&self.pgpool,
ids,
"era_withdrawal_finalizer_withdrawn_tokens",
)
.await
if let Err(e) = self
.withdrawals_meterer
.meter_finalized_withdrawals_storage(&ids)
.await
{
vlog::error!("Failed to meter the withdrawals: {e}");
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ pub async fn get_withdrawals(pool: &PgPool, ids: &[i64]) -> Result<Vec<StoredWit
"
SELECT * FROM
withdrawals
WHERE id in (SELECT * FROM unnest( $1 :: bigint[] ))
WHERE id in (SELECT id FROM unnest( $1 :: bigint[] ))
",
ids
)
Expand Down
50 changes: 38 additions & 12 deletions watcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use storage::StoredWithdrawal;
use tokio::pin;

use client::{zksync_contract::L2ToL1Event, BlockEvent, WithdrawalEvent, ZksyncMiddleware};
use withdrawals_meterer::WithdrawalsMeter;

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand All @@ -29,6 +30,7 @@ pub type Result<T> = std::result::Result<T, Error>;
pub struct Watcher<M2> {
l2_provider: Arc<M2>,
pgpool: PgPool,
withdrawals_meterer: WithdrawalsMeter,
}

impl<M2> Watcher<M2>
Expand All @@ -38,9 +40,15 @@ where
{
#[allow(clippy::too_many_arguments)]
pub fn new(l2_provider: Arc<M2>, pgpool: PgPool) -> Self {
let withdrawals_meterer = withdrawals_meterer::WithdrawalsMeter::new(
pgpool.clone(),
"era_withdrawn_tokens_amounts_tracker",
);

Self {
l2_provider,
pgpool,
withdrawals_meterer,
}
}

Expand All @@ -57,6 +65,7 @@ where
let Watcher {
l2_provider,
pgpool,
withdrawals_meterer,
} = self;

// While reading the stream of withdrawal events asyncronously
Expand All @@ -79,8 +88,15 @@ where
block_events,
l2_provider,
));
let l2_loop_handler =
tokio::spawn(run_l2_events_loop(pgpool, withdrawal_events, from_l2_block));
let l2_loop_handler = tokio::spawn(async move {
run_l2_events_loop(
pgpool,
withdrawal_events,
from_l2_block,
withdrawals_meterer,
)
.await
});

pin!(l1_loop_handler);
pin!(l2_loop_handler);
Expand Down Expand Up @@ -272,7 +288,11 @@ where
Ok(())
}

async fn process_withdrawals_in_block(pool: &PgPool, events: Vec<WithdrawalEvent>) -> Result<()> {
async fn process_withdrawals_in_block(
pool: &PgPool,
events: Vec<WithdrawalEvent>,
withdrawals_meterer: &mut WithdrawalsMeter,
) -> Result<()> {
use itertools::Itertools;
let group_by = events.into_iter().group_by(|event| event.tx_hash);
let mut withdrawals_vec = vec![];
Expand All @@ -295,12 +315,9 @@ async fn process_withdrawals_in_block(pool: &PgPool, events: Vec<WithdrawalEvent
});
}

if let Err(e) = withdrawals_meterer::meter_finalized_withdrawals(
pool,
&stored_withdrawals,
"era_withdrawn_tokens_amounts_tracker",
)
.await
if let Err(e) = withdrawals_meterer
.meter_finalized_withdrawals(&stored_withdrawals)
.await
{
vlog::error!("Failed to meter requested withdrawals: {e}");
}
Expand Down Expand Up @@ -342,7 +359,12 @@ where
Ok(())
}

async fn run_l2_events_loop<WE>(pool: PgPool, we: WE, from_l2_block: u64) -> Result<()>
async fn run_l2_events_loop<WE>(
pool: PgPool,
we: WE,
from_l2_block: u64,
mut withdrawals_meterer: WithdrawalsMeter,
) -> Result<()>
where
WE: Stream<Item = L2Event>,
{
Expand All @@ -355,8 +377,12 @@ where
L2Event::Withdrawal(event) => {
vlog::info!("received withdrawal event {event:?}");
if event.block_number > curr_l2_block_number {
process_withdrawals_in_block(&pool, std::mem::take(&mut in_block_events))
.await?;
process_withdrawals_in_block(
&pool,
std::mem::take(&mut in_block_events),
&mut withdrawals_meterer,
)
.await?;
curr_l2_block_number = event.block_number;
}
in_block_events.push(event);
Expand Down
2 changes: 0 additions & 2 deletions withdrawals-meterer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ edition = "2021"

[dependencies]
ethers = "2.0.10"
lazy_static = "1.4.0"
metrics = "0.21.1"
tokio = "1.32.0"

client = { path = "../client" }
sqlx = { version = "0.7", features = ["postgres", "runtime-tokio-rustls"] }
Expand Down
143 changes: 76 additions & 67 deletions withdrawals-meterer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,88 +5,97 @@

//! A utility crate that meters withdrawals amounts.
use std::{collections::HashMap, str::FromStr, sync::Arc};
use std::{collections::HashMap, str::FromStr};

use client::ETH_TOKEN_ADDRESS;
use ethers::types::Address;
use lazy_static::lazy_static;
use sqlx::PgPool;
use storage::StoredWithdrawal;
use tokio::sync::RwLock;

lazy_static! {
static ref TOKEN_DECIMALS: Arc<RwLock<HashMap<Address, u32>>> = {
let mut map = HashMap::new();
map.insert(ETH_TOKEN_ADDRESS, 18_u32);

Arc::new(RwLock::new(map))
};
/// State of withdrawls volumes metering.
pub struct WithdrawalsMeter {
pool: PgPool,
token_decimals: HashMap<Address, u32>,
metric_name: &'static str,
}

/// Given a set of withdrawal ids meter all of them to a metric
/// with a given name.
pub async fn meter_finalized_withdrawals_storage(
pool: &PgPool,
ids: Vec<i64>,
metric_name: &'static str,
) -> Result<(), storage::Error> {
let withdrawals = storage::get_withdrawals(pool, &ids).await?;
impl WithdrawalsMeter {
/// Create a new [`WithdrawalsMeter`]
///
/// # Arguments
///
/// * `pool`: DB connection pool
/// * `metric_name: Name of the metric to meter to
pub fn new(pool: PgPool, metric_name: &'static str) -> Self {
let mut token_decimals = HashMap::new();
token_decimals.insert(ETH_TOKEN_ADDRESS, 18_u32);

meter_finalized_withdrawals(pool, &withdrawals, metric_name).await?;
Self {
pool,
token_decimals,
metric_name,
}
}
/// Given a set of withdrawal ids meter all of them to a metric
/// with a given name.
pub async fn meter_finalized_withdrawals_storage(
&mut self,
ids: &[i64],
) -> Result<(), storage::Error> {
let withdrawals = storage::get_withdrawals(&self.pool, ids).await?;

Ok(())
}
self.meter_finalized_withdrawals(&withdrawals).await?;

/// Given a set of [`StoredWithdrawal`], meter all of them to a
/// metric with a given name.
///
/// This function returns only storage error, all formatting, etc
/// errors will be just logged.
pub async fn meter_finalized_withdrawals(
pool: &PgPool,
withdrawals: &[StoredWithdrawal],
metric_name: &'static str,
) -> Result<(), storage::Error> {
for w in withdrawals {
let guard = TOKEN_DECIMALS.read().await;
let decimals = guard.get(&w.event.token).copied();
drop(guard);
Ok(())
}

let decimals = match decimals {
None => {
let Some(decimals) = storage::token_decimals(pool, w.event.token).await? else {
vlog::error!("Received withdrawal from unknown token {:?}", w.event.token);
continue;
};
/// Given a set of [`StoredWithdrawal`], meter all of them to a
/// metric with a given name.
///
/// This function returns only storage error, all formatting, etc
/// errors will be just logged.
pub async fn meter_finalized_withdrawals(
&mut self,
withdrawals: &[StoredWithdrawal],
) -> Result<(), storage::Error> {
for w in withdrawals {
let decimals = match self.token_decimals.get(&w.event.token) {
None => {
let Some(decimals) = storage::token_decimals(&self.pool, w.event.token).await?
else {
vlog::error!("Received withdrawal from unknown token {:?}", w.event.token);
continue;
};

TOKEN_DECIMALS.write().await.insert(w.event.token, decimals);
decimals
}
Some(decimals) => decimals,
};
self.token_decimals.insert(w.event.token, decimals);
decimals
}
Some(decimals) => *decimals,
};

let formatted = match ethers::utils::format_units(w.event.amount, decimals) {
Ok(f) => f,
Err(e) => {
vlog::error!("failed to format units: {e}");
continue;
}
};
let formatted = match ethers::utils::format_units(w.event.amount, decimals) {
Ok(f) => f,
Err(e) => {
vlog::error!("failed to format units: {e}");
continue;
}
};

let formatted_f64 = match f64::from_str(&formatted) {
Ok(f) => f,
Err(e) => {
vlog::error!("failed to format units: {e}");
continue;
}
};
let formatted_f64 = match f64::from_str(&formatted) {
Ok(f) => f,
Err(e) => {
vlog::error!("failed to format units: {e}");
continue;
}
};

metrics::increment_gauge!(
metric_name,
formatted_f64,
"token" => format!("{:?}", w.event.token)
)
}
metrics::increment_gauge!(
self.metric_name,
formatted_f64,
"token" => format!("{:?}", w.event.token)
)
}

Ok(())
Ok(())
}
}

0 comments on commit 78657fd

Please sign in to comment.