diff --git a/pallets/subnet_emission/src/decryption.rs b/pallets/subnet_emission/src/decryption.rs index aa56228d..464625ff 100644 --- a/pallets/subnet_emission/src/decryption.rs +++ b/pallets/subnet_emission/src/decryption.rs @@ -173,66 +173,73 @@ impl Pallet { netuid: u16, weights: Option>, ) -> Result<(), &'static str> { - if let Some(weights) = weights { - // Sorts from oldest weights to newest - let mut sorted_weights = weights; - sorted_weights.sort_by_key(|(block, _)| *block); - - let mut accumulated_emission: u64 = 0; - - for (block, weights) in sorted_weights { - let consensus_type = - SubnetConsensusType::::get(netuid).ok_or("Invalid network ID")?; - if consensus_type != pallet_subnet_emission_api::SubnetConsensus::Yuma { - return Err("Unsupported consensus type"); - } + let weights = match weights { + Some(w) => w, + None => { + log::info!("No decrypted weights"); + return Ok(()); + } + }; - // Extend the weight storage of the subnet with the new weights - for (uid, weights) in weights.clone() { - Weights::::set(netuid, uid, Some(weights)); - } + // Ensure consensus type is Yuma + let consensus_type = SubnetConsensusType::::get(netuid).ok_or("Invalid network ID")?; + if consensus_type != pallet_subnet_emission_api::SubnetConsensus::Yuma { + return Err("Unsupported consensus type"); + } - let mut params = ConsensusParameters::::get(netuid, block).ok_or_else(|| { - log::error!("no params found for netuid {netuid} block {block}"); - "Missing consensus parameters" - })?; + // Process weights in chronological order + let mut sorted_weights = weights; + sorted_weights.sort_by_key(|(block, _)| *block); - params.token_emission = params.token_emission.saturating_add(accumulated_emission); - let new_emission = params.token_emission; + let final_emission = Self::process_weights_sequence(netuid, sorted_weights)?; - log::info!("final weights before running decrypted yuma are {weights:?}"); + // Update pending emission if there's any accumulated + if final_emission > 0 { + update_pending_emission::(netuid, &final_emission); + } - match YumaEpoch::new(netuid, params.clone()).run(weights) { - Ok(output) => { - accumulated_emission = 0; - log::info!("applying yuma for {netuid}"); - output.apply() - } - Err(err) => { - log::error!( - "could not run yuma consensus for {netuid} block {block}: {err:?}" - ); - accumulated_emission = new_emission; - } - } - } + // Cleanup subnet state (excluding decryption data) + Self::cleanup_subnet_wc_state(netuid, false, false); + + Ok(()) + } - // If the last consensus that we were processing had an error we directly update the - // pending emision storage of the subnet - if accumulated_emission > 0 { - update_pending_emission::(netuid, &accumulated_emission); + fn process_weights_sequence( + netuid: u16, + weights: Vec, + ) -> Result { + weights.into_iter().fold(Ok(0u64), |acc_emission, (block, block_weights)| { + let acc_emission = acc_emission?; + + // Update weights storage + for (uid, weights) in block_weights.clone() { + Weights::::set(netuid, uid, Some(weights)); } - // --- Clear All Of the Relevant Storages --- - // We avoid subnet decryption data, as node rotation has to handle that + // Get and update consensus parameters + let mut params = ConsensusParameters::::get(netuid, block).ok_or_else(|| { + log::error!("no params found for netuid {netuid} block {block}"); + "Missing consensus parameters" + })?; - Self::cleanup_subnet_wc_state(netuid, false, false); // don't increase pending emisison, don't deletete node assignement + params.token_emission = params.token_emission.saturating_add(acc_emission); + let new_emission = params.token_emission; - Ok(()) - } else { - log::info!("No decrypted weights"); - Ok(()) - } + log::info!("final weights before running decrypted yuma are {block_weights:?}"); + + // Run Yuma consensus + match YumaEpoch::new(netuid, params).run(block_weights) { + Ok(output) => { + log::info!("applying yuma for {netuid}"); + output.apply(); + Ok(0) // Reset accumulated emission on success + } + Err(err) => { + log::error!("could not run yuma consensus for {netuid} block {block}: {err:?}"); + Ok(new_emission) // Keep accumulating on error + } + } + }) } fn weights_to_blob(weights: &[(u16, u16)]) -> Vec { @@ -543,6 +550,7 @@ impl Pallet { increase_pending_emission: bool, clear_node_assing: bool, ) -> u64 { + // TODO: take this out of the function enirely // Sum up and clear ConsensusParameters let total_emission = ConsensusParameters::::iter_prefix(subnet_id) .fold(0u64, |acc, (_, params)| { @@ -635,7 +643,6 @@ impl Pallet { } } - /// TODO: delete the wc state pub(crate) fn rotate_decryption_node_if_needed(subnet_id: u16, info: SubnetDecryptionInfo) { let block_number = pallet_subspace::Pallet::::get_current_block_number(); let validity_block = match info.validity_block { diff --git a/pallets/subnet_emission/src/subnet_consensus/util/consensus.rs b/pallets/subnet_emission/src/subnet_consensus/util/consensus.rs index 84bffa8c..e3ca1030 100644 --- a/pallets/subnet_emission/src/subnet_consensus/util/consensus.rs +++ b/pallets/subnet_emission/src/subnet_consensus/util/consensus.rs @@ -144,7 +144,7 @@ pub fn calculate_final_emissions( validator_emission = validator_emission .checked_sub(to_delegate) - .ok_or("more validator emissions were done than expected")?; + .ok_or("more validator emissions were processed than expected")?; } } @@ -804,10 +804,12 @@ pub struct ConsensusOutput { pub total_emitted: u64, } +// TODO: write a test on this impl ConsensusOutput { pub fn apply(self) { use pallet_subspace::*; + #[deny(unused_variables)] let Self { subnet_id, active, @@ -852,6 +854,7 @@ impl ConsensusOutput { self.founder_emission, ); + // especially make sure this is correct, hasn't been tested yet for (module_key, emitted_to) in self.emission_map { for (account_key, emission) in emitted_to { if PalletSubspace::::is_registered(Some(subnet_id), &account_key.0) {