Skip to content

Commit

Permalink
fix: request finalization status of withdrawal on the fetch params ph…
Browse files Browse the repository at this point in the history
…ase (#193)

* fix: request finalization status on params fetch step

* fix: clear finalization data custom migration
  • Loading branch information
montekki authored Sep 20, 2023
1 parent c1940f9 commit 426bcd5
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 42 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ resolver = "1"
members = [
"bin/withdrawal-finalizer",
"bin/delete-db-content-migration",
"bin/delete-finalization-data-migration",
"ethers-log-decode",
"finalizer",
"client",
Expand Down
16 changes: 16 additions & 0 deletions bin/delete-finalization-data-migration/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "delete-finalization-data-migration"
version = "0.1.11"
authors = ["The Matter Labs Team <[email protected]>"]
homepage = "https://zksync.io/"
license = "MIT OR Apache-2.0"
edition = "2021"

[dependencies]
color-eyre = "0.6.2"
eyre = "0.6.8"
sqlx = { version = "0.7", features = ["postgres", "runtime-tokio-rustls"] }
tokio = { version = "1.32.0", features = ["full"] }

storage = { path = "../../storage" }
dotenvy = "0.15.7"
11 changes: 11 additions & 0 deletions bin/delete-finalization-data-migration/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use eyre::Result;
use sqlx::PgPool;

#[tokio::main]
async fn main() -> Result<()> {
let pgpool = PgPool::connect(&dotenvy::var("DATABASE_URL")?).await?;

storage::delete_finalization_data_content(&pgpool, 1000).await?;

Ok(())
}
112 changes: 70 additions & 42 deletions finalizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,12 @@ where
where
M2: ZksyncMiddleware + 'static,
{
let params_fetcher_handle =
tokio::spawn(params_fetcher_loop(self.pgpool.clone(), middleware));
let params_fetcher_handle = tokio::spawn(params_fetcher_loop(
self.pgpool.clone(),
middleware,
self.zksync_contract.clone(),
self.l1_bridge.clone(),
));

let finalizer_handle = tokio::spawn(self.finalizer_loop());

Expand Down Expand Up @@ -324,7 +328,8 @@ where

let predicted = std::mem::take(&mut self.unsuccessful);
vlog::debug!("requesting finalization status of withdrawals");
let are_finalized = self.get_finalized_withdrawals(&predicted).await?;
let are_finalized =
get_finalized_withdrawals(&predicted, &self.zksync_contract, &self.l1_bridge).await?;

let mut already_finalized = vec![];
let mut unsuccessful = vec![];
Expand Down Expand Up @@ -373,45 +378,49 @@ where

Ok(())
}
}

async fn get_finalized_withdrawals(
&self,
withdrawals: &[WithdrawalParams],
) -> Result<HashSet<WithdrawalKey>> {
let results: Result<Vec<_>> =
futures::future::join_all(withdrawals.iter().map(|wd| async move {
let l1_batch_number = U256::from(wd.l1_batch_number.as_u64());
let l2_message_index = U256::from(wd.l2_message_index);

if is_eth(wd.sender) {
self.zksync_contract
.is_eth_withdrawal_finalized(l1_batch_number, l2_message_index)
.call()
.await
.map_err(|e| e.into())
} else {
self.l1_bridge
.is_withdrawal_finalized(l1_batch_number, l2_message_index)
.call()
.await
.map_err(|e| e.into())
}
}))
.await
.into_iter()
.collect();
async fn get_finalized_withdrawals<M>(
withdrawals: &[WithdrawalParams],
zksync_contract: &IZkSync<M>,
l1_bridge: &IL1Bridge<M>,
) -> Result<HashSet<WithdrawalKey>>
where
M: Middleware,
{
let results: Result<Vec<_>> =
futures::future::join_all(withdrawals.iter().map(|wd| async move {
let l1_batch_number = U256::from(wd.l1_batch_number.as_u64());
let l2_message_index = U256::from(wd.l2_message_index);

if is_eth(wd.sender) {
zksync_contract
.is_eth_withdrawal_finalized(l1_batch_number, l2_message_index)
.call()
.await
.map_err(|e| e.into())
} else {
l1_bridge
.is_withdrawal_finalized(l1_batch_number, l2_message_index)
.call()
.await
.map_err(|e| e.into())
}
}))
.await
.into_iter()
.collect();

let results = results?;
let results = results?;

let mut set = HashSet::new();
for i in 0..results.len() {
if results[i] {
set.insert(withdrawals[i].key());
}
let mut set = HashSet::new();
for i in 0..results.len() {
if results[i] {
set.insert(withdrawals[i].key());
}

Ok(set)
}

Ok(set)
}

fn is_gas_required_exceeds_allowance<M: Middleware>(e: &<M as Middleware>::Error) -> bool {
Expand Down Expand Up @@ -469,20 +478,33 @@ where
// Continiously query the new withdrawals that have been seen by watcher
// request finalizing params for them and store this information into
// finalizer db table.
async fn params_fetcher_loop<M2>(pool: PgPool, middleware: M2)
where
async fn params_fetcher_loop<M1, M2>(
pool: PgPool,
middleware: M2,
zksync_contract: IZkSync<M1>,
l1_bridge: IL1Bridge<M1>,
) where
M1: Middleware,
M2: ZksyncMiddleware,
{
loop {
if let Err(e) = params_fetcher_loop_iteration(&pool, &middleware).await {
if let Err(e) =
params_fetcher_loop_iteration(&pool, &middleware, &zksync_contract, &l1_bridge).await
{
vlog::error!("params fetcher iteration ended with {e}");
tokio::time::sleep(LOOP_ITERATION_ERROR_BACKOFF).await;
}
}
}

async fn params_fetcher_loop_iteration<M2>(pool: &PgPool, middleware: M2) -> Result<()>
async fn params_fetcher_loop_iteration<M1, M2>(
pool: &PgPool,
middleware: &M2,
zksync_contract: &IZkSync<M1>,
l1_bridge: &IL1Bridge<M1>,
) -> Result<()>
where
M1: Middleware,
M2: ZksyncMiddleware,
{
let newly_executed_withdrawals = storage::get_withdrawals_with_no_data(pool, 1000).await?;
Expand All @@ -492,7 +514,7 @@ where
return Ok(());
}

vlog::info!("newly executed withdrawals {newly_executed_withdrawals:?}");
vlog::debug!("newly committed withdrawals {newly_executed_withdrawals:?}");

let hash_and_index_and_id: Vec<_> = newly_executed_withdrawals
.iter()
Expand All @@ -501,7 +523,13 @@ where

let params = request_finalize_params(&middleware, &hash_and_index_and_id).await;

let already_finalized: Vec<_> = get_finalized_withdrawals(&params, zksync_contract, l1_bridge)
.await?
.into_iter()
.collect();

storage::add_withdrawals_data(pool, &params).await?;
storage::finalization_data_set_finalized_in_tx(pool, &already_finalized, H256::zero()).await?;

Ok(())
}
10 changes: 10 additions & 0 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,3 +970,13 @@ pub async fn delete_db_content(pool: &PgPool, delete_batch_size: usize) -> Resul

Ok(())
}

/// Delete all content from `finalization_data` table.
pub async fn delete_finalization_data_content(
pool: &PgPool,
delete_batch_size: usize,
) -> Result<()> {
wipe_finalization_data(pool, delete_batch_size).await?;

Ok(())
}

0 comments on commit 426bcd5

Please sign in to comment.