Skip to content

Commit

Permalink
refac: decrypted weights fn
Browse files Browse the repository at this point in the history
  • Loading branch information
Supremesource committed Nov 29, 2024
1 parent 9b6eb9f commit 7863c49
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 52 deletions.
109 changes: 58 additions & 51 deletions pallets/subnet_emission/src/decryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,66 +173,73 @@ impl<T: Config> Pallet<T> {
netuid: u16,
weights: Option<Vec<KeylessBlockWeights>>,
) -> Result<(), &'static str> {
if let Some(weights) = weights {
// Sorts from oldest weights to newest
let mut sorted_weights = weights;
sorted_weights.sort_by_key(|(block, _)| *block);

let mut accumulated_emission: u64 = 0;

for (block, weights) in sorted_weights {
let consensus_type =
SubnetConsensusType::<T>::get(netuid).ok_or("Invalid network ID")?;
if consensus_type != pallet_subnet_emission_api::SubnetConsensus::Yuma {
return Err("Unsupported consensus type");
}
let weights = match weights {
Some(w) => w,
None => {
log::info!("No decrypted weights");
return Ok(());
}
};

// Extend the weight storage of the subnet with the new weights
for (uid, weights) in weights.clone() {
Weights::<T>::set(netuid, uid, Some(weights));
}
// Ensure consensus type is Yuma
let consensus_type = SubnetConsensusType::<T>::get(netuid).ok_or("Invalid network ID")?;
if consensus_type != pallet_subnet_emission_api::SubnetConsensus::Yuma {
return Err("Unsupported consensus type");
}

let mut params = ConsensusParameters::<T>::get(netuid, block).ok_or_else(|| {
log::error!("no params found for netuid {netuid} block {block}");
"Missing consensus parameters"
})?;
// Process weights in chronological order
let mut sorted_weights = weights;
sorted_weights.sort_by_key(|(block, _)| *block);

params.token_emission = params.token_emission.saturating_add(accumulated_emission);
let new_emission = params.token_emission;
let final_emission = Self::process_weights_sequence(netuid, sorted_weights)?;

log::info!("final weights before running decrypted yuma are {weights:?}");
// Update pending emission if there's any accumulated
if final_emission > 0 {
update_pending_emission::<T>(netuid, &final_emission);
}

match YumaEpoch::new(netuid, params.clone()).run(weights) {
Ok(output) => {
accumulated_emission = 0;
log::info!("applying yuma for {netuid}");
output.apply()
}
Err(err) => {
log::error!(
"could not run yuma consensus for {netuid} block {block}: {err:?}"
);
accumulated_emission = new_emission;
}
}
}
// Cleanup subnet state (excluding decryption data)
Self::cleanup_subnet_wc_state(netuid, false, false);

Ok(())
}

// If the last consensus that we were processing had an error we directly update the
// pending emision storage of the subnet
if accumulated_emission > 0 {
update_pending_emission::<T>(netuid, &accumulated_emission);
fn process_weights_sequence(
netuid: u16,
weights: Vec<KeylessBlockWeights>,
) -> Result<u64, &'static str> {
weights.into_iter().fold(Ok(0u64), |acc_emission, (block, block_weights)| {
let acc_emission = acc_emission?;

// Update weights storage
for (uid, weights) in block_weights.clone() {
Weights::<T>::set(netuid, uid, Some(weights));
}

// --- Clear All Of the Relevant Storages ---
// We avoid subnet decryption data, as node rotation has to handle that
// Get and update consensus parameters
let mut params = ConsensusParameters::<T>::get(netuid, block).ok_or_else(|| {
log::error!("no params found for netuid {netuid} block {block}");
"Missing consensus parameters"
})?;

Self::cleanup_subnet_wc_state(netuid, false, false); // don't increase pending emisison, don't deletete node assignement
params.token_emission = params.token_emission.saturating_add(acc_emission);
let new_emission = params.token_emission;

Ok(())
} else {
log::info!("No decrypted weights");
Ok(())
}
log::info!("final weights before running decrypted yuma are {block_weights:?}");

// Run Yuma consensus
match YumaEpoch::new(netuid, params).run(block_weights) {
Ok(output) => {
log::info!("applying yuma for {netuid}");
output.apply();
Ok(0) // Reset accumulated emission on success
}
Err(err) => {
log::error!("could not run yuma consensus for {netuid} block {block}: {err:?}");
Ok(new_emission) // Keep accumulating on error
}
}
})

Check failure on line 242 in pallets/subnet_emission/src/decryption.rs

View workflow job for this annotation

GitHub Actions / clippy

usage of `Iterator::fold` on a type that implements `Try`

error: usage of `Iterator::fold` on a type that implements `Try` --> pallets/subnet_emission/src/decryption.rs:211:29 | 211 | weights.into_iter().fold(Ok(0u64), |acc_emission, (block, block_weights)| { | _____________________________^ 212 | | let acc_emission = acc_emission?; 213 | | 214 | | // Update weights storage ... | 241 | | } 242 | | }) | |__________^ help: use `try_fold` instead: `try_fold(0u64, |acc_emission, (block, block_weights)| ...)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#manual_try_fold = note: `-D clippy::manual-try-fold` implied by `-D clippy::all` = help: to override `-D clippy::all` add `#[allow(clippy::manual_try_fold)]`
}

fn weights_to_blob(weights: &[(u16, u16)]) -> Vec<u8> {
Expand Down Expand Up @@ -543,6 +550,7 @@ impl<T: Config> Pallet<T> {
increase_pending_emission: bool,
clear_node_assing: bool,
) -> u64 {
// TODO: take this out of the function enirely
// Sum up and clear ConsensusParameters
let total_emission = ConsensusParameters::<T>::iter_prefix(subnet_id)
.fold(0u64, |acc, (_, params)| {
Expand Down Expand Up @@ -635,7 +643,6 @@ impl<T: Config> Pallet<T> {
}
}

/// TODO: delete the wc state
pub(crate) fn rotate_decryption_node_if_needed(subnet_id: u16, info: SubnetDecryptionInfo<T>) {
let block_number = pallet_subspace::Pallet::<T>::get_current_block_number();
let validity_block = match info.validity_block {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub fn calculate_final_emissions<T: Config>(

validator_emission = validator_emission
.checked_sub(to_delegate)
.ok_or("more validator emissions were done than expected")?;
.ok_or("more validator emissions were processed than expected")?;
}
}

Expand Down Expand Up @@ -804,10 +804,12 @@ pub struct ConsensusOutput<T: Config> {
pub total_emitted: u64,
}

// TODO: write a test on this
impl<T: Config> ConsensusOutput<T> {
pub fn apply(self) {
use pallet_subspace::*;

#[deny(unused_variables)]
let Self {
subnet_id,
active,
Expand Down Expand Up @@ -852,6 +854,7 @@ impl<T: Config> ConsensusOutput<T> {
self.founder_emission,
);

// especially make sure this is correct, hasn't been tested yet
for (module_key, emitted_to) in self.emission_map {
for (account_key, emission) in emitted_to {
if PalletSubspace::<T>::is_registered(Some(subnet_id), &account_key.0) {
Expand Down

0 comments on commit 7863c49

Please sign in to comment.