From c848f3cfaab5ac0960b46e95ad81ca6971258904 Mon Sep 17 00:00:00 2001 From: EmmanuelChthonic Date: Tue, 21 Nov 2023 11:59:04 +0400 Subject: [PATCH 1/2] implements the db macro for processor/src/multisigs/scanner.rs --- processor/src/multisigs/scanner.rs | 210 ++++++++++++++++------------- 1 file changed, 115 insertions(+), 95 deletions(-) diff --git a/processor/src/multisigs/scanner.rs b/processor/src/multisigs/scanner.rs index f25867e75..86b137e52 100644 --- a/processor/src/multisigs/scanner.rs +++ b/processor/src/multisigs/scanner.rs @@ -10,11 +10,13 @@ use ciphersuite::group::GroupEncoding; use frost::curve::Ciphersuite; use log::{info, debug, warn}; +use serai_db::create_db; use tokio::{ sync::{RwLockReadGuard, RwLockWriteGuard, RwLock, mpsc}, time::sleep, }; +use scale::Encode; use crate::{ Get, DbTxn, Db, networks::{Output, Transaction, EventualitiesTracker, Block, Network}, @@ -30,48 +32,61 @@ pub enum ScannerEvent { pub type ScannerEventChannel = mpsc::UnboundedReceiver>; -#[derive(Clone, Debug)] -struct ScannerDb(PhantomData, PhantomData); -impl ScannerDb { - fn scanner_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { - D::key(b"SCANNER", dst, key) +create_db!( + ScannerDb { + BlockKeyDb: (number: u64) -> Vec, + BlockNumberKeyDb: (id: Vec) -> u64, + KeysDb: () -> Vec, + SeenDb: (id: Vec) -> Vec, + OutputsDb: (block: Vec) -> Vec, + ScannedBlocksDb: () -> u64, + RetirementBlocksDb: (key: Vec) -> u64 + } +); + +impl BlockKeyDb { + + fn save_block(txn: &mut impl DbTxn, number: usize, id: &>::Id) { + Self::set( + txn, + u64::try_from(number).unwrap(), + &BlockNumberKeyDb::to_block_number_key::(id) + ); + BlockNumberKeyDb::set( + txn, + BlockNumberKeyDb::to_block_number_key::(id), + &u64::try_from(number).unwrap() + ); } - fn block_key(number: usize) -> Vec { - Self::scanner_key(b"block_id", u64::try_from(number).unwrap().to_le_bytes()) - } - fn block_number_key(id: &>::Id) -> Vec { - Self::scanner_key(b"block_number", id) - } - fn save_block(txn: &mut D::Transaction<'_>, number: usize, id: &>::Id) { - txn.put(Self::block_number_key(id), u64::try_from(number).unwrap().to_le_bytes()); - txn.put(Self::block_key(number), id); - } - fn block(getter: &G, number: usize) -> Option<>::Id> { - getter.get(Self::block_key(number)).map(|id| { + fn block(getter: &impl Get, number: usize) -> Option<>::Id> { + Self::get(getter, number.try_into().unwrap()).map(|bytes| { let mut res = >::Id::default(); - res.as_mut().copy_from_slice(&id); + res.as_mut().copy_from_slice(&bytes); res }) } - fn block_number(getter: &G, id: &>::Id) -> Option { - getter - .get(Self::block_number_key(id)) - .map(|number| u64::from_le_bytes(number.try_into().unwrap()).try_into().unwrap()) +} + +impl BlockNumberKeyDb { + fn to_block_number_key(id: &>::Id) -> Vec { + id.as_ref().into() } - fn keys_key() -> Vec { - Self::scanner_key(b"keys", b"") + fn block_number(getter: &impl Get, id: &>::Id) -> Option { + let key = Self::to_block_number_key::(id); + Self::get(getter, key).map(|number| usize::try_from(number).unwrap()) } - fn register_key( - txn: &mut D::Transaction<'_>, +} + +impl KeysDb { + fn register_key( + txn: &mut impl DbTxn, activation_number: usize, key: ::G, ) { - let mut keys = txn.get(Self::keys_key()).unwrap_or(vec![]); - + let mut keys = Self::get(txn).unwrap_or_default(); let key_bytes = key.to_bytes(); - let key_len = key_bytes.as_ref().len(); assert_eq!(keys.len() % (8 + key_len), 0); @@ -86,10 +101,11 @@ impl ScannerDb { keys.extend(u64::try_from(activation_number).unwrap().to_le_bytes()); keys.extend(key_bytes.as_ref()); - txn.put(Self::keys_key(), keys); + Self::set(txn, &keys); } - fn keys(getter: &G) -> Vec<(usize, ::G)> { - let bytes_vec = getter.get(Self::keys_key()).unwrap_or(vec![]); + + fn keys(getter: &impl Get) -> Vec<(usize, ::G)> { + let bytes_vec = Self::get(getter).unwrap_or_default(); let mut bytes: &[u8] = bytes_vec.as_ref(); // Assumes keys will be 32 bytes when calculating the capacity @@ -100,31 +116,36 @@ impl ScannerDb { while !bytes.is_empty() { let mut activation_number = [0; 8]; bytes.read_exact(&mut activation_number).unwrap(); - let activation_number = u64::from_le_bytes(activation_number).try_into().unwrap(); - - res.push((activation_number, N::Curve::read_G(&mut bytes).unwrap())); + res.push((u64::from_le_bytes(activation_number).try_into().unwrap(), N::Curve::read_G(&mut bytes).unwrap())); } res } - fn retire_key(txn: &mut D::Transaction<'_>) { - let keys = Self::keys(txn); + + fn retire_key(txn: &mut impl DbTxn) { + let keys = Self::keys::(txn); assert_eq!(keys.len(), 2); - txn.del(Self::keys_key()); - Self::register_key(txn, keys[1].0, keys[1].1); + txn.del(Self::key()); + Self::register_key::(txn, keys[1].0, keys[1].1); } +} - fn seen_key(id: &>::Id) -> Vec { - Self::scanner_key(b"seen", id) +impl SeenDb { + fn to_seen_key(id: &>::Id) -> Vec { + id.as_ref().into() } - fn seen(getter: &G, id: &>::Id) -> bool { - getter.get(Self::seen_key(id)).is_some() + + fn seen(getter: &impl Get, id: &>::Id) -> bool { + Self::get(getter, Self::to_seen_key::(id)).is_some() } +} - fn outputs_key(block: &>::Id) -> Vec { - Self::scanner_key(b"outputs", block.as_ref()) +impl OutputsDb { + fn to_outputs_key(block: &>::Id) -> Vec { + block.as_ref().into() } - fn save_outputs( - txn: &mut D::Transaction<'_>, + + fn save_outputs( + txn: &mut impl DbTxn, block: &>::Id, outputs: &[N::Output], ) { @@ -132,13 +153,14 @@ impl ScannerDb { for output in outputs { output.write(&mut bytes).unwrap(); } - txn.put(Self::outputs_key(block), bytes); + Self::set(txn, Self::to_outputs_key::(block), &bytes); } - fn outputs( - txn: &D::Transaction<'_>, + + fn outputs( + txn: &impl DbTxn, block: &>::Id, ) -> Option> { - let bytes_vec = txn.get(Self::outputs_key(block))?; + let bytes_vec = Self::get(txn, Self::to_outputs_key::(block))?; let mut bytes: &[u8] = bytes_vec.as_ref(); let mut res = vec![]; @@ -147,46 +169,44 @@ impl ScannerDb { } Some(res) } +} - fn scanned_block_key() -> Vec { - Self::scanner_key(b"scanned_block", []) - } - - fn save_scanned_block(txn: &mut D::Transaction<'_>, block: usize) -> Vec { - let id = Self::block(txn, block); // It may be None for the first key rotated to - let outputs = - if let Some(id) = id.as_ref() { Self::outputs(txn, id).unwrap_or(vec![]) } else { vec![] }; - +impl ScannedBlocksDb { + fn save_scanned_block(txn: &mut impl DbTxn, block: usize) -> Vec { + let id = BlockKeyDb::block::(txn, block); + let outputs = id.as_ref().and_then(|id| OutputsDb::outputs::(txn, id)).unwrap_or_default(); + // Mark all the outputs from this block as seen for output in &outputs { - txn.put(Self::seen_key(&output.id()), b""); + SeenDb::set(txn, SeenDb::to_seen_key::(&output.id()), b""); } - - txn.put(Self::scanned_block_key(), u64::try_from(block).unwrap().to_le_bytes()); + Self::set(txn, &u64::try_from(block).unwrap()); // Return this block's outputs so they can be pruned from the RAM cache outputs } - fn latest_scanned_block(getter: &G) -> Option { - getter - .get(Self::scanned_block_key()) - .map(|bytes| u64::from_le_bytes(bytes.try_into().unwrap()).try_into().unwrap()) + + fn latest_scanned_block(getter: &impl Get) -> Option { + Self::get(getter) + .map(|number| usize::try_from(number).unwrap()) } +} - fn retirement_block_key(key: &::G) -> Vec { - Self::scanner_key(b"retirement_block", key.to_bytes()) +impl RetirementBlocksDb { + fn to_retirement_block_key(key: &::G) -> Vec { + key.to_bytes().as_ref().to_vec() } - fn save_retirement_block( - txn: &mut D::Transaction<'_>, + + fn save_retirement_block( + txn: &mut impl DbTxn, key: &::G, block: usize, ) { - txn.put(Self::retirement_block_key(key), u64::try_from(block).unwrap().to_le_bytes()); + Self::set(txn, Self::to_retirement_block_key::(key), &u64::try_from(block).unwrap()); } - fn retirement_block(getter: &G, key: &::G) -> Option { - getter - .get(Self::retirement_block_key(key)) - .map(|bytes| usize::try_from(u64::from_le_bytes(bytes.try_into().unwrap())).unwrap()) + + fn retirement_block(getter: &impl Get, key: &::G) -> Option { + Self::get(getter, Self::to_retirement_block_key::(key)).map(|number| usize::try_from(number).unwrap()) } } @@ -280,10 +300,10 @@ impl ScannerHandle { if scanner.keys.is_empty() { assert!(scanner.ram_scanned.is_none()); scanner.ram_scanned = Some(activation_number); - assert!(ScannerDb::::save_scanned_block(txn, activation_number).is_empty()); + assert!(ScannedBlocksDb::save_scanned_block::(txn, activation_number).is_empty()); } - ScannerDb::::register_key(txn, activation_number, key); + KeysDb::register_key::(txn, activation_number, key); scanner.keys.push((activation_number, key)); #[cfg(not(test))] // TODO: A test violates this. Improve the test with a better flow assert!(scanner.keys.len() <= 2); @@ -292,14 +312,14 @@ impl ScannerHandle { } pub fn db_scanned(getter: &G) -> Option { - ScannerDb::::latest_scanned_block(getter) + ScannedBlocksDb::latest_scanned_block(getter) } // This perform a database read which isn't safe with regards to if the value is set or not // It may be set, when it isn't expected to be set, or not set, when it is expected to be set // Since the value is static, if it's set, it's correctly set pub fn block_number(getter: &G, id: &>::Id) -> Option { - ScannerDb::::block_number(getter, id) + BlockNumberKeyDb::block_number::(getter, id) } /// Acknowledge having handled a block. @@ -318,11 +338,11 @@ impl ScannerHandle { let mut scanner = self.scanner.long_term_acquire().await; // Get the number for this block - let number = ScannerDb::::block_number(txn, &id) + let number = BlockNumberKeyDb::block_number::(txn, &id) .expect("main loop trying to operate on data we haven't scanned"); log::trace!("block {} was {number}", hex::encode(&id)); - let outputs = ScannerDb::::save_scanned_block(txn, number); + let outputs = ScannedBlocksDb::save_scanned_block::(txn, number); // This has a race condition if we try to ack a block we scanned on a prior boot, and we have // yet to scan it on this boot assert!(number <= scanner.ram_scanned.unwrap()); @@ -335,10 +355,10 @@ impl ScannerHandle { self.held_scanner = Some(scanner); // Load the key from the DB, as it will have already been removed from RAM if retired - let key = ScannerDb::::keys(txn)[0].1; - let is_retirement_block = ScannerDb::::retirement_block(txn, &key) == Some(number); + let key = KeysDb::keys::(txn)[0].1; + let is_retirement_block = RetirementBlocksDb::retirement_block::(txn, &key) == Some(number); if is_retirement_block { - ScannerDb::::retire_key(txn); + KeysDb::retire_key::(txn); } (is_retirement_block, outputs) } @@ -378,13 +398,13 @@ impl Scanner { let (events_send, events_recv) = mpsc::unbounded_channel(); let (multisig_completed_send, multisig_completed_recv) = mpsc::unbounded_channel(); - let keys = ScannerDb::::keys(&db); + let keys = KeysDb::keys::(&db); let mut eventualities = HashMap::new(); for key in &keys { eventualities.insert(key.1.to_bytes().as_ref().to_vec(), EventualitiesTracker::new()); } - let ram_scanned = ScannerDb::::latest_scanned_block(&db); + let ram_scanned = ScannedBlocksDb::latest_scanned_block(&db); let scanner = ScannerHold { scanner: Arc::new(RwLock::new(Some(Scanner { @@ -510,13 +530,13 @@ impl Scanner { // These DB calls are safe, despite not having a txn, since they're static values // There's no issue if they're written in advance of expected (such as on reboot) // They're also only expected here - if let Some(id) = ScannerDb::::block(&db, block_being_scanned) { + if let Some(id) = BlockKeyDb::block::(&db, block_being_scanned) { if id != block_id { panic!("reorg'd from finalized {} to {}", hex::encode(id), hex::encode(block_id)); } } else { // TODO: Move this to an unwrap - if let Some(id) = ScannerDb::::block(&db, block_being_scanned.saturating_sub(1)) { + if let Some(id) = BlockKeyDb::block::(&db, block_being_scanned.saturating_sub(1)) { if id != block.parent() { panic!( "block {} doesn't build off expected parent {}", @@ -527,7 +547,7 @@ impl Scanner { } let mut txn = db.txn(); - ScannerDb::::save_block(&mut txn, block_being_scanned, &block_id); + BlockKeyDb::save_block::(&mut txn, block_being_scanned, &block_id); txn.commit(); } @@ -617,7 +637,7 @@ impl Scanner { TODO2: Only update ram_outputs after committing the TXN in question. */ - let seen = ScannerDb::::seen(&db, &id); + let seen = SeenDb::seen::(&db, &id); let id = id.as_ref().to_vec(); if seen || scanner.ram_outputs.contains(&id) { panic!("scanned an output multiple times"); @@ -644,9 +664,9 @@ impl Scanner { if completed { let mut txn = db.txn(); // The retiring key is the earliest one still around - let retiring_key = ScannerDb::::keys(&txn)[0].1; + let retiring_key = KeysDb::keys::(&txn)[0].1; // This value is static w.r.t. the key - ScannerDb::::save_retirement_block( + RetirementBlocksDb::save_retirement_block::( &mut txn, &retiring_key, block_number + N::CONFIRMATIONS, @@ -679,11 +699,11 @@ impl Scanner { // - There's outputs // as only those blocks are meaningful and warrant obtaining synchrony over let is_retirement_block = - ScannerDb::::retirement_block(&db, &scanner.keys[0].1) == Some(block_being_scanned); + RetirementBlocksDb::retirement_block::(&db, &scanner.keys[0].1) == Some(block_being_scanned); let sent_block = if has_activation || is_retirement_block || (!outputs.is_empty()) { // Save the outputs to disk let mut txn = db.txn(); - ScannerDb::::save_outputs(&mut txn, &block_id, &outputs); + OutputsDb::save_outputs::(&mut txn, &block_id, &outputs); txn.commit(); // Send all outputs From 8ddbcdac4ba2b7aed0442a31b80c84c9e1fe5d02 Mon Sep 17 00:00:00 2001 From: EmmanuelChthonic Date: Tue, 21 Nov 2023 12:28:45 +0400 Subject: [PATCH 2/2] ran fmt --- processor/src/multisigs/scanner.rs | 32 ++++++++++++++++-------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/processor/src/multisigs/scanner.rs b/processor/src/multisigs/scanner.rs index 86b137e52..2a215d0e5 100644 --- a/processor/src/multisigs/scanner.rs +++ b/processor/src/multisigs/scanner.rs @@ -45,17 +45,12 @@ create_db!( ); impl BlockKeyDb { - fn save_block(txn: &mut impl DbTxn, number: usize, id: &>::Id) { - Self::set( - txn, - u64::try_from(number).unwrap(), - &BlockNumberKeyDb::to_block_number_key::(id) - ); + Self::set(txn, u64::try_from(number).unwrap(), &BlockNumberKeyDb::to_block_number_key::(id)); BlockNumberKeyDb::set( txn, BlockNumberKeyDb::to_block_number_key::(id), - &u64::try_from(number).unwrap() + &u64::try_from(number).unwrap(), ); } @@ -75,7 +70,7 @@ impl BlockNumberKeyDb { fn block_number(getter: &impl Get, id: &>::Id) -> Option { let key = Self::to_block_number_key::(id); - Self::get(getter, key).map(|number| usize::try_from(number).unwrap()) + Self::get(getter, key).map(|number| usize::try_from(number).unwrap()) } } @@ -116,7 +111,10 @@ impl KeysDb { while !bytes.is_empty() { let mut activation_number = [0; 8]; bytes.read_exact(&mut activation_number).unwrap(); - res.push((u64::from_le_bytes(activation_number).try_into().unwrap(), N::Curve::read_G(&mut bytes).unwrap())); + res.push(( + u64::from_le_bytes(activation_number).try_into().unwrap(), + N::Curve::read_G(&mut bytes).unwrap(), + )); } res } @@ -175,7 +173,7 @@ impl ScannedBlocksDb { fn save_scanned_block(txn: &mut impl DbTxn, block: usize) -> Vec { let id = BlockKeyDb::block::(txn, block); let outputs = id.as_ref().and_then(|id| OutputsDb::outputs::(txn, id)).unwrap_or_default(); - + // Mark all the outputs from this block as seen for output in &outputs { SeenDb::set(txn, SeenDb::to_seen_key::(&output.id()), b""); @@ -187,8 +185,7 @@ impl ScannedBlocksDb { } fn latest_scanned_block(getter: &impl Get) -> Option { - Self::get(getter) - .map(|number| usize::try_from(number).unwrap()) + Self::get(getter).map(|number| usize::try_from(number).unwrap()) } } @@ -205,8 +202,12 @@ impl RetirementBlocksDb { Self::set(txn, Self::to_retirement_block_key::(key), &u64::try_from(block).unwrap()); } - fn retirement_block(getter: &impl Get, key: &::G) -> Option { - Self::get(getter, Self::to_retirement_block_key::(key)).map(|number| usize::try_from(number).unwrap()) + fn retirement_block( + getter: &impl Get, + key: &::G, + ) -> Option { + Self::get(getter, Self::to_retirement_block_key::(key)) + .map(|number| usize::try_from(number).unwrap()) } } @@ -699,7 +700,8 @@ impl Scanner { // - There's outputs // as only those blocks are meaningful and warrant obtaining synchrony over let is_retirement_block = - RetirementBlocksDb::retirement_block::(&db, &scanner.keys[0].1) == Some(block_being_scanned); + RetirementBlocksDb::retirement_block::(&db, &scanner.keys[0].1) == + Some(block_being_scanned); let sent_block = if has_activation || is_retirement_block || (!outputs.is_empty()) { // Save the outputs to disk let mut txn = db.txn();