From d590dd037eab6e56b62f3884239cfb9afe078184 Mon Sep 17 00:00:00 2001 From: Arun Koshy <97870774+arun-koshy@users.noreply.github.com> Date: Mon, 11 Nov 2024 14:51:49 -0800 Subject: [PATCH] [consensus] Exclude low scoring ancestors in proposals with time based inclusion/exclusion (#19605) ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- consensus/config/src/committee.rs | 5 + consensus/core/src/ancestor.rs | 385 ++++++++++++++ consensus/core/src/core.rs | 482 +++++++++++++++++- consensus/core/src/dag_state.rs | 2 +- consensus/core/src/leader_scoring.rs | 83 +-- consensus/core/src/lib.rs | 1 + consensus/core/src/metrics.rs | 27 + consensus/core/src/round_prober.rs | 2 +- crates/sui-open-rpc/spec/openrpc.json | 1 + crates/sui-protocol-config/src/lib.rs | 14 + ...sui_protocol_config__test__version_69.snap | 1 + 11 files changed, 916 insertions(+), 87 deletions(-) create mode 100644 consensus/core/src/ancestor.rs diff --git a/consensus/config/src/committee.rs b/consensus/config/src/committee.rs index 0d941a3aa39fd..2dfb5ff66bb87 100644 --- a/consensus/config/src/committee.rs +++ b/consensus/config/src/committee.rs @@ -68,6 +68,11 @@ impl Committee { self.total_stake } + pub fn n_percent_stake_threshold(&self, n: u64) -> Stake { + assert!(n <= 100, "n must be between 0 and 100"); + self.total_stake * n / 100 + } + pub fn quorum_threshold(&self) -> Stake { self.quorum_threshold } diff --git a/consensus/core/src/ancestor.rs b/consensus/core/src/ancestor.rs new file mode 100644 index 0000000000000..6ad49ac190e39 --- /dev/null +++ b/consensus/core/src/ancestor.rs @@ -0,0 +1,385 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use consensus_config::AuthorityIndex; +use tracing::info; + +use crate::{context::Context, leader_scoring::ReputationScores, round_prober::QuorumRound}; + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum AncestorState { + Include, + Exclude(u64), +} + +#[derive(Clone)] +struct AncestorInfo { + state: AncestorState, + // This will be set to the count of either the quorum round update count or + // the score update count for which the EXCLUDE or INCLUDE state are locked + // in respectively. + lock_expiry_count: u32, +} + +impl AncestorInfo { + fn new() -> Self { + Self { + state: AncestorState::Include, + lock_expiry_count: 0, + } + } + + fn is_locked( + &self, + propagation_score_update_count: u32, + quorum_round_update_count: u32, + ) -> bool { + match self.state { + AncestorState::Include => self.lock_expiry_count > propagation_score_update_count, + AncestorState::Exclude(_) => self.lock_expiry_count > quorum_round_update_count, + } + } + + fn set_lock(&mut self, future_count: u32) { + self.lock_expiry_count = future_count; + } +} + +pub(crate) struct AncestorStateManager { + context: Arc, + state_map: Vec, + propagation_score_update_count: u32, + quorum_round_update_count: u32, + pub(crate) quorum_round_per_authority: Vec, + // This is the reputation scores that we use for leader election but we are + // using it here as a signal for high quality block propagation as well. 
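+ // Scores are refreshed from the leader schedule's reputation scores each time
+ // the leader schedule is updated (roughly every 300 commits per schedule), so
+ // exclusion decisions always reflect the most recent scoring subdag.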
+ pub(crate) propagation_scores: ReputationScores, +} + +impl AncestorStateManager { + // Number of quorum round updates for which an ancestor is locked in the EXCLUDE state + // Chose 10 updates as that should be ~50 seconds of waiting with the current round prober + // interval of 5s + #[cfg(not(test))] + const STATE_LOCK_QUORUM_ROUND_UPDATES: u32 = 10; + #[cfg(test)] + const STATE_LOCK_QUORUM_ROUND_UPDATES: u32 = 1; + + // Number of propagation score updates for which an ancestor is locked in the INCLUDE state + // Chose 2 leader schedule updates (~300 commits per schedule) which should be ~30-90 seconds + // depending on the round rate for the authority to improve scores. + #[cfg(not(test))] + const STATE_LOCK_SCORE_UPDATES: u32 = 2; + #[cfg(test)] + const STATE_LOCK_SCORE_UPDATES: u32 = 1; + + // Exclusion threshold is based on propagation (reputation) scores + const EXCLUSION_THRESHOLD_PERCENTAGE: u64 = 10; + + // Inclusion threshold is based on network quorum round + const INCLUSION_THRESHOLD_PERCENTAGE: u64 = 90; + + pub(crate) fn new(context: Arc) -> Self { + let state_map = vec![AncestorInfo::new(); context.committee.size()]; + + let quorum_round_per_authority = vec![(0, 0); context.committee.size()]; + Self { + context, + state_map, + propagation_score_update_count: 0, + quorum_round_update_count: 0, + propagation_scores: ReputationScores::default(), + quorum_round_per_authority, + } + } + + pub(crate) fn set_quorum_round_per_authority(&mut self, quorum_rounds: Vec) { + self.quorum_round_per_authority = quorum_rounds; + self.quorum_round_update_count += 1; + } + + pub(crate) fn set_propagation_scores(&mut self, scores: ReputationScores) { + self.propagation_scores = scores; + self.propagation_score_update_count += 1; + } + + pub(crate) fn get_ancestor_states(&self) -> Vec { + self.state_map.iter().map(|info| info.state).collect() + } + + /// Updates the state of all ancestors based on the latest scores and quorum rounds + pub(crate) fn update_all_ancestors_state(&mut self) { + let propagation_scores_by_authority = self + .propagation_scores + .scores_per_authority + .clone() + .into_iter() + .enumerate() + .map(|(idx, score)| { + ( + self.context + .committee + .to_authority_index(idx) + .expect("Index should be valid"), + score, + ) + }) + .collect::>(); + + // If round prober has not run yet and we don't have network quorum round, + // it is okay because network_low_quorum_round will be zero and we will + // include all ancestors until we get more information. + let network_low_quorum_round = self.calculate_network_low_quorum_round(); + + // If propagation scores are not ready because the first 300 commits have not + // happened, this is okay as we will only start excluding ancestors after that + // point in time. + for (authority_id, score) in propagation_scores_by_authority { + let (authority_low_quorum_round, _high) = self.quorum_round_per_authority[authority_id]; + + self.update_state( + authority_id, + score, + authority_low_quorum_round, + network_low_quorum_round, + ); + } + } + + /// Updates the state of the given authority based on current scores and quorum rounds. 
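+ ///
+ /// State transitions (no change is made while the current state is locked):
+ /// - INCLUDE -> EXCLUDE(score): the propagation score is at or below
+ ///   EXCLUSION_THRESHOLD_PERCENTAGE of the highest score; the new state is
+ ///   locked for STATE_LOCK_QUORUM_ROUND_UPDATES quorum round updates.
+ /// - EXCLUDE(_) -> INCLUDE: the score has recovered above the threshold, or the
+ ///   authority's low quorum round has caught up to the network low quorum round;
+ ///   the new state is locked for STATE_LOCK_SCORE_UPDATES score updates.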
+ fn update_state( + &mut self, + authority_id: AuthorityIndex, + propagation_score: u64, + authority_low_quorum_round: u32, + network_low_quorum_round: u32, + ) { + let block_hostname = &self.context.committee.authority(authority_id).hostname; + let mut ancestor_info = self.state_map[authority_id].clone(); + + if ancestor_info.is_locked( + self.propagation_score_update_count, + self.quorum_round_update_count, + ) { + // If still locked, we won't make any state changes. + return; + } + + let low_score_threshold = + (self.propagation_scores.highest_score() * Self::EXCLUSION_THRESHOLD_PERCENTAGE) / 100; + + match ancestor_info.state { + // Check conditions to switch to EXCLUDE state + AncestorState::Include => { + if propagation_score <= low_score_threshold { + ancestor_info.state = AncestorState::Exclude(propagation_score); + ancestor_info.set_lock( + self.quorum_round_update_count + Self::STATE_LOCK_QUORUM_ROUND_UPDATES, + ); + info!( + "Authority {authority_id} moved to EXCLUDE state with score {propagation_score} <= threshold of {low_score_threshold} and locked for {:?} quorum round updates", + Self::STATE_LOCK_QUORUM_ROUND_UPDATES + ); + self.context + .metrics + .node_metrics + .ancestor_state_change_by_authority + .with_label_values(&[block_hostname, "exclude"]) + .inc(); + } + } + // Check conditions to switch back to INCLUDE state + AncestorState::Exclude(_) => { + // It should not be possible for the scores to get over the threshold + // until the node is back in the INCLUDE state, but adding just in case. + if propagation_score > low_score_threshold + || authority_low_quorum_round >= network_low_quorum_round + { + ancestor_info.state = AncestorState::Include; + ancestor_info.set_lock( + self.propagation_score_update_count + Self::STATE_LOCK_SCORE_UPDATES, + ); + info!( + "Authority {authority_id} moved to INCLUDE state with {propagation_score} > threshold of {low_score_threshold} or {authority_low_quorum_round} >= {network_low_quorum_round} and locked for {:?} score updates.", + Self::STATE_LOCK_SCORE_UPDATES + ); + self.context + .metrics + .node_metrics + .ancestor_state_change_by_authority + .with_label_values(&[block_hostname, "include"]) + .inc(); + } + } + } + + // If any updates were made to state ensure they are persisted. + self.state_map[authority_id] = ancestor_info; + } + + /// Calculate the network's quorum round from authorities by inclusion stake + /// threshold, where quorum round is the highest round a block has been seen + /// by a percentage (inclusion threshold) of authorities. 
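+ ///
+ /// For example, with four equal-stake authorities reporting low quorum rounds
+ /// of [100, 225, 229, 229] and a 90% inclusion threshold (a stake threshold of
+ /// 3 out of 4), walking the rounds from highest to lowest accumulates stake
+ /// 1, 2 and then 3 at rounds 229, 229 and 225, so the network low quorum round
+ /// is 225.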
+ /// TODO: experiment with using high quorum round + fn calculate_network_low_quorum_round(&self) -> u32 { + let committee = &self.context.committee; + let inclusion_stake_threshold = self.get_inclusion_stake_threshold(); + let mut low_quorum_rounds_with_stake = self + .quorum_round_per_authority + .iter() + .zip(committee.authorities()) + .map(|((low, _high), (_, authority))| (*low, authority.stake)) + .collect::>(); + low_quorum_rounds_with_stake.sort(); + + let mut total_stake = 0; + let mut network_low_quorum_round = 0; + + for (round, stake) in low_quorum_rounds_with_stake.iter().rev() { + total_stake += stake; + if total_stake >= inclusion_stake_threshold { + network_low_quorum_round = *round; + break; + } + } + + network_low_quorum_round + } + + fn get_inclusion_stake_threshold(&self) -> u64 { + self.context + .committee + .n_percent_stake_threshold(Self::INCLUSION_THRESHOLD_PERCENTAGE) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::leader_scoring::ReputationScores; + + #[tokio::test] + async fn test_calculate_network_low_quorum_round() { + telemetry_subscribers::init_for_testing(); + let context = Arc::new(Context::new_for_test(4).0); + + let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]); + let mut ancestor_state_manager = AncestorStateManager::new(context); + ancestor_state_manager.set_propagation_scores(scores); + + // Quorum rounds are not set yet, so we should calculate a network quorum + // round of 0 to start. + let network_low_quorum_round = ancestor_state_manager.calculate_network_low_quorum_round(); + assert_eq!(network_low_quorum_round, 0); + + let quorum_rounds = vec![(100, 229), (225, 300), (229, 300), (229, 300)]; + ancestor_state_manager.set_quorum_round_per_authority(quorum_rounds); + + let network_low_quorum_round = ancestor_state_manager.calculate_network_low_quorum_round(); + assert_eq!(network_low_quorum_round, 225); + } + + // Test all state transitions + // Default all INCLUDE -> EXCLUDE + // EXCLUDE -> INCLUDE (Blocked due to lock) + // EXCLUDE -> INCLUDE (Pass due to lock expired) + // INCLUDE -> EXCLUDE (Blocked due to lock) + // INCLUDE -> EXCLUDE (Pass due to lock expired) + #[tokio::test] + async fn test_update_all_ancestor_state() { + telemetry_subscribers::init_for_testing(); + let context = Arc::new(Context::new_for_test(4).0); + + let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]); + let mut ancestor_state_manager = AncestorStateManager::new(context); + ancestor_state_manager.set_propagation_scores(scores); + + let quorum_rounds = vec![(225, 229), (225, 300), (229, 300), (229, 300)]; + ancestor_state_manager.set_quorum_round_per_authority(quorum_rounds); + ancestor_state_manager.update_all_ancestors_state(); + + // Score threshold for exclude is (4 * 10) / 100 = 0 + // No ancestors should be excluded in with this threshold + let state_map = ancestor_state_manager.get_ancestor_states(); + for state in state_map.iter() { + assert_eq!(*state, AncestorState::Include); + } + + let scores = ReputationScores::new((1..=300).into(), vec![10, 10, 100, 100]); + ancestor_state_manager.set_propagation_scores(scores); + ancestor_state_manager.update_all_ancestors_state(); + + // Score threshold for exclude is (100 * 10) / 100 = 10 + // 2 authorities should be excluded in with this threshold + let state_map = ancestor_state_manager.get_ancestor_states(); + for (authority, state) in state_map.iter().enumerate() { + if (0..=1).contains(&authority) { + assert_eq!(*state, AncestorState::Exclude(10)); + } else { 
+ assert_eq!(*state, AncestorState::Include); + } + } + + ancestor_state_manager.update_all_ancestors_state(); + + // 2 authorities should still be excluded with these scores and no new + // quorum round updates have been set to expire the locks. + let state_map = ancestor_state_manager.get_ancestor_states(); + for (authority, state) in state_map.iter().enumerate() { + if (0..=1).contains(&authority) { + assert_eq!(*state, AncestorState::Exclude(10)); + } else { + assert_eq!(*state, AncestorState::Include); + } + } + + // Updating the quorum rounds will expire the lock as we only need 1 + // quorum round update for tests. + let quorum_rounds = vec![(229, 300), (225, 300), (229, 300), (229, 300)]; + ancestor_state_manager.set_quorum_round_per_authority(quorum_rounds); + ancestor_state_manager.update_all_ancestors_state(); + + // Authority 0 should now be included again because low quorum round is + // at the network low quorum round of 229. Authority 1's quorum round is + // too low and will remain excluded. + let state_map = ancestor_state_manager.get_ancestor_states(); + for (authority, state) in state_map.iter().enumerate() { + if authority == 1 { + assert_eq!(*state, AncestorState::Exclude(10)); + } else { + assert_eq!(*state, AncestorState::Include); + } + } + + let quorum_rounds = vec![(229, 300), (229, 300), (229, 300), (229, 300)]; + ancestor_state_manager.set_quorum_round_per_authority(quorum_rounds); + ancestor_state_manager.update_all_ancestors_state(); + + // Ancestor 1 can transtion to the INCLUDE state. Ancestor 0 is still locked + // in the INCLUDE state until a score update is performed which is why + // even though the scores are still low it has not moved to the EXCLUDE + // state. + let state_map = ancestor_state_manager.get_ancestor_states(); + for state in state_map.iter() { + assert_eq!(*state, AncestorState::Include); + } + + // Updating the scores will expire the lock as we only need 1 update for tests. + let scores = ReputationScores::new((1..=300).into(), vec![100, 10, 100, 100]); + ancestor_state_manager.set_propagation_scores(scores); + ancestor_state_manager.update_all_ancestors_state(); + + // Ancestor 1 can transition to EXCLUDE state now that the lock expired + // and its scores are below the threshold. + let state_map = ancestor_state_manager.get_ancestor_states(); + for (authority, state) in state_map.iter().enumerate() { + if authority == 1 { + assert_eq!(*state, AncestorState::Exclude(10)); + } else { + assert_eq!(*state, AncestorState::Include); + } + } + } +} diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index ee2e914e4659a..f1fb157f62e14 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -16,9 +16,10 @@ use tokio::{ sync::{broadcast, watch}, time::Instant, }; -use tracing::{debug, info, warn}; +use tracing::{debug, info, trace, warn}; use crate::{ + ancestor::{AncestorState, AncestorStateManager}, block::{ Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, Round, SignedBlock, Slot, VerifiedBlock, GENESIS_ROUND, @@ -96,6 +97,11 @@ pub(crate) struct Core { /// This is currently being used to avoid equivocations during a node recovering from amnesia. When value is None it means that /// the last block sync mechanism is enabled, but it hasn't been initialised yet. last_known_proposed_round: Option, + // The ancestor state manager will keep track of the quality of the authorities + // based on the distribution of their blocks to the network. 
It will use this + // information to decide whether to include that authority block in the next + // proposal or not. + ancestor_state_manager: AncestorStateManager, } impl Core { @@ -150,6 +156,14 @@ impl Core { Some(0) }; + let propagation_scores = leader_schedule + .leader_swap_table + .read() + .reputation_scores + .clone(); + let mut ancestor_state_manager = AncestorStateManager::new(context.clone()); + ancestor_state_manager.set_propagation_scores(propagation_scores); + Self { context: context.clone(), threshold_clock: ThresholdClock::new(0, context.clone()), @@ -166,6 +180,7 @@ impl Core { block_signer, dag_state, last_known_proposed_round: min_propose_round, + ancestor_state_manager, } .recover() } @@ -375,6 +390,37 @@ impl Core { } } + // Determine the ancestors to be included in proposal. + // Smart ancestor selection requires distributed scoring to be enabled. + let ancestors = if self + .context + .protocol_config + .consensus_distributed_vote_scoring_strategy() + && self + .context + .protocol_config + .consensus_smart_ancestor_selection() + { + let ancestors = self.smart_ancestors_to_propose(clock_round, !force); + + // If we did not find enough good ancestors to propose, continue to wait before proposing. + if ancestors.is_empty() { + assert!( + !force, + "Ancestors should have been returned if force is true!" + ); + return None; + } + ancestors + } else { + self.ancestors_to_propose(clock_round) + }; + + // Update the last included ancestor block refs + for ancestor in &ancestors { + self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference()); + } + let leader_authority = &self .context .committee @@ -397,14 +443,6 @@ impl Core { .with_label_values(&[leader_authority]) .inc(); - // TODO: produce the block for the clock_round. As the threshold clock can advance many rounds at once (ex - // because we synchronized a bulk of blocks) we can decide here whether we want to produce blocks per round - // or just the latest one. From earlier experiments I saw only benefit on proposing for the penultimate round - // only when the validator was supposed to be the leader of the round - so we bring down the missed leaders. - // Probably proposing for all the intermediate rounds might not make much sense. - - // Determine the ancestors to be included in proposal - let ancestors = self.ancestors_to_propose(clock_round); self.context .metrics .node_metrics @@ -498,7 +536,7 @@ impl Core { // Now acknowledge the transactions for their inclusion to block ack_transactions(verified_block.reference()); - info!("Created block {:?}", verified_block); + info!("Created block {verified_block:?} for round {clock_round}"); self.context .metrics @@ -542,6 +580,15 @@ impl Core { { self.leader_schedule .update_leader_schedule_v2(&self.dag_state); + + let propagation_scores = self + .leader_schedule + .leader_swap_table + .read() + .reputation_scores + .clone(); + self.ancestor_state_manager + .set_propagation_scores(propagation_scores); } else { self.leader_schedule .update_leader_schedule_v1(&self.dag_state); @@ -633,13 +680,16 @@ impl Core { self.subscriber_exists = exists; } - /// Sets the delay by round for propagating blocks to a quorum. - // TODO: Will set the quorum round per authority in ancestor state manager. + /// Sets the delay by round for propagating blocks to a quorum and the + /// quorum round per authority for ancestor state manager. 
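+ /// Each entry of `quorum_rounds` is the (low, high) quorum round pair reported
+ /// by the round prober for the corresponding authority.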
pub(crate) fn set_propagation_delay_and_quorum_rounds( &mut self, delay: Round, - _quorum_rounds: Vec, + quorum_rounds: Vec, ) { + info!("Quorum round per authority in ancestor state manager set to: {quorum_rounds:?}"); + self.ancestor_state_manager + .set_quorum_round_per_authority(quorum_rounds); info!("Propagation round delay set to: {delay}"); self.propagation_delay = delay; } @@ -747,11 +797,6 @@ impl Core { ) .collect::>(); - // Update the last included ancestor block refs - for ancestor in &ancestors { - self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference()); - } - // TODO: this is for temporary sanity check - we might want to remove later on let mut quorum = StakeAggregator::::new(); for ancestor in ancestors @@ -765,6 +810,142 @@ impl Core { ancestors } + /// Retrieves the next ancestors to propose to form a block at `clock_round` round. + /// If smart selection is enabled then this will try to select the best ancestors + /// based on the propagation scores of the authorities. + fn smart_ancestors_to_propose( + &mut self, + clock_round: Round, + smart_select: bool, + ) -> Vec { + let _s = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&["Core::smart_ancestors_to_propose"]) + .start_timer(); + + // Now take the ancestors before the clock_round (excluded) for each authority. + let ancestors = self + .dag_state + .read() + .get_last_cached_block_per_authority(clock_round); + + assert_eq!( + ancestors.len(), + self.context.committee.size(), + "Fatal error, number of returned ancestors don't match committee size." + ); + + // Ensure ancestor state is up to date before selecting for proposal. + self.ancestor_state_manager.update_all_ancestors_state(); + let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states(); + + let quorum_round = clock_round.saturating_sub(1); + + let mut temp_excluded_ancestors = Vec::new(); + + // Propose only ancestors of higher rounds than what has already been proposed. + // And always include own last proposed block first among ancestors. + // Start by only including the high scoring ancestors. Low scoring ancestors + // will be included in a second pass below. 
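+ // If smart selection is requested and the included parent-round ancestors do
+ // not yet reach a quorum of stake, proposing is deferred by returning an empty
+ // set; otherwise, temporarily excluded ancestors at the parent round are
+ // re-added in descending score order until the parent-round quorum is reached.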
+ let included_ancestors = iter::once(self.last_proposed_block().clone()) + .chain( + ancestors + .into_iter() + .filter(|ancestor| ancestor.author() != self.context.own_index) + .flat_map(|ancestor| { + + let ancestor_state = ancestor_state_map[ancestor.author()]; + + match ancestor_state { + AncestorState::Include => { + trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}"); + } + AncestorState::Exclude(score) => { + trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}"); + temp_excluded_ancestors.push((score, ancestor)); + return None; + } + } + + if let Some(last_block_ref) = + self.last_included_ancestors[ancestor.author()] + { + return (last_block_ref.round < ancestor.round()).then_some(ancestor); + } + Some(ancestor) + }), + ) + .collect::>(); + + let mut parent_round_quorum = StakeAggregator::::new(); + + // Check total stake of high scoring parent round ancestors + for ancestor in included_ancestors + .iter() + .filter(|a| a.round() == quorum_round) + { + parent_round_quorum.add(ancestor.author(), &self.context.committee); + } + + if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) { + self.context.metrics.node_metrics.smart_selection_wait.inc(); + debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake()); + return vec![]; + } + + // Sort scores descending so we can include the best of the temp excluded + // ancestors first until we reach the threshold. + temp_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0)); + + let mut ancestors_to_propose = included_ancestors; + let mut excluded_ancestors = Vec::new(); + + for (score, ancestor) in temp_excluded_ancestors.into_iter() { + let block_hostname = &self.context.committee.authority(ancestor.author()).hostname; + if !parent_round_quorum.reached_threshold(&self.context.committee) + && ancestor.round() == quorum_round + { + debug!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}"); + parent_round_quorum.add(ancestor.author(), &self.context.committee); + ancestors_to_propose.push(ancestor); + self.context + .metrics + .node_metrics + .included_excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname, "strong"]) + .inc(); + } else { + excluded_ancestors.push((score, ancestor)); + } + } + + assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."); + + for (score, ancestor) in excluded_ancestors.iter() { + let excluded_author = ancestor.author(); + let block_hostname = &self.context.committee.authority(excluded_author).hostname; + + trace!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}"); + self.context + .metrics + .node_metrics + .excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname]) + .inc(); + } + + info!( + "Included {} ancestors & excluded {} ancestors for proposal in round {clock_round}", + ancestors_to_propose.len(), + excluded_ancestors.len() + ); + + ancestors_to_propose + } + /// Checks whether all the leaders of the round exist. 
/// TODO: we can leverage some additional signal here in order to more cleverly manipulate later the leader timeout /// Ex if we already have one leader - the first in order - we might don't want to wait as much. @@ -1610,6 +1791,269 @@ mod test { } } + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() { + telemetry_subscribers::init_for_testing(); + + // Since we run the test with started_paused = true, any time-dependent operations using Tokio's time + // facilities, such as tokio::time::sleep or tokio::time::Instant, will not advance. So practically each + // Core's clock will have initialised potentially with different values but it never advances. + // To ensure that blocks won't get rejected by cores we'll need to manually wait for the time + // diff before processing them. By calling the `tokio::time::sleep` we implicitly also advance the + // tokio clock. + async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) { + // Simulate the time wait before processing a block to ensure that block.timestamp <= now + let now = context.clock.timestamp_utc_ms(); + let max_timestamp = blocks + .iter() + .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs) + .map(|block| block.timestamp_ms()) + .unwrap_or(0); + + let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now)); + sleep(wait_time).await; + } + + let (context, _) = Context::new_for_test(4); + + // Create the cores for all authorities + let mut all_cores = create_cores(context, vec![1, 1, 1, 1]); + let (_last_core, cores) = all_cores.split_last_mut().unwrap(); + + // Create blocks for rounds 1..=30 from all Cores except last Core of authority 3. + let mut last_round_blocks = Vec::::new(); + for round in 1..=30 { + let mut this_round_blocks = Vec::new(); + + for core_fixture in cores.iter_mut() { + wait_blocks(&last_round_blocks, &core_fixture.core.context).await; + + core_fixture + .core + .add_blocks(last_round_blocks.clone()) + .unwrap(); + + // Only when round > 1 and using non-genesis parents. + if let Some(r) = last_round_blocks.first().map(|b| b.round()) { + assert_eq!(round - 1, r); + if core_fixture.core.last_proposed_round() == r { + // Force propose new block regardless of min round delay. + core_fixture + .core + .try_propose(true) + .unwrap() + .unwrap_or_else(|| { + panic!("Block should have been proposed for round {}", round) + }); + } + } + + assert_eq!(core_fixture.core.last_proposed_round(), round); + + this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); + } + + last_round_blocks = this_round_blocks; + } + + // Now produce blocks for all Cores + for round in 31..=40 { + let mut this_round_blocks = Vec::new(); + + for core_fixture in all_cores.iter_mut() { + wait_blocks(&last_round_blocks, &core_fixture.core.context).await; + + core_fixture + .core + .add_blocks(last_round_blocks.clone()) + .unwrap(); + + // Only when round > 1 and using non-genesis parents. + if let Some(r) = last_round_blocks.first().map(|b| b.round()) { + assert_eq!(round - 1, r); + if core_fixture.core.last_proposed_round() == r { + // Force propose new block regardless of min round delay. 
+ core_fixture + .core + .try_propose(true) + .unwrap() + .unwrap_or_else(|| { + panic!("Block should have been proposed for round {}", round) + }); + } + } + + this_round_blocks.push(core_fixture.core.last_proposed_block().clone()); + + for block in this_round_blocks.iter() { + if block.author() != AuthorityIndex::new_for_test(3) { + // Assert blocks created include only 3 ancestors per block as one + // should be excluded + assert_eq!(block.ancestors().len(), 3); + } else { + // Authority 3 is the low scoring authority so it will still include + // its own blocks. + assert_eq!(block.ancestors().len(), 4); + } + } + } + + last_round_blocks = this_round_blocks; + } + } + + #[tokio::test] + async fn test_smart_ancestor_selection() { + telemetry_subscribers::init_for_testing(); + let (context, mut key_pairs) = Context::new_for_test(7); + let context = Arc::new(context.with_parameters(Parameters { + sync_last_known_own_block_timeout: Duration::from_millis(2_000), + ..Default::default() + })); + + let store = Arc::new(MemStore::new()); + let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone()))); + + let block_manager = BlockManager::new( + context.clone(), + dag_state.clone(), + Arc::new(NoopBlockVerifier), + ); + let leader_schedule = Arc::new( + LeaderSchedule::from_store(context.clone(), dag_state.clone()) + .with_num_commits_per_schedule(10), + ); + + let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone()); + let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone()); + let (signals, signal_receivers) = CoreSignals::new(context.clone()); + // Need at least one subscriber to the block broadcast channel. + let _block_receiver = signal_receivers.block_broadcast_receiver(); + + let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0); + let commit_observer = CommitObserver::new( + context.clone(), + commit_consumer, + dag_state.clone(), + store.clone(), + leader_schedule.clone(), + ); + + let mut core = Core::new( + context.clone(), + leader_schedule, + transaction_consumer, + block_manager, + true, + commit_observer, + signals, + key_pairs.remove(context.own_index.value()).1, + dag_state.clone(), + true, + ); + + // No new block should have been produced + assert_eq!( + core.last_proposed_round(), + GENESIS_ROUND, + "No block should have been created other than genesis" + ); + + // Trying to explicitly propose a block will not produce anything + assert!(core.try_propose(true).unwrap().is_none()); + + // Create blocks for the whole network but not for authority 1 + let mut builder = DagBuilder::new(context.clone()); + builder + .layers(1..=12) + .authorities(vec![AuthorityIndex::new_for_test(1)]) + .skip_block() + .build(); + let blocks = builder.blocks(1..=12); + // Process all the blocks + assert!(core.add_blocks(blocks).unwrap().is_empty()); + core.set_last_known_proposed_round(12); + + let block = core.try_propose(true).expect("No error").unwrap(); + assert_eq!(block.round(), 13); + assert_eq!(block.ancestors().len(), 7); + + // Build blocks for rest of the network other than own index + builder + .layers(13..=14) + .authorities(vec![AuthorityIndex::new_for_test(0)]) + .skip_block() + .build(); + let blocks = builder.blocks(13..=14); + assert!(core.add_blocks(blocks).unwrap().is_empty()); + + // We now have triggered a leader schedule change so we should have + // one EXCLUDE ancestor when we go to select ancestors for the next proposal + let block = core.try_propose(true).expect("No 
error").unwrap(); + assert_eq!(block.round(), 15); + assert_eq!(block.ancestors().len(), 6); + + // Build blocks for a quorum of the network including the EXCLUDE ancestor + // which will trigger smart select and we will not propose a block + builder + .layer(15) + .authorities(vec![ + AuthorityIndex::new_for_test(0), + AuthorityIndex::new_for_test(5), + AuthorityIndex::new_for_test(6), + ]) + .skip_block() + .build(); + let blocks = builder.blocks(15..=15); + // Wait for min round delay to allow blocks to be proposed. + sleep(context.parameters.min_round_delay).await; + // Smart select should be triggered and no block should be proposed. + assert!(core.add_blocks(blocks).unwrap().is_empty()); + assert_eq!(core.last_proposed_block().round(), 15); + + builder + .layer(15) + .authorities(vec![ + AuthorityIndex::new_for_test(0), + AuthorityIndex::new_for_test(1), + AuthorityIndex::new_for_test(2), + AuthorityIndex::new_for_test(3), + AuthorityIndex::new_for_test(4), + ]) + .skip_block() + .build(); + let blocks = builder.blocks(15..=15); + // Have enough ancestor blocks to propose now. + assert!(core.add_blocks(blocks).unwrap().is_empty()); + assert_eq!(core.last_proposed_block().round(), 16); + + // Build blocks for a quorum of the network including the EXCLUDE ancestor + // which will trigger smart select and we will not propose a block. + // This time we will force propose by hitting the leader timeout after which + // should cause us to include this EXCLUDE ancestor. + builder + .layer(16) + .authorities(vec![ + AuthorityIndex::new_for_test(0), + AuthorityIndex::new_for_test(5), + AuthorityIndex::new_for_test(6), + ]) + .skip_block() + .build(); + let blocks = builder.blocks(16..=16); + // Wait for leader timeout to force blocks to be proposed. + sleep(context.parameters.min_round_delay).await; + // Smart select should be triggered and no block should be proposed. + assert!(core.add_blocks(blocks).unwrap().is_empty()); + assert_eq!(core.last_proposed_block().round(), 16); + + // Simulate a leader timeout and a force proposal where we will include + // one EXCLUDE ancestor when we go to select ancestors for the next proposal + let block = core.try_propose(true).expect("No error").unwrap(); + assert_eq!(block.round(), 17); + assert_eq!(block.ancestors().len(), 5); + } + #[tokio::test] async fn test_core_set_subscriber_exists() { telemetry_subscribers::init_for_testing(); @@ -1675,7 +2119,7 @@ mod test { } #[tokio::test] - async fn test_core_set_propagation_delay() { + async fn test_core_set_propagation_delay_per_authority() { // TODO: create helper to avoid the duplicated code here. 
telemetry_subscribers::init_for_testing(); let (context, mut key_pairs) = Context::new_for_test(4); diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 83aada9cb05a7..1fc9a30a872e3 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -920,7 +920,7 @@ impl DagState { } pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores { - self.scoring_subdag.calculate_scores() + self.scoring_subdag.calculate_distributed_vote_scores() } pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex { diff --git a/consensus/core/src/leader_scoring.rs b/consensus/core/src/leader_scoring.rs index 7136daf7c5e30..51ab0a5e4aa2e 100644 --- a/consensus/core/src/leader_scoring.rs +++ b/consensus/core/src/leader_scoring.rs @@ -129,6 +129,10 @@ impl ReputationScores { } } + pub(crate) fn highest_score(&self) -> u64 { + *self.scores_per_authority.iter().max().unwrap_or(&0) + } + // Returns the authorities index with score tuples. pub(crate) fn authorities_by_score(&self, context: Arc) -> Vec<(AuthorityIndex, u64)> { self.scores_per_authority @@ -258,17 +262,9 @@ impl ScoringSubdag { } // Iterate through votes and calculate scores for each authority based on - // scoring strategy that is used. (Vote or CertifiedVote) - pub(crate) fn calculate_scores(&self) -> ReputationScores { - let _s = self - .context - .metrics - .node_metrics - .scope_processing_time - .with_label_values(&["ScoringSubdag::calculate_scores"]) - .start_timer(); - - let scores_per_authority = self.score_distributed_votes(); + // distributed vote scoring strategy. + pub(crate) fn calculate_distributed_vote_scores(&self) -> ReputationScores { + let scores_per_authority = self.distributed_votes_scores(); // TODO: Normalize scores ReputationScores::new( @@ -283,7 +279,15 @@ impl ScoringSubdag { /// Instead of only giving one point for each vote that is included in 2f+1 /// blocks. We give a score equal to the amount of stake of all blocks that /// included the vote. - fn score_distributed_votes(&self) -> Vec { + fn distributed_votes_scores(&self) -> Vec { + let _s = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&["ScoringSubdag::score_distributed_votes"]) + .start_timer(); + let num_authorities = self.context.committee.size(); let mut scores_per_authority = vec![0_u64; num_authorities]; @@ -299,29 +303,6 @@ impl ScoringSubdag { scores_per_authority } - /// This scoring strategy gives points equal to the amount of stake in blocks - /// that include the authority's vote, if that amount of total_stake > 2f+1. - /// We consider this a certified vote. 
- // TODO: This will be used for ancestor selection - #[allow(unused)] - fn score_certified_votes(&self) -> Vec { - let num_authorities = self.context.committee.size(); - let mut scores_per_authority = vec![0_u64; num_authorities]; - - for (vote, stake_agg) in self.votes.iter() { - let authority = vote.author; - if stake_agg.reached_threshold(&self.context.committee) { - let stake = stake_agg.stake(); - tracing::trace!( - "[{}] scores +{stake} reputation for {authority}!", - self.context.own_index, - ); - scores_per_authority[authority.value()] += stake; - } - } - scores_per_authority - } - pub(crate) fn scored_subdags_count(&self) -> usize { if let Some(commit_range) = &self.commit_range { commit_range.size() @@ -555,41 +536,11 @@ mod tests { scoring_subdag.add_subdags(vec![sub_dag]); } - let scores = scoring_subdag.calculate_scores(); + let scores = scoring_subdag.calculate_distributed_vote_scores(); assert_eq!(scores.scores_per_authority, vec![5, 5, 5, 5]); assert_eq!(scores.commit_range, (1..=4).into()); } - #[tokio::test] - async fn test_certified_vote_scoring_subdag() { - telemetry_subscribers::init_for_testing(); - let context = Arc::new(Context::new_for_test(4).0); - - // Populate fully connected test blocks for round 0 ~ 3, authorities 0 ~ 3. - let mut dag_builder = DagBuilder::new(context.clone()); - dag_builder.layers(1..=3).build(); - // Build round 4 but with just the leader block - dag_builder - .layer(4) - .authorities(vec![ - AuthorityIndex::new_for_test(1), - AuthorityIndex::new_for_test(2), - AuthorityIndex::new_for_test(3), - ]) - .skip_block() - .build(); - - let mut scoring_subdag = ScoringSubdag::new(context.clone()); - - for (sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) { - scoring_subdag.add_subdags(vec![sub_dag]); - } - - let scores_per_authority = scoring_subdag.score_certified_votes(); - assert_eq!(scores_per_authority, vec![4, 4, 4, 4]); - assert_eq!(scoring_subdag.commit_range.unwrap(), (1..=4).into()); - } - // TODO: Remove all tests below this when DistributedVoteScoring is enabled. #[tokio::test] async fn test_reputation_score_calculator() { diff --git a/consensus/core/src/lib.rs b/consensus/core/src/lib.rs index 36b980311f5d7..7cee617d64c35 100644 --- a/consensus/core/src/lib.rs +++ b/consensus/core/src/lib.rs @@ -1,6 +1,7 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 +mod ancestor; mod authority_node; mod authority_service; mod base_committer; diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs index 11d303f3cf65a..da29cfae33201 100644 --- a/consensus/core/src/metrics.rs +++ b/consensus/core/src/metrics.rs @@ -141,6 +141,10 @@ pub(crate) struct NodeMetrics { pub(crate) commit_round_advancement_interval: Histogram, pub(crate) last_decided_leader_round: IntGauge, pub(crate) leader_timeout_total: IntCounterVec, + pub(crate) smart_selection_wait: IntCounter, + pub(crate) ancestor_state_change_by_authority: IntCounterVec, + pub(crate) excluded_proposal_ancestors_count_by_authority: IntCounterVec, + pub(crate) included_excluded_proposal_ancestors_count_by_authority: IntCounterVec, pub(crate) missing_blocks_total: IntCounter, pub(crate) missing_blocks_after_fetch_total: IntCounter, pub(crate) num_of_bad_nodes: IntGauge, @@ -441,6 +445,29 @@ impl NodeMetrics { &["timeout_type"], registry, ).unwrap(), + smart_selection_wait: register_int_counter_with_registry!( + "smart_selection_wait", + "Number of times we waited for smart ancestor selection.", + registry, + ).unwrap(), + ancestor_state_change_by_authority: register_int_counter_vec_with_registry!( + "ancestor_state_change_by_authority", + "The total number of times an ancestor state changed to EXCLUDE or INCLUDE.", + &["authority", "state"], + registry, + ).unwrap(), + excluded_proposal_ancestors_count_by_authority: register_int_counter_vec_with_registry!( + "excluded_proposal_ancestors_count_by_authority", + "Total number of excluded ancestors per authority during proposal.", + &["authority"], + registry, + ).unwrap(), + included_excluded_proposal_ancestors_count_by_authority: register_int_counter_vec_with_registry!( + "included_excluded_proposal_ancestors_count_by_authority", + "Total number of ancestors per authority with 'excluded' status that got included in proposal. 
Either weak or strong type.", + &["authority", "type"], + registry, + ).unwrap(), missing_blocks_total: register_int_counter_with_registry!( "missing_blocks_total", "Total cumulative number of missing blocks", diff --git a/consensus/core/src/round_prober.rs b/consensus/core/src/round_prober.rs index a80ffe148f1f9..3ce8b04dc8ede 100644 --- a/consensus/core/src/round_prober.rs +++ b/consensus/core/src/round_prober.rs @@ -238,7 +238,7 @@ impl RoundProber { .set_propagation_delay_and_quorum_rounds(propagation_delay, quorum_rounds.clone()) { tracing::warn!( - "Failed to set propagation delay {propagation_delay} on Core: {:?}", + "Failed to set propagation delay and quorum rounds {quorum_rounds:?} on Core: {:?}", e ); } diff --git a/crates/sui-open-rpc/spec/openrpc.json b/crates/sui-open-rpc/spec/openrpc.json index f389b2fc27312..f33a0c653b67b 100644 --- a/crates/sui-open-rpc/spec/openrpc.json +++ b/crates/sui-open-rpc/spec/openrpc.json @@ -1307,6 +1307,7 @@ "consensus_distributed_vote_scoring_strategy": false, "consensus_order_end_of_epoch_last": true, "consensus_round_prober": false, + "consensus_smart_ancestor_selection": false, "disable_invariant_violation_check_in_swap_loc": false, "disallow_adding_abilities_on_upgrade": false, "disallow_change_struct_type_params_on_upgrade": false, diff --git a/crates/sui-protocol-config/src/lib.rs b/crates/sui-protocol-config/src/lib.rs index cbaef85171d26..0a994958dfd95 100644 --- a/crates/sui-protocol-config/src/lib.rs +++ b/crates/sui-protocol-config/src/lib.rs @@ -196,6 +196,7 @@ const MAX_PROTOCOL_VERSION: u64 = 69; // Further reduce minimum number of random beacon shares. // Disallow adding new modules in `deps-only` packages. // Version 69: Sets number of rounds allowed for fastpath voting in consensus. +// Enable smart ancestor selection in devnet. #[derive(Copy, Clone, Debug, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct ProtocolVersion(u64); @@ -565,6 +566,10 @@ struct FeatureFlags { #[serde(skip_serializing_if = "is_false")] disallow_new_modules_in_deps_only_packages: bool, + + // Use smart ancestor selection in consensus. + #[serde(skip_serializing_if = "is_false")] + consensus_smart_ancestor_selection: bool, } fn is_false(b: &bool) -> bool { @@ -1675,6 +1680,10 @@ impl ProtocolConfig { self.feature_flags .disallow_new_modules_in_deps_only_packages } + + pub fn consensus_smart_ancestor_selection(&self) -> bool { + self.feature_flags.consensus_smart_ancestor_selection + } } #[cfg(not(msim))] @@ -2944,6 +2953,11 @@ impl ProtocolConfig { 69 => { // Sets number of rounds allowed for fastpath voting in consensus. 
cfg.consensus_voting_rounds = Some(40); + + if chain != Chain::Mainnet && chain != Chain::Testnet { + // Enable smart ancestor selection for devnet + cfg.feature_flags.consensus_smart_ancestor_selection = true; + } } // Use this template when making changes: // diff --git a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_69.snap b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_69.snap index 2c42d9988aad7..ccca747ea3a97 100644 --- a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_69.snap +++ b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_69.snap @@ -72,6 +72,7 @@ feature_flags: relocate_event_module: true uncompressed_g1_group_elements: true disallow_new_modules_in_deps_only_packages: true + consensus_smart_ancestor_selection: true max_tx_size_bytes: 131072 max_input_objects: 2048 max_size_written_objects: 5000000