diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs index 8c2723e78..8372e241a 100644 --- a/coordinator/src/tributary/handle.rs +++ b/coordinator/src/tributary/handle.rs @@ -238,6 +238,16 @@ impl< Ok(()) } + fn dkg_removal(&mut self, data: &SignData<[u8; 32]>) -> DkgRemoval<'_, T> { + DkgRemoval { + spec: self.spec, + key: self.our_key, + txn: self.txn, + removing: data.plan, + attempt: data.attempt, + } + } + pub(crate) async fn handle_application_tx(&mut self, tx: Transaction) { let genesis = self.spec.genesis(); @@ -260,14 +270,8 @@ impl< let Ok(_) = self.check_sign_data_len(signed.signer, commitments.len()).await else { return; }; - match self - .handle_data( - &DataSpecification { topic: Topic::Dkg, label: Label::Preprocess, attempt }, - commitments.encode(), - &signed, - ) - .await - { + let data_spec = DataSpecification { topic: Topic::Dkg, label: Label::Preprocess, attempt }; + match self.handle_data(&data_spec, commitments.encode(), &signed).await { Accumulation::Ready(DataSet::Participating(mut commitments)) => { log::info!("got all DkgCommitments for {}", hex::encode(genesis)); unflatten(self.spec, &mut commitments); @@ -359,14 +363,9 @@ impl< // Drop shares as it's been mutated into invalidity drop(shares); - match self - .handle_data( - &DataSpecification { topic: Topic::Dkg, label: Label::Share, attempt }, - (confirmation_nonces.to_vec(), our_shares.encode()).encode(), - &signed, - ) - .await - { + let data_spec = DataSpecification { topic: Topic::Dkg, label: Label::Share, attempt }; + let encoded_data = (confirmation_nonces.to_vec(), our_shares.encode()).encode(); + match self.handle_data(&data_spec, encoded_data, &signed).await { Accumulation::Ready(DataSet::Participating(confirmation_nonces_and_shares)) => { log::info!("got all DkgShares for {}", hex::encode(genesis)); @@ -458,14 +457,9 @@ impl< } Transaction::DkgConfirmed(attempt, shares, signed) => { - match self - .handle_data( - &DataSpecification { topic: Topic::DkgConfirmation, label: Label::Share, attempt }, - shares.to_vec(), - &signed, - ) - .await - { + let data_spec = + DataSpecification { topic: Topic::DkgConfirmation, label: Label::Share, attempt }; + match self.handle_data(&data_spec, shares.to_vec(), &signed).await { Accumulation::Ready(DataSet::Participating(shares)) => { log::info!("got all DkgConfirmed for {}", hex::encode(genesis)); @@ -514,17 +508,13 @@ impl< return; } - let Accumulation::Ready(DataSet::Participating(results)) = self - .handle_data( - &DataSpecification { - topic: Topic::DkgRemoval(data.plan), - label: data.label, - attempt: data.attempt, - }, - data.data.encode(), - &data.signed, - ) - .await + let data_spec = DataSpecification { + topic: Topic::DkgRemoval(data.plan), + label: data.label, + attempt: data.attempt, + }; + let Accumulation::Ready(DataSet::Participating(results)) = + self.handle_data(&data_spec, data.data.encode(), &data.signed).await else { return; }; @@ -533,14 +523,7 @@ impl< Label::Preprocess => { RemovalNonces::set(self.txn, genesis, data.plan, data.attempt, &results); - let Ok(share) = (DkgRemoval { - spec: self.spec, - key: self.our_key, - txn: self.txn, - removing: data.plan, - attempt: data.attempt, - }) - .share(results) else { + let Ok(share) = self.dkg_removal(&data).share(results) else { // TODO: Locally increase slash points to maximum (distinct from an explicitly fatal // slash) and censor transactions (yet don't explicitly ban) return; @@ -560,14 +543,8 @@ impl< let preprocesses = RemovalNonces::get(self.txn, genesis, data.plan, data.attempt).unwrap(); - let Ok((signers, signature)) = (DkgRemoval { - spec: self.spec, - key: self.our_key, - txn: self.txn, - removing: data.plan, - attempt: data.attempt, - }) - .complete(preprocesses, results) else { + let Ok((signers, signature)) = self.dkg_removal(&data).complete(preprocesses, results) + else { // TODO: Locally increase slash points to maximum (distinct from an explicitly fatal // slash) and censor transactions (yet don't explicitly ban) return; @@ -607,20 +584,15 @@ impl< let block_number = SeraiBlockNumber::get(self.txn, hash) .expect("CosignSubstrateBlock yet didn't save Serai block number"); - self - .processors - .send( - self.spec.set().network, - coordinator::CoordinatorMessage::CosignSubstrateBlock { - id: SubstrateSignId { - session: self.spec.set().session, - id: SubstrateSignableId::CosigningSubstrateBlock(hash), - attempt: 0, - }, - block_number, - }, - ) - .await; + let msg = coordinator::CoordinatorMessage::CosignSubstrateBlock { + id: SubstrateSignId { + session: self.spec.set().session, + id: SubstrateSignableId::CosigningSubstrateBlock(hash), + attempt: 0, + }, + block_number, + }; + self.processors.send(self.spec.set().network, msg).await; } Transaction::Batch(_, batch) => { @@ -666,61 +638,48 @@ impl< } } - if let Accumulation::Ready(DataSet::Participating(mut results)) = self - .handle_data( - &DataSpecification { - topic: Topic::SubstrateSign(data.plan), - label: data.label, - attempt: data.attempt, - }, - data.data.encode(), - &data.signed, - ) - .await - { - unflatten(self.spec, &mut results); - let id = SubstrateSignId { - session: self.spec.set().session, - id: data.plan, - attempt: data.attempt, - }; - self - .processors - .send( - self.spec.set().network, - match data.label { - Label::Preprocess => coordinator::CoordinatorMessage::SubstratePreprocesses { - id, - preprocesses: results - .into_iter() - .map(|(v, p)| (v, p.try_into().unwrap())) - .collect(), - }, - Label::Share => coordinator::CoordinatorMessage::SubstrateShares { - id, - shares: results.into_iter().map(|(v, p)| (v, p.try_into().unwrap())).collect(), - }, - }, - ) - .await; - } + let data_spec = DataSpecification { + topic: Topic::SubstrateSign(data.plan), + label: data.label, + attempt: data.attempt, + }; + let Accumulation::Ready(DataSet::Participating(mut results)) = + self.handle_data(&data_spec, data.data.encode(), &data.signed).await + else { + return; + }; + unflatten(self.spec, &mut results); + + let id = SubstrateSignId { + session: self.spec.set().session, + id: data.plan, + attempt: data.attempt, + }; + let msg = match data.label { + Label::Preprocess => coordinator::CoordinatorMessage::SubstratePreprocesses { + id, + preprocesses: results.into_iter().map(|(v, p)| (v, p.try_into().unwrap())).collect(), + }, + Label::Share => coordinator::CoordinatorMessage::SubstrateShares { + id, + shares: results.into_iter().map(|(v, p)| (v, p.try_into().unwrap())).collect(), + }, + }; + self.processors.send(self.spec.set().network, msg).await; } Transaction::Sign(data) => { let Ok(_) = self.check_sign_data_len(data.signed.signer, data.data.len()).await else { return; }; - if let Accumulation::Ready(DataSet::Participating(mut results)) = self - .handle_data( - &DataSpecification { - topic: Topic::Sign(data.plan), - label: data.label, - attempt: data.attempt, - }, - data.data.encode(), - &data.signed, - ) - .await + + let data_spec = DataSpecification { + topic: Topic::Sign(data.plan), + label: data.label, + attempt: data.attempt, + }; + if let Accumulation::Ready(DataSet::Participating(mut results)) = + self.handle_data(&data_spec, data.data.encode(), &data.signed).await { unflatten(self.spec, &mut results); let id = @@ -756,17 +715,12 @@ impl< // TODO: Confirm this signer hasn't prior published a completion - self - .processors - .send( - self.spec.set().network, - sign::CoordinatorMessage::Completed { - session: self.spec.set().session, - id: plan, - tx: tx_hash, - }, - ) - .await; + let msg = sign::CoordinatorMessage::Completed { + session: self.spec.set().session, + id: plan, + tx: tx_hash, + }; + self.processors.send(self.spec.set().network, msg).await; } } }