Skip to content

Commit

Permalink
processor/db.rs macro implimentation (#437)
Browse files Browse the repository at this point in the history
* processor/db.rs macro implimentation

* ran clippy and fmt

* incorporated recommendations

* used empty uple instead of [u8; 0]

* ran fmt
econsta authored Nov 22, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent a315040 commit 9ab2a2c
Showing 2 changed files with 36 additions and 48 deletions.
56 changes: 18 additions & 38 deletions processor/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use core::marker::PhantomData;
use std::io::Read;

use scale::{Encode, Decode};
@@ -8,44 +7,18 @@ pub use serai_db::*;

use crate::networks::{Block, Network};

#[derive(Debug)]
pub struct MainDb<N: Network, D: Db>(D, PhantomData<N>);
impl<N: Network, D: Db> MainDb<N, D> {
pub fn new(db: D) -> Self {
Self(db, PhantomData)
create_db!(
MainDb {
HandledMessageDb: (id: u64) -> (),
PendingActivationsDb: () -> Vec<u8>
}
);

fn main_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
D::key(b"MAIN", dst, key)
}

fn handled_key(id: u64) -> Vec<u8> {
Self::main_key(b"handled", id.to_le_bytes())
}
pub fn handled_message(&self, id: u64) -> bool {
self.0.get(Self::handled_key(id)).is_some()
}
pub fn handle_message(txn: &mut D::Transaction<'_>, id: u64) {
txn.put(Self::handled_key(id), [])
}

fn pending_activation_key() -> Vec<u8> {
Self::main_key(b"pending_activation", [])
}
pub fn set_pending_activation(
txn: &mut D::Transaction<'_>,
block_before_queue_block: <N::Block as Block<N>>::Id,
set: ValidatorSet,
key_pair: KeyPair,
) {
let mut buf = (set, key_pair).encode();
buf.extend(block_before_queue_block.as_ref());
txn.put(Self::pending_activation_key(), buf);
}
pub fn pending_activation<G: Get>(
getter: &G,
impl PendingActivationsDb {
pub fn pending_activation<N: Network>(
getter: &impl Get,
) -> Option<(<N::Block as Block<N>>::Id, ValidatorSet, KeyPair)> {
if let Some(bytes) = getter.get(Self::pending_activation_key()) {
if let Some(bytes) = Self::get(getter) {
if !bytes.is_empty() {
let mut slice = bytes.as_slice();
let (set, key_pair) = <(ValidatorSet, KeyPair)>::decode(&mut slice).unwrap();
@@ -57,7 +30,14 @@ impl<N: Network, D: Db> MainDb<N, D> {
}
None
}
pub fn clear_pending_activation(txn: &mut D::Transaction<'_>) {
txn.put(Self::pending_activation_key(), []);
pub fn set_pending_activation<N: Network>(
txn: &mut impl DbTxn,
block_before_queue_block: <N::Block as Block<N>>::Id,
set: ValidatorSet,
key_pair: KeyPair,
) {
let mut buf = (set, key_pair).encode();
buf.extend(block_before_queue_block.as_ref());
Self::set(txn, &buf);
}
}
28 changes: 18 additions & 10 deletions processor/src/main.rs
Original file line number Diff line number Diff line change
@@ -352,7 +352,12 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
// We can't set these keys for activation until we know their queue block, which we
// won't until the next Batch is confirmed
// Set this variable so when we get the next Batch event, we can handle it
MainDb::<N, D>::set_pending_activation(txn, block_before_queue_block, set, key_pair);
PendingActivationsDb::set_pending_activation::<N>(
txn,
block_before_queue_block,
set,
key_pair,
);
}
}

@@ -365,7 +370,7 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
} => {
assert_eq!(network_id, N::NETWORK, "coordinator sent us data for another network");

if let Some((block, set, key_pair)) = MainDb::<N, D>::pending_activation(txn) {
if let Some((block, set, key_pair)) = PendingActivationsDb::pending_activation::<N>(txn) {
// Only run if this is a Batch belonging to a distinct block
if context.network_latest_finalized_block.as_ref() != block.as_ref() {
let mut queue_block = <N::Block as Block<N>>::Id::default();
@@ -387,8 +392,8 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
activation_number,
)
.await;

MainDb::<N, D>::clear_pending_activation(txn);
//clear pending activation
txn.del(PendingActivationsDb::key());
}
}

@@ -445,7 +450,7 @@ async fn boot<N: Network, D: Db, Co: Coordinator>(
raw_db: &mut D,
network: &N,
coordinator: &mut Co,
) -> (MainDb<N, D>, TributaryMutable<N, D>, SubstrateMutable<N, D>) {
) -> (D, TributaryMutable<N, D>, SubstrateMutable<N, D>) {
let mut entropy_transcript = {
let entropy = Zeroizing::new(env::var("ENTROPY").expect("entropy wasn't specified"));
if entropy.len() != 64 {
@@ -488,8 +493,6 @@ async fn boot<N: Network, D: Db, Co: Coordinator>(
let mut batch_signer = None;
let mut signers = HashMap::new();

let main_db = MainDb::<N, _>::new(raw_db.clone());

for (i, key) in current_keys.iter().enumerate() {
let Some((substrate_keys, network_keys)) = key_gen.keys(key) else { continue };
let network_key = network_keys[0].group_key();
@@ -535,7 +538,11 @@ async fn boot<N: Network, D: Db, Co: Coordinator>(
// This hedges against being dropped due to full mempools, temporarily too low of a fee...
tokio::spawn(Signer::<N, D>::rebroadcast_task(raw_db.clone(), network.clone()));

(main_db, TributaryMutable { key_gen, batch_signer, cosigner: None, signers }, multisig_manager)
(
raw_db.clone(),
TributaryMutable { key_gen, batch_signer, cosigner: None, signers },
multisig_manager,
)
}

#[allow(clippy::await_holding_lock)] // Needed for txn, unfortunately can't be down-scoped
@@ -568,9 +575,10 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
assert_eq!(msg.id, (last_coordinator_msg.unwrap_or(msg.id - 1) + 1));
last_coordinator_msg = Some(msg.id);


// Only handle this if we haven't already
if !main_db.handled_message(msg.id) {
MainDb::<N, D>::handle_message(&mut txn, msg.id);
if HandledMessageDb::get(&main_db, msg.id).is_none() {
HandledMessageDb::set(&mut txn, msg.id, &());

// This is isolated to better think about how its ordered, or rather, about how the other
// cases aren't ordered

0 comments on commit 9ab2a2c

Please sign in to comment.