From 8f7338659fa8124fff783b000a6930abfe4dd89c Mon Sep 17 00:00:00 2001 From: EmmanuelChthonic Date: Thu, 16 Nov 2023 15:45:21 +0400 Subject: [PATCH] processor/db.rs macro implimentation --- processor/src/db.rs | 50 +++++++++---------------------------------- processor/src/main.rs | 23 +++++++++++--------- 2 files changed, 23 insertions(+), 50 deletions(-) diff --git a/processor/src/db.rs b/processor/src/db.rs index 212341d0c..5be9c6021 100644 --- a/processor/src/db.rs +++ b/processor/src/db.rs @@ -1,4 +1,3 @@ -use core::marker::PhantomData; use std::io::Read; use scale::{Encode, Decode}; @@ -8,44 +7,18 @@ pub use serai_db::*; use crate::networks::{Block, Network}; -#[derive(Debug)] -pub struct MainDb(D, PhantomData); -impl MainDb { - pub fn new(db: D) -> Self { - Self(db, PhantomData) +create_db!( + MainDb { + HandledMessageDb: (id: u64) -> Vec, + PendingActivationsDb: () -> Vec } +); - fn main_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { - D::key(b"MAIN", dst, key) - } - - fn handled_key(id: u64) -> Vec { - Self::main_key(b"handled", id.to_le_bytes()) - } - pub fn handled_message(&self, id: u64) -> bool { - self.0.get(Self::handled_key(id)).is_some() - } - pub fn handle_message(txn: &mut D::Transaction<'_>, id: u64) { - txn.put(Self::handled_key(id), []) - } - - fn pending_activation_key() -> Vec { - Self::main_key(b"pending_activation", []) - } - pub fn set_pending_activation( - txn: &mut D::Transaction<'_>, - block_before_queue_block: >::Id, - set: ValidatorSet, - key_pair: KeyPair, - ) { - let mut buf = (set, key_pair).encode(); - buf.extend(block_before_queue_block.as_ref()); - txn.put(Self::pending_activation_key(), buf); - } - pub fn pending_activation( - getter: &G, +impl PendingActivationsDb { + pub fn pending_activation( + getter: &impl Get, ) -> Option<(>::Id, ValidatorSet, KeyPair)> { - if let Some(bytes) = getter.get(Self::pending_activation_key()) { + if let Some(bytes) = Self::get(getter) { if !bytes.is_empty() { let mut slice = bytes.as_slice(); let (set, key_pair) = <(ValidatorSet, KeyPair)>::decode(&mut slice).unwrap(); @@ -57,7 +30,4 @@ impl MainDb { } None } - pub fn clear_pending_activation(txn: &mut D::Transaction<'_>) { - txn.put(Self::pending_activation_key(), []); - } -} +} \ No newline at end of file diff --git a/processor/src/main.rs b/processor/src/main.rs index abf531654..4d5f73411 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -20,6 +20,8 @@ use messages::{ CoordinatorMessage, }; +use scale::Encode; + use serai_env as env; use message_queue::{Service, client::MessageQueue}; @@ -352,7 +354,9 @@ async fn handle_coordinator_msg( // We can't set these keys for activation until we know their queue block, which we // won't until the next Batch is confirmed // Set this variable so when we get the next Batch event, we can handle it - MainDb::::set_pending_activation(txn, block_before_queue_block, set, key_pair); + let mut buf = (set, key_pair).encode(); + buf.extend(block_before_queue_block.as_ref()); + PendingActivationsDb::set(txn, &buf); } } @@ -365,7 +369,7 @@ async fn handle_coordinator_msg( } => { assert_eq!(network_id, N::NETWORK, "coordinator sent us data for another network"); - if let Some((block, set, key_pair)) = MainDb::::pending_activation(txn) { + if let Some((block, set, key_pair)) = PendingActivationsDb::pending_activation::(txn) { // Only run if this is a Batch belonging to a distinct block if context.network_latest_finalized_block.as_ref() != block.as_ref() { let mut queue_block = >::Id::default(); @@ -387,8 +391,8 @@ async fn handle_coordinator_msg( activation_number, ) .await; - - MainDb::::clear_pending_activation(txn); + //clear pending activation + PendingActivationsDb::set(txn, &vec![] as &Vec); } } @@ -445,7 +449,7 @@ async fn boot( raw_db: &mut D, network: &N, coordinator: &mut Co, -) -> (MainDb, TributaryMutable, SubstrateMutable) { +) -> (D, TributaryMutable, SubstrateMutable) { let mut entropy_transcript = { let entropy = Zeroizing::new(env::var("ENTROPY").expect("entropy wasn't specified")); if entropy.len() != 64 { @@ -488,8 +492,6 @@ async fn boot( let mut substrate_signer = None; let mut signers = HashMap::new(); - let main_db = MainDb::::new(raw_db.clone()); - for (i, key) in current_keys.iter().enumerate() { let Some((substrate_keys, network_keys)) = key_gen.keys(key) else { continue }; let network_key = network_keys[0].group_key(); @@ -536,7 +538,7 @@ async fn boot( tokio::spawn(Signer::::rebroadcast_task(raw_db.clone(), network.clone())); ( - main_db, + raw_db.clone(), TributaryMutable { key_gen, substrate_signer, cosigner: None, signers }, multisig_manager, ) @@ -572,9 +574,10 @@ async fn run(mut raw_db: D, network: N, mut assert_eq!(msg.id, (last_coordinator_msg.unwrap_or(msg.id - 1) + 1)); last_coordinator_msg = Some(msg.id); + // Only handle this if we haven't already - if !main_db.handled_message(msg.id) { - MainDb::::handle_message(&mut txn, msg.id); + if HandledMessageDb::get(&main_db, msg.id).is_none() { + HandledMessageDb::set(&mut txn, msg.id, &vec![] as &Vec); // This is isolated to better think about how its ordered, or rather, about how the other // cases aren't ordered