Skip to content

Commit

Permalink
Use temporary variables to further minimize LoC in tributary handler
Browse files Browse the repository at this point in the history
  • Loading branch information
kayabaNerve committed Dec 10, 2023
1 parent 70284ed commit 24f4dae
Showing 1 changed file with 79 additions and 125 deletions.
204 changes: 79 additions & 125 deletions coordinator/src/tributary/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,16 @@ impl<
Ok(())
}

fn dkg_removal(&mut self, data: &SignData<[u8; 32]>) -> DkgRemoval<'_, T> {
DkgRemoval {
spec: self.spec,
key: self.our_key,
txn: self.txn,
removing: data.plan,
attempt: data.attempt,
}
}

pub(crate) async fn handle_application_tx(&mut self, tx: Transaction) {
let genesis = self.spec.genesis();

Expand All @@ -260,14 +270,8 @@ impl<
let Ok(_) = self.check_sign_data_len(signed.signer, commitments.len()).await else {
return;
};
match self
.handle_data(
&DataSpecification { topic: Topic::Dkg, label: Label::Preprocess, attempt },
commitments.encode(),
&signed,
)
.await
{
let data_spec = DataSpecification { topic: Topic::Dkg, label: Label::Preprocess, attempt };
match self.handle_data(&data_spec, commitments.encode(), &signed).await {
Accumulation::Ready(DataSet::Participating(mut commitments)) => {
log::info!("got all DkgCommitments for {}", hex::encode(genesis));
unflatten(self.spec, &mut commitments);
Expand Down Expand Up @@ -359,14 +363,9 @@ impl<
// Drop shares as it's been mutated into invalidity
drop(shares);

match self
.handle_data(
&DataSpecification { topic: Topic::Dkg, label: Label::Share, attempt },
(confirmation_nonces.to_vec(), our_shares.encode()).encode(),
&signed,
)
.await
{
let data_spec = DataSpecification { topic: Topic::Dkg, label: Label::Share, attempt };
let encoded_data = (confirmation_nonces.to_vec(), our_shares.encode()).encode();
match self.handle_data(&data_spec, encoded_data, &signed).await {
Accumulation::Ready(DataSet::Participating(confirmation_nonces_and_shares)) => {
log::info!("got all DkgShares for {}", hex::encode(genesis));

Expand Down Expand Up @@ -458,14 +457,9 @@ impl<
}

Transaction::DkgConfirmed(attempt, shares, signed) => {
match self
.handle_data(
&DataSpecification { topic: Topic::DkgConfirmation, label: Label::Share, attempt },
shares.to_vec(),
&signed,
)
.await
{
let data_spec =
DataSpecification { topic: Topic::DkgConfirmation, label: Label::Share, attempt };
match self.handle_data(&data_spec, shares.to_vec(), &signed).await {
Accumulation::Ready(DataSet::Participating(shares)) => {
log::info!("got all DkgConfirmed for {}", hex::encode(genesis));

Expand Down Expand Up @@ -514,17 +508,13 @@ impl<
return;
}

let Accumulation::Ready(DataSet::Participating(results)) = self
.handle_data(
&DataSpecification {
topic: Topic::DkgRemoval(data.plan),
label: data.label,
attempt: data.attempt,
},
data.data.encode(),
&data.signed,
)
.await
let data_spec = DataSpecification {
topic: Topic::DkgRemoval(data.plan),
label: data.label,
attempt: data.attempt,
};
let Accumulation::Ready(DataSet::Participating(results)) =
self.handle_data(&data_spec, data.data.encode(), &data.signed).await
else {
return;
};
Expand All @@ -533,14 +523,7 @@ impl<
Label::Preprocess => {
RemovalNonces::set(self.txn, genesis, data.plan, data.attempt, &results);

let Ok(share) = (DkgRemoval {
spec: self.spec,
key: self.our_key,
txn: self.txn,
removing: data.plan,
attempt: data.attempt,
})
.share(results) else {
let Ok(share) = self.dkg_removal(&data).share(results) else {
// TODO: Locally increase slash points to maximum (distinct from an explicitly fatal
// slash) and censor transactions (yet don't explicitly ban)
return;
Expand All @@ -560,14 +543,8 @@ impl<
let preprocesses =
RemovalNonces::get(self.txn, genesis, data.plan, data.attempt).unwrap();

let Ok((signers, signature)) = (DkgRemoval {
spec: self.spec,
key: self.our_key,
txn: self.txn,
removing: data.plan,
attempt: data.attempt,
})
.complete(preprocesses, results) else {
let Ok((signers, signature)) = self.dkg_removal(&data).complete(preprocesses, results)
else {
// TODO: Locally increase slash points to maximum (distinct from an explicitly fatal
// slash) and censor transactions (yet don't explicitly ban)
return;
Expand Down Expand Up @@ -607,20 +584,15 @@ impl<

let block_number = SeraiBlockNumber::get(self.txn, hash)
.expect("CosignSubstrateBlock yet didn't save Serai block number");
self
.processors
.send(
self.spec.set().network,
coordinator::CoordinatorMessage::CosignSubstrateBlock {
id: SubstrateSignId {
session: self.spec.set().session,
id: SubstrateSignableId::CosigningSubstrateBlock(hash),
attempt: 0,
},
block_number,
},
)
.await;
let msg = coordinator::CoordinatorMessage::CosignSubstrateBlock {
id: SubstrateSignId {
session: self.spec.set().session,
id: SubstrateSignableId::CosigningSubstrateBlock(hash),
attempt: 0,
},
block_number,
};
self.processors.send(self.spec.set().network, msg).await;
}

Transaction::Batch(_, batch) => {
Expand Down Expand Up @@ -666,61 +638,48 @@ impl<
}
}

if let Accumulation::Ready(DataSet::Participating(mut results)) = self
.handle_data(
&DataSpecification {
topic: Topic::SubstrateSign(data.plan),
label: data.label,
attempt: data.attempt,
},
data.data.encode(),
&data.signed,
)
.await
{
unflatten(self.spec, &mut results);
let id = SubstrateSignId {
session: self.spec.set().session,
id: data.plan,
attempt: data.attempt,
};
self
.processors
.send(
self.spec.set().network,
match data.label {
Label::Preprocess => coordinator::CoordinatorMessage::SubstratePreprocesses {
id,
preprocesses: results
.into_iter()
.map(|(v, p)| (v, p.try_into().unwrap()))
.collect(),
},
Label::Share => coordinator::CoordinatorMessage::SubstrateShares {
id,
shares: results.into_iter().map(|(v, p)| (v, p.try_into().unwrap())).collect(),
},
},
)
.await;
}
let data_spec = DataSpecification {
topic: Topic::SubstrateSign(data.plan),
label: data.label,
attempt: data.attempt,
};
let Accumulation::Ready(DataSet::Participating(mut results)) =
self.handle_data(&data_spec, data.data.encode(), &data.signed).await
else {
return;
};
unflatten(self.spec, &mut results);

let id = SubstrateSignId {
session: self.spec.set().session,
id: data.plan,
attempt: data.attempt,
};
let msg = match data.label {
Label::Preprocess => coordinator::CoordinatorMessage::SubstratePreprocesses {
id,
preprocesses: results.into_iter().map(|(v, p)| (v, p.try_into().unwrap())).collect(),
},
Label::Share => coordinator::CoordinatorMessage::SubstrateShares {
id,
shares: results.into_iter().map(|(v, p)| (v, p.try_into().unwrap())).collect(),
},
};
self.processors.send(self.spec.set().network, msg).await;
}

Transaction::Sign(data) => {
let Ok(_) = self.check_sign_data_len(data.signed.signer, data.data.len()).await else {
return;
};
if let Accumulation::Ready(DataSet::Participating(mut results)) = self
.handle_data(
&DataSpecification {
topic: Topic::Sign(data.plan),
label: data.label,
attempt: data.attempt,
},
data.data.encode(),
&data.signed,
)
.await

let data_spec = DataSpecification {
topic: Topic::Sign(data.plan),
label: data.label,
attempt: data.attempt,
};
if let Accumulation::Ready(DataSet::Participating(mut results)) =
self.handle_data(&data_spec, data.data.encode(), &data.signed).await
{
unflatten(self.spec, &mut results);
let id =
Expand Down Expand Up @@ -756,17 +715,12 @@ impl<

// TODO: Confirm this signer hasn't prior published a completion

self
.processors
.send(
self.spec.set().network,
sign::CoordinatorMessage::Completed {
session: self.spec.set().session,
id: plan,
tx: tx_hash,
},
)
.await;
let msg = sign::CoordinatorMessage::Completed {
session: self.spec.set().session,
id: plan,
tx: tx_hash,
};
self.processors.send(self.spec.set().network, msg).await;
}
}
}
Expand Down

0 comments on commit 24f4dae

Please sign in to comment.