Skip to content

Commit

Permalink
Start moving Coordinator to a multi-Tributary model
Browse files Browse the repository at this point in the history
Previously, we only supported a single Tributary per network, and spawned a task to
handle Processor messages per Tributary. Now, we handle Processor messages per
network, yet we still only support a single Tributary in that handling
function.

Now, when we handle a message, we load the Tributary which is relevant. Once we
know it, we ensure we have it (preventing race conditions), and then proceed.

We still need work to check whether we should have a Tributary, or whether we're
not participating. We also need to check if a Tributary has been retired, in
which case we shouldn't handle any transactions related to it, and should clean
up the retired Tributary.
  • Loading branch information
kayabaNerve committed Sep 28, 2023
1 parent 4a32f22 commit 7d738a3
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 242 deletions.
542 changes: 309 additions & 233 deletions coordinator/src/main.rs

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions coordinator/src/substrate/db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use scale::{Encode, Decode};

pub use serai_db::*;

use serai_client::validator_sets::primitives::{Session, KeyPair};

#[derive(Debug)]
pub struct SubstrateDb<D: Db>(pub D);
impl<D: Db> SubstrateDb<D> {
Expand Down Expand Up @@ -33,4 +37,22 @@ impl<D: Db> SubstrateDb<D> {
assert!(!Self::handled_event(txn, id, index));
txn.put(Self::event_key(&id, index), []);
}

// Derives the DB key under which the Session for a given validator key is stored.
// Delegates to `substrate_key` with a `b"session"` namespace prefix — presumably that helper
// scopes keys to this module's keyspace; confirm against its definition.
fn session_key(key: &[u8]) -> Vec<u8> {
Self::substrate_key(b"session", key)
}
/// Looks up the Session associated with a validator key.
///
/// Returns `None` if no Session was ever saved for this key. Panics if the stored bytes fail
/// to decode as a `Session`, as that would indicate DB corruption.
pub fn session_for_key<G: Get>(getter: &G, key: &[u8]) -> Option<Session> {
let raw = getter.get(Self::session_key(key))?;
Some(Session::decode(&mut raw.as_ref()).unwrap())
}
/// Saves the Session for both keys of a key pair, so either key can later be resolved back
/// to its Session via `session_for_key`.
pub fn save_session_for_keys(txn: &mut D::Transaction<'_>, key_pair: &KeyPair, session: Session) {
let encoded = session.encode();
let substrate_key_entry = Self::session_key(&key_pair.0);
// This may trigger if 100% of a DKG are malicious, and they create a key equivalent to a prior
// key. Since it requires 100% maliciousness, not just 67% maliciousness, this will only assert
// in a modified-to-be-malicious stack, making it safe
if let Some(prior) = txn.get(&substrate_key_entry) {
assert_eq!(prior, encoded);
}
txn.put(substrate_key_entry, encoded.clone());
txn.put(Self::session_key(&key_pair.1), encoded);
}
}
10 changes: 8 additions & 2 deletions coordinator/src/substrate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,19 @@ async fn handle_new_set<D: Db, CNT: Clone + Fn(&mut D, TributarySpec)>(
Ok(())
}

