From 1444baa426fbe712da848c784176cb1bf440c21a Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Thu, 22 Aug 2024 18:14:23 -0400 Subject: [PATCH] replay: do not early return when marking slots duplicate confirmed (#2700) * replay: do not early return when marking slots duplicate confirmed * pr feedback: catch panic explicitely, comments, add root test case * pr feedback: add custom string to panic message * pr feedback: add slot to log, use should_panic * pr feedback: notification for {slot} -> notification for slot {slot} --- core/src/replay_stage.rs | 252 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 248 insertions(+), 4 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index b368ac1de89ea8..f2cedd7d731aac 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1829,9 +1829,12 @@ impl ReplayStage { } else if let Some(prev_hash) = duplicate_confirmed_slots.insert(confirmed_slot, duplicate_confirmed_hash) { - assert_eq!(prev_hash, duplicate_confirmed_hash); + assert_eq!( + prev_hash, duplicate_confirmed_hash, + "Additional duplicate confirmed notification for slot {confirmed_slot} with a different hash" + ); // Already processed this signal - return; + continue; } let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state( @@ -3791,9 +3794,12 @@ impl ReplayStage { progress.set_duplicate_confirmed_hash(*slot, *frozen_hash); if let Some(prev_hash) = duplicate_confirmed_slots.insert(*slot, *frozen_hash) { - assert_eq!(prev_hash, *frozen_hash); + assert_eq!( + prev_hash, *frozen_hash, + "Additional duplicate confirmed notification for slot {slot} with a different hash" + ); // Already processed this signal - return; + continue; } let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state( @@ -4184,6 +4190,7 @@ pub(crate) mod tests { sync::{atomic::AtomicU64, Arc, RwLock}, }, tempfile::tempdir, + test_case::test_case, trees::{tr, Tree}, }; @@ -9018,4 +9025,241 @@ pub(crate) mod tests { assert_eq!(working_bank.slot(), good_slot); assert_eq!(working_bank.parent_slot(), initial_slot); } + + #[test] + #[should_panic(expected = "Additional duplicate confirmed notification for slot 6")] + fn test_mark_slots_duplicate_confirmed() { + let generate_votes = |pubkeys: Vec| { + pubkeys + .into_iter() + .zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2))) + .collect() + }; + let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6))))); + let (vote_simulator, blockstore) = + setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes))); + let VoteSimulator { + bank_forks, + mut heaviest_subtree_fork_choice, + mut progress, + .. + } = vote_simulator; + + let (ancestor_hashes_replay_update_sender, _) = unbounded(); + let mut duplicate_confirmed_slots = DuplicateConfirmedSlots::default(); + let bank_hash_0 = bank_forks.read().unwrap().bank_hash(0).unwrap(); + bank_forks + .write() + .unwrap() + .set_root(1, &AbsRequestSender::default(), None) + .unwrap(); + + // Mark 0 as duplicate confirmed, should fail as it is 0 < root + let confirmed_slots = [(0, bank_hash_0)]; + ReplayStage::mark_slots_duplicate_confirmed( + &confirmed_slots, + &blockstore, + &bank_forks, + &mut progress, + &mut DuplicateSlotsTracker::default(), + &mut heaviest_subtree_fork_choice, + &mut EpochSlotsFrozenSlots::default(), + &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, + &mut PurgeRepairSlotCounter::default(), + &mut duplicate_confirmed_slots, + ); + + assert!(!duplicate_confirmed_slots.contains_key(&0)); + + // Mark 5 as duplicate confirmed, should suceed + let bank_hash_5 = bank_forks.read().unwrap().bank_hash(5).unwrap(); + let confirmed_slots = [(5, bank_hash_5)]; + + ReplayStage::mark_slots_duplicate_confirmed( + &confirmed_slots, + &blockstore, + &bank_forks, + &mut progress, + &mut DuplicateSlotsTracker::default(), + &mut heaviest_subtree_fork_choice, + &mut EpochSlotsFrozenSlots::default(), + &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, + &mut PurgeRepairSlotCounter::default(), + &mut duplicate_confirmed_slots, + ); + + assert_eq!(*duplicate_confirmed_slots.get(&5).unwrap(), bank_hash_5); + assert!(heaviest_subtree_fork_choice + .is_duplicate_confirmed(&(5, bank_hash_5)) + .unwrap_or(false)); + + // Mark 5 and 6 as duplicate confirmed, should succeed + let bank_hash_6 = bank_forks.read().unwrap().bank_hash(6).unwrap(); + let confirmed_slots = [(5, bank_hash_5), (6, bank_hash_6)]; + + ReplayStage::mark_slots_duplicate_confirmed( + &confirmed_slots, + &blockstore, + &bank_forks, + &mut progress, + &mut DuplicateSlotsTracker::default(), + &mut heaviest_subtree_fork_choice, + &mut EpochSlotsFrozenSlots::default(), + &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, + &mut PurgeRepairSlotCounter::default(), + &mut duplicate_confirmed_slots, + ); + + assert_eq!(*duplicate_confirmed_slots.get(&5).unwrap(), bank_hash_5); + assert!(heaviest_subtree_fork_choice + .is_duplicate_confirmed(&(5, bank_hash_5)) + .unwrap_or(false)); + assert_eq!(*duplicate_confirmed_slots.get(&6).unwrap(), bank_hash_6); + assert!(heaviest_subtree_fork_choice + .is_duplicate_confirmed(&(6, bank_hash_6)) + .unwrap_or(false)); + + // Mark 6 as duplicate confirmed again with a different hash, should panic + let confirmed_slots = [(6, Hash::new_unique())]; + ReplayStage::mark_slots_duplicate_confirmed( + &confirmed_slots, + &blockstore, + &bank_forks, + &mut progress, + &mut DuplicateSlotsTracker::default(), + &mut heaviest_subtree_fork_choice, + &mut EpochSlotsFrozenSlots::default(), + &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, + &mut PurgeRepairSlotCounter::default(), + &mut duplicate_confirmed_slots, + ); + } + + #[test_case(true ; "same_batch")] + #[test_case(false ; "seperate_batches")] + #[should_panic(expected = "Additional duplicate confirmed notification for slot 6")] + fn test_process_duplicate_confirmed_slots(same_batch: bool) { + let generate_votes = |pubkeys: Vec| { + pubkeys + .into_iter() + .zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2))) + .collect() + }; + let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6))))); + let (vote_simulator, blockstore) = + setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes))); + let VoteSimulator { + bank_forks, + mut heaviest_subtree_fork_choice, + progress, + .. + } = vote_simulator; + + let (ancestor_hashes_replay_update_sender, _) = unbounded(); + let (sender, receiver) = unbounded(); + let mut duplicate_confirmed_slots = DuplicateConfirmedSlots::default(); + let bank_hash_0 = bank_forks.read().unwrap().bank_hash(0).unwrap(); + bank_forks + .write() + .unwrap() + .set_root(1, &AbsRequestSender::default(), None) + .unwrap(); + + // Mark 0 as duplicate confirmed, should fail as it is 0 < root + sender.send(vec![(0, bank_hash_0)]).unwrap(); + + ReplayStage::process_duplicate_confirmed_slots( + &receiver, + &blockstore, + &mut DuplicateSlotsTracker::default(), + &mut duplicate_confirmed_slots, + &mut EpochSlotsFrozenSlots::default(), + &bank_forks, + &progress, + &mut heaviest_subtree_fork_choice, + &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, + &mut PurgeRepairSlotCounter::default(), + ); + + assert!(!duplicate_confirmed_slots.contains_key(&0)); + + // Mark 5 as duplicate confirmed, should succed + let bank_hash_5 = bank_forks.read().unwrap().bank_hash(5).unwrap(); + sender.send(vec![(5, bank_hash_5)]).unwrap(); + + ReplayStage::process_duplicate_confirmed_slots( + &receiver, + &blockstore, + &mut DuplicateSlotsTracker::default(), + &mut duplicate_confirmed_slots, + &mut EpochSlotsFrozenSlots::default(), + &bank_forks, + &progress, + &mut heaviest_subtree_fork_choice, + &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, + &mut PurgeRepairSlotCounter::default(), + ); + + assert_eq!(*duplicate_confirmed_slots.get(&5).unwrap(), bank_hash_5); + assert!(heaviest_subtree_fork_choice + .is_duplicate_confirmed(&(5, bank_hash_5)) + .unwrap_or(false)); + + // Mark 5 and 6 as duplicate confirmed, should suceed + let bank_hash_6 = bank_forks.read().unwrap().bank_hash(6).unwrap(); + if same_batch { + sender + .send(vec![(5, bank_hash_5), (6, bank_hash_6)]) + .unwrap(); + } else { + sender.send(vec![(5, bank_hash_5)]).unwrap(); + sender.send(vec![(6, bank_hash_6)]).unwrap(); + } + + ReplayStage::process_duplicate_confirmed_slots( + &receiver, + &blockstore, + &mut DuplicateSlotsTracker::default(), + &mut duplicate_confirmed_slots, + &mut EpochSlotsFrozenSlots::default(), + &bank_forks, + &progress, + &mut heaviest_subtree_fork_choice, + &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, + &mut PurgeRepairSlotCounter::default(), + ); + + assert_eq!(*duplicate_confirmed_slots.get(&5).unwrap(), bank_hash_5); + assert!(heaviest_subtree_fork_choice + .is_duplicate_confirmed(&(5, bank_hash_5)) + .unwrap_or(false)); + assert_eq!(*duplicate_confirmed_slots.get(&6).unwrap(), bank_hash_6); + assert!(heaviest_subtree_fork_choice + .is_duplicate_confirmed(&(6, bank_hash_6)) + .unwrap_or(false)); + + // Mark 6 as duplicate confirmed again with a different hash, should panic + sender.send(vec![(6, Hash::new_unique())]).unwrap(); + + ReplayStage::process_duplicate_confirmed_slots( + &receiver, + &blockstore, + &mut DuplicateSlotsTracker::default(), + &mut duplicate_confirmed_slots, + &mut EpochSlotsFrozenSlots::default(), + &bank_forks, + &progress, + &mut heaviest_subtree_fork_choice, + &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, + &mut PurgeRepairSlotCounter::default(), + ); + } }