diff --git a/Cargo.lock b/Cargo.lock index 8ef82880..bd9515f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -722,6 +722,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "delete-finalization-data-migration" +version = "0.1.11" +dependencies = [ + "color-eyre", + "dotenvy", + "eyre", + "sqlx", + "storage", + "tokio", +] + [[package]] name = "der" version = "0.7.8" diff --git a/Cargo.toml b/Cargo.toml index d4b7993a..221eef27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ resolver = "1" members = [ "bin/withdrawal-finalizer", "bin/delete-db-content-migration", + "bin/delete-finalization-data-migration", "ethers-log-decode", "finalizer", "client", diff --git a/bin/delete-finalization-data-migration/Cargo.toml b/bin/delete-finalization-data-migration/Cargo.toml new file mode 100644 index 00000000..4ad86412 --- /dev/null +++ b/bin/delete-finalization-data-migration/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "delete-finalization-data-migration" +version = "0.1.11" +authors = ["The Matter Labs Team "] +homepage = "https://zksync.io/" +license = "MIT OR Apache-2.0" +edition = "2021" + +[dependencies] +color-eyre = "0.6.2" +eyre = "0.6.8" +sqlx = { version = "0.7", features = ["postgres", "runtime-tokio-rustls"] } +tokio = { version = "1.32.0", features = ["full"] } + +storage = { path = "../../storage" } +dotenvy = "0.15.7" diff --git a/bin/delete-finalization-data-migration/src/main.rs b/bin/delete-finalization-data-migration/src/main.rs new file mode 100644 index 00000000..853a286e --- /dev/null +++ b/bin/delete-finalization-data-migration/src/main.rs @@ -0,0 +1,11 @@ +use eyre::Result; +use sqlx::PgPool; + +#[tokio::main] +async fn main() -> Result<()> { + let pgpool = PgPool::connect(&dotenvy::var("DATABASE_URL")?).await?; + + storage::delete_finalization_data_content(&pgpool, 1000).await?; + + Ok(()) +} diff --git a/finalizer/src/lib.rs b/finalizer/src/lib.rs index f81036dd..acea193e 100644 --- a/finalizer/src/lib.rs +++ b/finalizer/src/lib.rs @@ -108,8 +108,12 @@ where where M2: ZksyncMiddleware + 'static, { - let params_fetcher_handle = - tokio::spawn(params_fetcher_loop(self.pgpool.clone(), middleware)); + let params_fetcher_handle = tokio::spawn(params_fetcher_loop( + self.pgpool.clone(), + middleware, + self.zksync_contract.clone(), + self.l1_bridge.clone(), + )); let finalizer_handle = tokio::spawn(self.finalizer_loop()); @@ -324,7 +328,8 @@ where let predicted = std::mem::take(&mut self.unsuccessful); vlog::debug!("requesting finalization status of withdrawals"); - let are_finalized = self.get_finalized_withdrawals(&predicted).await?; + let are_finalized = + get_finalized_withdrawals(&predicted, &self.zksync_contract, &self.l1_bridge).await?; let mut already_finalized = vec![]; let mut unsuccessful = vec![]; @@ -373,45 +378,49 @@ where Ok(()) } +} - async fn get_finalized_withdrawals( - &self, - withdrawals: &[WithdrawalParams], - ) -> Result> { - let results: Result> = - futures::future::join_all(withdrawals.iter().map(|wd| async move { - let l1_batch_number = U256::from(wd.l1_batch_number.as_u64()); - let l2_message_index = U256::from(wd.l2_message_index); - - if is_eth(wd.sender) { - self.zksync_contract - .is_eth_withdrawal_finalized(l1_batch_number, l2_message_index) - .call() - .await - .map_err(|e| e.into()) - } else { - self.l1_bridge - .is_withdrawal_finalized(l1_batch_number, l2_message_index) - .call() - .await - .map_err(|e| e.into()) - } - })) - .await - .into_iter() - .collect(); +async fn get_finalized_withdrawals( + withdrawals: &[WithdrawalParams], + zksync_contract: &IZkSync, + l1_bridge: &IL1Bridge, +) -> Result> +where + M: Middleware, +{ + let results: Result> = + futures::future::join_all(withdrawals.iter().map(|wd| async move { + let l1_batch_number = U256::from(wd.l1_batch_number.as_u64()); + let l2_message_index = U256::from(wd.l2_message_index); + + if is_eth(wd.sender) { + zksync_contract + .is_eth_withdrawal_finalized(l1_batch_number, l2_message_index) + .call() + .await + .map_err(|e| e.into()) + } else { + l1_bridge + .is_withdrawal_finalized(l1_batch_number, l2_message_index) + .call() + .await + .map_err(|e| e.into()) + } + })) + .await + .into_iter() + .collect(); - let results = results?; + let results = results?; - let mut set = HashSet::new(); - for i in 0..results.len() { - if results[i] { - set.insert(withdrawals[i].key()); - } + let mut set = HashSet::new(); + for i in 0..results.len() { + if results[i] { + set.insert(withdrawals[i].key()); } - - Ok(set) } + + Ok(set) } fn is_gas_required_exceeds_allowance(e: &::Error) -> bool { @@ -469,20 +478,33 @@ where // Continiously query the new withdrawals that have been seen by watcher // request finalizing params for them and store this information into // finalizer db table. -async fn params_fetcher_loop(pool: PgPool, middleware: M2) -where +async fn params_fetcher_loop( + pool: PgPool, + middleware: M2, + zksync_contract: IZkSync, + l1_bridge: IL1Bridge, +) where + M1: Middleware, M2: ZksyncMiddleware, { loop { - if let Err(e) = params_fetcher_loop_iteration(&pool, &middleware).await { + if let Err(e) = + params_fetcher_loop_iteration(&pool, &middleware, &zksync_contract, &l1_bridge).await + { vlog::error!("params fetcher iteration ended with {e}"); tokio::time::sleep(LOOP_ITERATION_ERROR_BACKOFF).await; } } } -async fn params_fetcher_loop_iteration(pool: &PgPool, middleware: M2) -> Result<()> +async fn params_fetcher_loop_iteration( + pool: &PgPool, + middleware: &M2, + zksync_contract: &IZkSync, + l1_bridge: &IL1Bridge, +) -> Result<()> where + M1: Middleware, M2: ZksyncMiddleware, { let newly_executed_withdrawals = storage::get_withdrawals_with_no_data(pool, 1000).await?; @@ -492,7 +514,7 @@ where return Ok(()); } - vlog::info!("newly executed withdrawals {newly_executed_withdrawals:?}"); + vlog::debug!("newly committed withdrawals {newly_executed_withdrawals:?}"); let hash_and_index_and_id: Vec<_> = newly_executed_withdrawals .iter() @@ -501,7 +523,13 @@ where let params = request_finalize_params(&middleware, &hash_and_index_and_id).await; + let already_finalized: Vec<_> = get_finalized_withdrawals(¶ms, zksync_contract, l1_bridge) + .await? + .into_iter() + .collect(); + storage::add_withdrawals_data(pool, ¶ms).await?; + storage::finalization_data_set_finalized_in_tx(pool, &already_finalized, H256::zero()).await?; Ok(()) } diff --git a/storage/src/lib.rs b/storage/src/lib.rs index b0472f62..3142afc5 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -970,3 +970,13 @@ pub async fn delete_db_content(pool: &PgPool, delete_batch_size: usize) -> Resul Ok(()) } + +/// Delete all content from `finalization_data` table. +pub async fn delete_finalization_data_content( + pool: &PgPool, + delete_batch_size: usize, +) -> Result<()> { + wipe_finalization_data(pool, delete_batch_size).await?; + + Ok(()) +}