async fn handle_key_gen<Pro: Processors>(
async fn handle_key_gen<D: Db, Pro: Processors>(
db: &mut D,
processors: &Pro,
serai: &Serai,
block: &Block,
set: ValidatorSet,
key_pair: KeyPair,
) -> Result<(), SeraiError> {
// This has to be saved *before* we send ConfirmKeyPair
let mut txn = db.txn();
SubstrateDb::<D>::save_session_for_keys(&mut txn, &key_pair, set.session);
txn.commit();

processors
.send(
set.network,
Expand Down Expand Up @@ -254,7 +260,7 @@ async fn handle_block<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: Proces
TributaryDb::<D>::set_key_pair(&mut txn, set, &key_pair);
txn.commit();

handle_key_gen(processors, serai, &block, set, key_pair).await?;
handle_key_gen(&mut db.0, processors, serai, &block, set, key_pair).await?;
} else {
panic!("KeyGen event wasn't KeyGen: {key_gen:?}");
}
Expand Down
6 changes: 4 additions & 2 deletions coordinator/src/tributary/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,11 +454,12 @@ pub(crate) async fn handle_application_tx<
) {
Some(Some(preprocesses)) => {
NonceDecider::<D>::selected_for_signing_batch(txn, genesis, data.plan);
let key = TributaryDb::<D>::key_pair(txn, spec.set()).unwrap().0 .0.to_vec();
processors
.send(
spec.set().network,
CoordinatorMessage::Coordinator(coordinator::CoordinatorMessage::BatchPreprocesses {
id: SignId { key: vec![], id: data.plan, attempt: data.attempt },
id: SignId { key, id: data.plan, attempt: data.attempt },
preprocesses,
}),
)
Expand All @@ -480,11 +481,12 @@ pub(crate) async fn handle_application_tx<
&data.signed,
) {
Some(Some(shares)) => {
let key = TributaryDb::<D>::key_pair(txn, spec.set()).unwrap().0 .0.to_vec();
processors
.send(
spec.set().network,
CoordinatorMessage::Coordinator(coordinator::CoordinatorMessage::BatchShares {
id: SignId { key: vec![], id: data.plan, attempt: data.attempt },
id: SignId { key, id: data.plan, attempt: data.attempt },
shares: shares
.into_iter()
.map(|(validator, share)| (validator, share.try_into().unwrap()))
Expand Down
1 change: 1 addition & 0 deletions coordinator/src/tributary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl TributarySpec {
let mut validators = vec![];
for (participant, amount) in set_data.participants {
// TODO: Ban invalid keys from being validators on the Serai side
// (make coordinator key a session key?)
let participant = <Ristretto as Ciphersuite>::read_G::<&[u8]>(&mut participant.0.as_ref())
.expect("invalid key registered as participant");
// Give one weight on Tributary per bond instance
Expand Down
5 changes: 2 additions & 3 deletions processor/src/substrate_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::collections::{VecDeque, HashMap};
use rand_core::OsRng;

use transcript::{Transcript, RecommendedTranscript};
use ciphersuite::group::GroupEncoding;
use frost::{
curve::Ristretto,
ThresholdKeys,
Expand Down Expand Up @@ -177,9 +178,7 @@ impl<D: Db> SubstrateSigner<D> {
// Update the attempt number
self.attempt.insert(id, attempt);

// Doesn't set key since there's only one key active at a time
// TODO: BatchSignId
let id = SignId { key: vec![], id, attempt };
let id = SignId { key: self.keys.group_key().to_bytes().to_vec(), id, attempt };
info!("signing batch {} #{}", hex::encode(id.id), id.attempt);

// If we reboot mid-sign, the current design has us abort all signs and wait for latter
Expand Down
6 changes: 5 additions & 1 deletion processor/src/tests/substrate_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ async fn test_substrate_signer() {

let id: u32 = 5;
let block = BlockHash([0xaa; 32]);
let mut actual_id = SignId { key: vec![], id: [0; 32], attempt: 0 };
let mut actual_id = SignId {
key: keys.values().next().unwrap().group_key().to_bytes().to_vec(),
id: [0; 32],
attempt: 0,
};

let batch = Batch {
network: NetworkId::Monero,
Expand Down
6 changes: 5 additions & 1 deletion tests/coordinator/src/tests/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ pub async fn batch(
) -> u64 {
let mut id = [0; 32];
OsRng.fill_bytes(&mut id);
let id = SignId { key: vec![], id, attempt: 0 };
let id = SignId {
key: (<Ristretto as Ciphersuite>::generator() * **substrate_key).to_bytes().to_vec(),
id,
attempt: 0,
};

// Select a random participant to exclude, so we know for sure who *is* participating
assert_eq!(COORDINATORS - THRESHOLD, 1);
Expand Down

0 comments on commit 7d738a3

Please sign in to comment.