From c7e4236e5d685b50a883a0a23fb9fb73a4ea5d2d Mon Sep 17 00:00:00 2001 From: sunce86 Date: Mon, 4 Nov 2024 13:17:47 +0100 Subject: [PATCH] continue migration after filling gap --- src/database_solver_competition.rs | 12 +- src/run.rs | 248 ++++++++++++++++------------- 2 files changed, 141 insertions(+), 119 deletions(-) diff --git a/src/database_solver_competition.rs b/src/database_solver_competition.rs index be6a6fe..ad41601 100644 --- a/src/database_solver_competition.rs +++ b/src/database_solver_competition.rs @@ -83,6 +83,7 @@ pub struct RichSolverCompetition { /// Entries are fetched going from higher auction_id to lower auction_id. pub async fn fetch_batch( ex: &mut PgConnection, + auction_id: i64, batch_size: i64, ) -> Result, sqlx::Error> { const QUERY: &str = r#" @@ -94,12 +95,15 @@ pub async fn fetch_batch( FROM solver_competitions sc LEFT JOIN settlement_scores ss ON sc.id = ss.auction_id LEFT JOIN surplus_capturing_jit_order_owners jit ON sc.id = jit.auction_id - LEFT JOIN competition_auctions ca ON sc.id = ca.id - WHERE ca.id IS NULL + WHERE sc.id < $1 ORDER BY sc.id DESC - LIMIT $1;"#; + LIMIT $2;"#; - sqlx::query_as(QUERY).bind(batch_size).fetch_all(ex).await + sqlx::query_as(QUERY) + .bind(auction_id) + .bind(batch_size) + .fetch_all(ex) + .await } #[derive(Debug, Clone, PartialEq, sqlx::FromRow)] diff --git a/src/run.rs b/src/run.rs index d38cf08..968a339 100644 --- a/src/run.rs +++ b/src/run.rs @@ -1,11 +1,11 @@ use crate::{ database::Postgres, - database_solver_competition::{fetch_batch, Auction, ByteArray, RichSolverCompetition}, + database_solver_competition::{fetch_batch, Auction, ByteArray}, solver_competition_api::SolverCompetitionDB, }; use anyhow::{Context, Result}; use clap::Parser; -use std::num::NonZero; +use std::{num::NonZero, ops::DerefMut}; pub async fn start(args: impl Iterator) { let args = crate::arguments::Arguments::parse_from(args); @@ -14,140 +14,62 @@ pub async fn start(args: impl Iterator) { .await .unwrap(); - fix_missing_historic_auctions(&db).await.unwrap(); + populate_historic_auctions(&db).await.unwrap(); // sleep for 10 minutes std::thread::sleep(std::time::Duration::from_secs(600)); } -// pub async fn populate_historic_auctions(db: &Postgres) -> Result<()> { -// println!("starting data migration for auction data"); +pub async fn populate_historic_auctions(db: &Postgres) -> Result<()> { + println!("starting data migration for auction data"); -// const BATCH_SIZE: i64 = 10; - -// let mut ex = db.pool.begin().await?; - -// // find entry in `competition_auctions` with the lowest auction_id, as a -// // starting point -// let current_auction_id: Option = -// sqlx::query_scalar::<_, Option>("SELECT MIN(id) FROM competition_auctions;") -// .fetch_one(ex.deref_mut()) -// .await -// .context("fetch lowest auction id")?; - -// let Some(mut current_auction_id) = current_auction_id else { -// println!("competition_auctions is empty, nothing to process"); -// return Ok(()); -// }; - -// let starting_auction_number = current_auction_id; - -// loop { -// // fetch the next batch of auctions -// let competitions: Vec = -// fetch_batch(&mut ex, BATCH_SIZE).await?; - -// if competitions.is_empty() { -// println!("no more auctions to process"); -// break; -// } - -// println!("processing {} auctions, first one: {}", competitions.len(), competitions.first().map(|c| c.id).unwrap_or(0)); - -// for solver_competition in &competitions { -// let competition: SolverCompetitionDB = -// serde_json::from_value(solver_competition.json.clone()) -// .context("deserialize SolverCompetitionDB")?; - -// // populate historic auctions -// let auction = Auction { -// id: solver_competition.id, -// block: i64::try_from(competition.auction_start_block).context("block overflow")?, -// deadline: solver_competition.deadline, -// order_uids: competition -// .auction -// .orders -// .iter() -// .map(|order| ByteArray(order.0)) -// .collect(), -// price_tokens: competition -// .auction -// .prices -// .keys() -// .map(|token| ByteArray(token.0)) -// .collect(), -// price_values: competition -// .auction -// .prices -// .values() -// .map(crate::database_solver_competition::u256_to_big_decimal) -// .collect(), -// surplus_capturing_jit_order_owners: solver_competition -// .surplus_capturing_jit_order_owners -// .clone(), -// }; - -// if let Err(err) = crate::database_solver_competition::save(&mut ex, auction).await { -// println!( -// "failed to save auction: {:?}, auction: {}", -// err, solver_competition.id -// ); -// } -// } - -// // commit each batch separately -// ex.commit().await?; - -// // sleep for 50ms -// std::thread::sleep(std::time::Duration::from_millis(50)); - -// ex = db.pool.begin().await?; - -// // update the current auction id -// current_auction_id = competitions.last().unwrap().id; -// } - -// Ok(()) -// } - -pub async fn fix_missing_historic_auctions(db: &Postgres) -> Result<()> { - println!("starting data migration fix for auction data"); - - const BATCH_SIZE: i64 = 10; + const BATCH_SIZE: i64 = 1; let mut ex = db.pool.begin().await?; - // there is a gap of entries in `competition_auctions` that need to be filled + // find entry in `competition_auctions` with the lowest auction_id, as a + // starting point + let current_auction_id: Option = + sqlx::query_scalar::<_, Option>("SELECT MIN(id) FROM competition_auctions;") + .fetch_one(ex.deref_mut()) + .await + .context("fetch lowest auction id")?; + + let Some(mut current_auction_id) = current_auction_id else { + println!("competition_auctions is empty, nothing to process"); + return Ok(()); + }; - // we identify this gap by looking at the `solver_competitions` table + let starting_auction_number = current_auction_id; loop { + println!( + "populating historic auctions from auction {}, executed in percent: {}", + current_auction_id, + (starting_auction_number - current_auction_id) as f64 / starting_auction_number as f64 + * 100.0 + ); + // fetch the next batch of auctions - let competitions: Vec = fetch_batch(&mut ex, BATCH_SIZE).await?; + let competitions = + fetch_batch(&mut ex, current_auction_id, BATCH_SIZE).await; + let Ok(competitions) = competitions else { + // added because auction 3278851 has null json - unexpected entry in the database + println!("failed to deserialize {}", current_auction_id); + continue; + }; if competitions.is_empty() { println!("no more auctions to process"); break; } - println!( - "processing {} auctions, first one {}", - competitions.len(), - competitions.last().map(|c| c.id).unwrap_or(0) - ); + println!("processing {} auctions", competitions.len()); for solver_competition in &competitions { - let competition = - serde_json::from_value::(solver_competition.json.clone()) - .context("deserialize SolverCompetitionDB"); - - let Ok(competition) = competition else { - println!( - "failed to deserialize SolverCompetitionDB, auction: {}", - solver_competition.id - ); - continue; - }; + let competition: SolverCompetitionDB = + serde_json::from_value(solver_competition.json.clone()) + .context("deserialize SolverCompetitionDB")?; // populate historic auctions let auction = Auction { @@ -192,7 +114,103 @@ pub async fn fix_missing_historic_auctions(db: &Postgres) -> Result<()> { std::thread::sleep(std::time::Duration::from_millis(50)); ex = db.pool.begin().await?; + + // update the current auction id + current_auction_id = competitions.last().unwrap().id; } Ok(()) } + +// pub async fn fix_missing_historic_auctions(db: &Postgres) -> Result<()> { +// println!("starting data migration fix for auction data"); + +// const BATCH_SIZE: i64 = 1; + +// let mut ex = db.pool.begin().await?; + +// // there is a gap of entries in `competition_auctions` that need to be filled + +// // we identify this gap by looking at the `solver_competitions` table + +// loop { +// // fetch the next batch of auctions +// let competitions = fetch_batch(&mut ex, BATCH_SIZE).await; +// let Ok(competitions) = competitions else { +// // added because auction 3278851 has null json - this is a one-off fix +// println!("failed to deserialize"); +// continue; +// }; + + +// if competitions.is_empty() { +// println!("no more auctions to process"); +// break; +// } + +// println!( +// "processing {} auctions, first one {}", +// competitions.len(), +// competitions.last().map(|c| c.id).unwrap_or(0) +// ); + +// for solver_competition in &competitions { +// let competition = +// serde_json::from_value::(solver_competition.json.clone()) +// .context("deserialize SolverCompetitionDB"); + +// let Ok(competition) = competition else { +// println!( +// "failed to deserialize SolverCompetitionDB, auction: {}", +// solver_competition.id +// ); +// continue; +// }; + +// // populate historic auctions +// let auction = Auction { +// id: solver_competition.id, +// block: i64::try_from(competition.auction_start_block).context("block overflow")?, +// deadline: solver_competition.deadline, +// order_uids: competition +// .auction +// .orders +// .iter() +// .map(|order| ByteArray(order.0)) +// .collect(), +// price_tokens: competition +// .auction +// .prices +// .keys() +// .map(|token| ByteArray(token.0)) +// .collect(), +// price_values: competition +// .auction +// .prices +// .values() +// .map(crate::database_solver_competition::u256_to_big_decimal) +// .collect(), +// surplus_capturing_jit_order_owners: solver_competition +// .surplus_capturing_jit_order_owners +// .clone(), +// }; + +// if let Err(err) = crate::database_solver_competition::save(&mut ex, auction).await { +// println!( +// "failed to save auction: {:?}, auction: {}", +// err, solver_competition.id +// ); +// } +// } + +// // commit each batch separately +// ex.commit().await?; + +// // sleep for 50ms +// std::thread::sleep(std::time::Duration::from_millis(50)); + +// ex = db.pool.begin().await?; +// } + +// Ok(()) +// }