Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implements the db macro for processor/src/multisigs/scanner.rs #447

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 116 additions & 94 deletions processor/src/multisigs/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ use ciphersuite::group::GroupEncoding;
use frost::curve::Ciphersuite;

use log::{info, debug, warn};
use serai_db::create_db;
use tokio::{
sync::{RwLockReadGuard, RwLockWriteGuard, RwLock, mpsc},
time::sleep,
};

use scale::Encode;
use crate::{
Get, DbTxn, Db,
networks::{Output, Transaction, EventualitiesTracker, Block, Network},
Expand All @@ -30,48 +32,56 @@ pub enum ScannerEvent<N: Network> {

pub type ScannerEventChannel<N> = mpsc::UnboundedReceiver<ScannerEvent<N>>;

#[derive(Clone, Debug)]
struct ScannerDb<N: Network, D: Db>(PhantomData<N>, PhantomData<D>);
impl<N: Network, D: Db> ScannerDb<N, D> {
fn scanner_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
D::key(b"SCANNER", dst, key)
create_db!(
ScannerDb {
BlockKeyDb: (number: u64) -> Vec<u8>,
BlockNumberKeyDb: (id: Vec<u8>) -> u64,
KeysDb: () -> Vec<u8>,
SeenDb: (id: Vec<u8>) -> Vec<u8>,
OutputsDb: (block: Vec<u8>) -> Vec<u8>,
ScannedBlocksDb: () -> u64,
RetirementBlocksDb: (key: Vec<u8>) -> u64
}
);

impl BlockKeyDb {
fn save_block<N: Network>(txn: &mut impl DbTxn, number: usize, id: &<N::Block as Block<N>>::Id) {
Self::set(txn, u64::try_from(number).unwrap(), &BlockNumberKeyDb::to_block_number_key::<N>(id));
BlockNumberKeyDb::set(
txn,
BlockNumberKeyDb::to_block_number_key::<N>(id),
&u64::try_from(number).unwrap(),
);
}

fn block_key(number: usize) -> Vec<u8> {
Self::scanner_key(b"block_id", u64::try_from(number).unwrap().to_le_bytes())
}
fn block_number_key(id: &<N::Block as Block<N>>::Id) -> Vec<u8> {
Self::scanner_key(b"block_number", id)
}
fn save_block(txn: &mut D::Transaction<'_>, number: usize, id: &<N::Block as Block<N>>::Id) {
txn.put(Self::block_number_key(id), u64::try_from(number).unwrap().to_le_bytes());
txn.put(Self::block_key(number), id);
}
fn block<G: Get>(getter: &G, number: usize) -> Option<<N::Block as Block<N>>::Id> {
getter.get(Self::block_key(number)).map(|id| {
fn block<N: Network>(getter: &impl Get, number: usize) -> Option<<N::Block as Block<N>>::Id> {
Self::get(getter, number.try_into().unwrap()).map(|bytes| {
let mut res = <N::Block as Block<N>>::Id::default();
res.as_mut().copy_from_slice(&id);
res.as_mut().copy_from_slice(&bytes);
res
})
}
fn block_number<G: Get>(getter: &G, id: &<N::Block as Block<N>>::Id) -> Option<usize> {
getter
.get(Self::block_number_key(id))
.map(|number| u64::from_le_bytes(number.try_into().unwrap()).try_into().unwrap())
}

impl BlockNumberKeyDb {
fn to_block_number_key<N: Network>(id: &<N::Block as Block<N>>::Id) -> Vec<u8> {
id.as_ref().into()
}

fn keys_key() -> Vec<u8> {
Self::scanner_key(b"keys", b"")
fn block_number<N: Network>(getter: &impl Get, id: &<N::Block as Block<N>>::Id) -> Option<usize> {
let key = Self::to_block_number_key::<N>(id);
Self::get(getter, key).map(|number| usize::try_from(number).unwrap())
}
fn register_key(
txn: &mut D::Transaction<'_>,
}

impl KeysDb {
fn register_key<N: Network>(
txn: &mut impl DbTxn,
activation_number: usize,
key: <N::Curve as Ciphersuite>::G,
) {
let mut keys = txn.get(Self::keys_key()).unwrap_or(vec![]);

let mut keys = Self::get(txn).unwrap_or_default();
let key_bytes = key.to_bytes();

let key_len = key_bytes.as_ref().len();
assert_eq!(keys.len() % (8 + key_len), 0);

Expand All @@ -86,10 +96,11 @@ impl<N: Network, D: Db> ScannerDb<N, D> {

keys.extend(u64::try_from(activation_number).unwrap().to_le_bytes());
keys.extend(key_bytes.as_ref());
txn.put(Self::keys_key(), keys);
Self::set(txn, &keys);
}
fn keys<G: Get>(getter: &G) -> Vec<(usize, <N::Curve as Ciphersuite>::G)> {
let bytes_vec = getter.get(Self::keys_key()).unwrap_or(vec![]);

fn keys<N: Network>(getter: &impl Get) -> Vec<(usize, <N::Curve as Ciphersuite>::G)> {
let bytes_vec = Self::get(getter).unwrap_or_default();
let mut bytes: &[u8] = bytes_vec.as_ref();

// Assumes keys will be 32 bytes when calculating the capacity
Expand All @@ -100,45 +111,54 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
while !bytes.is_empty() {
let mut activation_number = [0; 8];
bytes.read_exact(&mut activation_number).unwrap();
let activation_number = u64::from_le_bytes(activation_number).try_into().unwrap();

res.push((activation_number, N::Curve::read_G(&mut bytes).unwrap()));
res.push((
u64::from_le_bytes(activation_number).try_into().unwrap(),
N::Curve::read_G(&mut bytes).unwrap(),
));
}
res
}
fn retire_key(txn: &mut D::Transaction<'_>) {
let keys = Self::keys(txn);

fn retire_key<N: Network>(txn: &mut impl DbTxn) {
let keys = Self::keys::<N>(txn);
assert_eq!(keys.len(), 2);
txn.del(Self::keys_key());
Self::register_key(txn, keys[1].0, keys[1].1);
txn.del(Self::key());
Self::register_key::<N>(txn, keys[1].0, keys[1].1);
}
}

fn seen_key(id: &<N::Output as Output<N>>::Id) -> Vec<u8> {
Self::scanner_key(b"seen", id)
impl SeenDb {
fn to_seen_key<N: Network>(id: &<N::Output as Output<N>>::Id) -> Vec<u8> {
id.as_ref().into()
}
fn seen<G: Get>(getter: &G, id: &<N::Output as Output<N>>::Id) -> bool {
getter.get(Self::seen_key(id)).is_some()

fn seen<N: Network>(getter: &impl Get, id: &<N::Output as Output<N>>::Id) -> bool {
Self::get(getter, Self::to_seen_key::<N>(id)).is_some()
}
}

fn outputs_key(block: &<N::Block as Block<N>>::Id) -> Vec<u8> {
Self::scanner_key(b"outputs", block.as_ref())
impl OutputsDb {
fn to_outputs_key<N: Network>(block: &<N::Block as Block<N>>::Id) -> Vec<u8> {
block.as_ref().into()
}
fn save_outputs(
txn: &mut D::Transaction<'_>,

fn save_outputs<N: Network>(
txn: &mut impl DbTxn,
block: &<N::Block as Block<N>>::Id,
outputs: &[N::Output],
) {
let mut bytes = Vec::with_capacity(outputs.len() * 64);
for output in outputs {
output.write(&mut bytes).unwrap();
}
txn.put(Self::outputs_key(block), bytes);
Self::set(txn, Self::to_outputs_key::<N>(block), &bytes);
}
fn outputs(
txn: &D::Transaction<'_>,

fn outputs<N: Network>(
txn: &impl DbTxn,
block: &<N::Block as Block<N>>::Id,
) -> Option<Vec<N::Output>> {
let bytes_vec = txn.get(Self::outputs_key(block))?;
let bytes_vec = Self::get(txn, Self::to_outputs_key::<N>(block))?;
let mut bytes: &[u8] = bytes_vec.as_ref();

let mut res = vec![];
Expand All @@ -147,46 +167,47 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
}
Some(res)
}
}

fn scanned_block_key() -> Vec<u8> {
Self::scanner_key(b"scanned_block", [])
}

fn save_scanned_block(txn: &mut D::Transaction<'_>, block: usize) -> Vec<N::Output> {
let id = Self::block(txn, block); // It may be None for the first key rotated to
let outputs =
if let Some(id) = id.as_ref() { Self::outputs(txn, id).unwrap_or(vec![]) } else { vec![] };
impl ScannedBlocksDb {
fn save_scanned_block<N: Network>(txn: &mut impl DbTxn, block: usize) -> Vec<N::Output> {
let id = BlockKeyDb::block::<N>(txn, block);
let outputs = id.as_ref().and_then(|id| OutputsDb::outputs::<N>(txn, id)).unwrap_or_default();

// Mark all the outputs from this block as seen
for output in &outputs {
txn.put(Self::seen_key(&output.id()), b"");
SeenDb::set(txn, SeenDb::to_seen_key::<N>(&output.id()), b"");
}

txn.put(Self::scanned_block_key(), u64::try_from(block).unwrap().to_le_bytes());
Self::set(txn, &u64::try_from(block).unwrap());

// Return this block's outputs so they can be pruned from the RAM cache
outputs
}
fn latest_scanned_block<G: Get>(getter: &G) -> Option<usize> {
getter
.get(Self::scanned_block_key())
.map(|bytes| u64::from_le_bytes(bytes.try_into().unwrap()).try_into().unwrap())

fn latest_scanned_block(getter: &impl Get) -> Option<usize> {
Self::get(getter).map(|number| usize::try_from(number).unwrap())
}
}

fn retirement_block_key(key: &<N::Curve as Ciphersuite>::G) -> Vec<u8> {
Self::scanner_key(b"retirement_block", key.to_bytes())
impl RetirementBlocksDb {
fn to_retirement_block_key<N: Network>(key: &<N::Curve as Ciphersuite>::G) -> Vec<u8> {
key.to_bytes().as_ref().to_vec()
}
fn save_retirement_block(
txn: &mut D::Transaction<'_>,

fn save_retirement_block<N: Network>(
txn: &mut impl DbTxn,
key: &<N::Curve as Ciphersuite>::G,
block: usize,
) {
txn.put(Self::retirement_block_key(key), u64::try_from(block).unwrap().to_le_bytes());
Self::set(txn, Self::to_retirement_block_key::<N>(key), &u64::try_from(block).unwrap());
}
fn retirement_block<G: Get>(getter: &G, key: &<N::Curve as Ciphersuite>::G) -> Option<usize> {
getter
.get(Self::retirement_block_key(key))
.map(|bytes| usize::try_from(u64::from_le_bytes(bytes.try_into().unwrap())).unwrap())

fn retirement_block<N: Network>(
getter: &impl Get,
key: &<N::Curve as Ciphersuite>::G,
) -> Option<usize> {
Self::get(getter, Self::to_retirement_block_key::<N>(key))
.map(|number| usize::try_from(number).unwrap())
}
}

Expand Down Expand Up @@ -280,10 +301,10 @@ impl<N: Network, D: Db> ScannerHandle<N, D> {
if scanner.keys.is_empty() {
assert!(scanner.ram_scanned.is_none());
scanner.ram_scanned = Some(activation_number);
assert!(ScannerDb::<N, D>::save_scanned_block(txn, activation_number).is_empty());
assert!(ScannedBlocksDb::save_scanned_block::<N>(txn, activation_number).is_empty());
}

ScannerDb::<N, D>::register_key(txn, activation_number, key);
KeysDb::register_key::<N>(txn, activation_number, key);
scanner.keys.push((activation_number, key));
#[cfg(not(test))] // TODO: A test violates this. Improve the test with a better flow
assert!(scanner.keys.len() <= 2);
Expand All @@ -292,14 +313,14 @@ impl<N: Network, D: Db> ScannerHandle<N, D> {
}

pub fn db_scanned<G: Get>(getter: &G) -> Option<usize> {
ScannerDb::<N, D>::latest_scanned_block(getter)
ScannedBlocksDb::latest_scanned_block(getter)
}

// This perform a database read which isn't safe with regards to if the value is set or not
// It may be set, when it isn't expected to be set, or not set, when it is expected to be set
// Since the value is static, if it's set, it's correctly set
pub fn block_number<G: Get>(getter: &G, id: &<N::Block as Block<N>>::Id) -> Option<usize> {
ScannerDb::<N, D>::block_number(getter, id)
BlockNumberKeyDb::block_number::<N>(getter, id)
}

/// Acknowledge having handled a block.
Expand All @@ -318,11 +339,11 @@ impl<N: Network, D: Db> ScannerHandle<N, D> {
let mut scanner = self.scanner.long_term_acquire().await;

// Get the number for this block
let number = ScannerDb::<N, D>::block_number(txn, &id)
let number = BlockNumberKeyDb::block_number::<N>(txn, &id)
.expect("main loop trying to operate on data we haven't scanned");
log::trace!("block {} was {number}", hex::encode(&id));

let outputs = ScannerDb::<N, D>::save_scanned_block(txn, number);
let outputs = ScannedBlocksDb::save_scanned_block::<N>(txn, number);
// This has a race condition if we try to ack a block we scanned on a prior boot, and we have
// yet to scan it on this boot
assert!(number <= scanner.ram_scanned.unwrap());
Expand All @@ -335,10 +356,10 @@ impl<N: Network, D: Db> ScannerHandle<N, D> {
self.held_scanner = Some(scanner);

// Load the key from the DB, as it will have already been removed from RAM if retired
let key = ScannerDb::<N, D>::keys(txn)[0].1;
let is_retirement_block = ScannerDb::<N, D>::retirement_block(txn, &key) == Some(number);
let key = KeysDb::keys::<N>(txn)[0].1;
let is_retirement_block = RetirementBlocksDb::retirement_block::<N>(txn, &key) == Some(number);
if is_retirement_block {
ScannerDb::<N, D>::retire_key(txn);
KeysDb::retire_key::<N>(txn);
}
(is_retirement_block, outputs)
}
Expand Down Expand Up @@ -378,13 +399,13 @@ impl<N: Network, D: Db> Scanner<N, D> {
let (events_send, events_recv) = mpsc::unbounded_channel();
let (multisig_completed_send, multisig_completed_recv) = mpsc::unbounded_channel();

let keys = ScannerDb::<N, D>::keys(&db);
let keys = KeysDb::keys::<N>(&db);
let mut eventualities = HashMap::new();
for key in &keys {
eventualities.insert(key.1.to_bytes().as_ref().to_vec(), EventualitiesTracker::new());
}

let ram_scanned = ScannerDb::<N, D>::latest_scanned_block(&db);
let ram_scanned = ScannedBlocksDb::latest_scanned_block(&db);

let scanner = ScannerHold {
scanner: Arc::new(RwLock::new(Some(Scanner {
Expand Down Expand Up @@ -510,13 +531,13 @@ impl<N: Network, D: Db> Scanner<N, D> {
// These DB calls are safe, despite not having a txn, since they're static values
// There's no issue if they're written in advance of expected (such as on reboot)
// They're also only expected here
if let Some(id) = ScannerDb::<N, D>::block(&db, block_being_scanned) {
if let Some(id) = BlockKeyDb::block::<N>(&db, block_being_scanned) {
if id != block_id {
panic!("reorg'd from finalized {} to {}", hex::encode(id), hex::encode(block_id));
}
} else {
// TODO: Move this to an unwrap
if let Some(id) = ScannerDb::<N, D>::block(&db, block_being_scanned.saturating_sub(1)) {
if let Some(id) = BlockKeyDb::block::<N>(&db, block_being_scanned.saturating_sub(1)) {
if id != block.parent() {
panic!(
"block {} doesn't build off expected parent {}",
Expand All @@ -527,7 +548,7 @@ impl<N: Network, D: Db> Scanner<N, D> {
}

let mut txn = db.txn();
ScannerDb::<N, D>::save_block(&mut txn, block_being_scanned, &block_id);
BlockKeyDb::save_block::<N>(&mut txn, block_being_scanned, &block_id);
txn.commit();
}

Expand Down Expand Up @@ -617,7 +638,7 @@ impl<N: Network, D: Db> Scanner<N, D> {

TODO2: Only update ram_outputs after committing the TXN in question.
*/
let seen = ScannerDb::<N, D>::seen(&db, &id);
let seen = SeenDb::seen::<N>(&db, &id);
let id = id.as_ref().to_vec();
if seen || scanner.ram_outputs.contains(&id) {
panic!("scanned an output multiple times");
Expand All @@ -644,9 +665,9 @@ impl<N: Network, D: Db> Scanner<N, D> {
if completed {
let mut txn = db.txn();
// The retiring key is the earliest one still around
let retiring_key = ScannerDb::<N, D>::keys(&txn)[0].1;
let retiring_key = KeysDb::keys::<N>(&txn)[0].1;
// This value is static w.r.t. the key
ScannerDb::<N, D>::save_retirement_block(
RetirementBlocksDb::save_retirement_block::<N>(
&mut txn,
&retiring_key,
block_number + N::CONFIRMATIONS,
Expand Down Expand Up @@ -679,11 +700,12 @@ impl<N: Network, D: Db> Scanner<N, D> {
// - There's outputs
// as only those blocks are meaningful and warrant obtaining synchrony over
let is_retirement_block =
ScannerDb::<N, D>::retirement_block(&db, &scanner.keys[0].1) == Some(block_being_scanned);
RetirementBlocksDb::retirement_block::<N>(&db, &scanner.keys[0].1) ==
Some(block_being_scanned);
let sent_block = if has_activation || is_retirement_block || (!outputs.is_empty()) {
// Save the outputs to disk
let mut txn = db.txn();
ScannerDb::<N, D>::save_outputs(&mut txn, &block_id, &outputs);
OutputsDb::save_outputs::<N>(&mut txn, &block_id, &outputs);
txn.commit();

// Send all outputs
Expand Down