Skip to content

Commit

Permalink
replay: do not early return when marking slots duplicate confirmed (#…
Browse files Browse the repository at this point in the history
…2700)

* replay: do not early return when marking slots duplicate confirmed

* pr feedback: catch panic explicitely, comments, add root test case

* pr feedback: add custom string to panic message

* pr feedback: add slot to log, use should_panic

* pr feedback: notification for {slot} -> notification for slot {slot}
  • Loading branch information
AshwinSekar authored Aug 22, 2024
1 parent 22eec1c commit 1444baa
Showing 1 changed file with 248 additions and 4 deletions.
252 changes: 248 additions & 4 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1829,9 +1829,12 @@ impl ReplayStage {
} else if let Some(prev_hash) =
duplicate_confirmed_slots.insert(confirmed_slot, duplicate_confirmed_hash)
{
assert_eq!(prev_hash, duplicate_confirmed_hash);
assert_eq!(
prev_hash, duplicate_confirmed_hash,
"Additional duplicate confirmed notification for slot {confirmed_slot} with a different hash"
);
// Already processed this signal
return;
continue;
}

let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state(
Expand Down Expand Up @@ -3791,9 +3794,12 @@ impl ReplayStage {

progress.set_duplicate_confirmed_hash(*slot, *frozen_hash);
if let Some(prev_hash) = duplicate_confirmed_slots.insert(*slot, *frozen_hash) {
assert_eq!(prev_hash, *frozen_hash);
assert_eq!(
prev_hash, *frozen_hash,
"Additional duplicate confirmed notification for slot {slot} with a different hash"
);
// Already processed this signal
return;
continue;
}

let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state(
Expand Down Expand Up @@ -4184,6 +4190,7 @@ pub(crate) mod tests {
sync::{atomic::AtomicU64, Arc, RwLock},
},
tempfile::tempdir,
test_case::test_case,
trees::{tr, Tree},
};

Expand Down Expand Up @@ -9018,4 +9025,241 @@ pub(crate) mod tests {
assert_eq!(working_bank.slot(), good_slot);
assert_eq!(working_bank.parent_slot(), initial_slot);
}

#[test]
#[should_panic(expected = "Additional duplicate confirmed notification for slot 6")]
fn test_mark_slots_duplicate_confirmed() {
let generate_votes = |pubkeys: Vec<Pubkey>| {
pubkeys
.into_iter()
.zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2)))
.collect()
};
let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6)))));
let (vote_simulator, blockstore) =
setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes)));
let VoteSimulator {
bank_forks,
mut heaviest_subtree_fork_choice,
mut progress,
..
} = vote_simulator;

let (ancestor_hashes_replay_update_sender, _) = unbounded();
let mut duplicate_confirmed_slots = DuplicateConfirmedSlots::default();
let bank_hash_0 = bank_forks.read().unwrap().bank_hash(0).unwrap();
bank_forks
.write()
.unwrap()
.set_root(1, &AbsRequestSender::default(), None)
.unwrap();

// Mark 0 as duplicate confirmed, should fail as it is 0 < root
let confirmed_slots = [(0, bank_hash_0)];
ReplayStage::mark_slots_duplicate_confirmed(
&confirmed_slots,
&blockstore,
&bank_forks,
&mut progress,
&mut DuplicateSlotsTracker::default(),
&mut heaviest_subtree_fork_choice,
&mut EpochSlotsFrozenSlots::default(),
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
&mut duplicate_confirmed_slots,
);

assert!(!duplicate_confirmed_slots.contains_key(&0));

// Mark 5 as duplicate confirmed, should suceed
let bank_hash_5 = bank_forks.read().unwrap().bank_hash(5).unwrap();
let confirmed_slots = [(5, bank_hash_5)];

ReplayStage::mark_slots_duplicate_confirmed(
&confirmed_slots,
&blockstore,
&bank_forks,
&mut progress,
&mut DuplicateSlotsTracker::default(),
&mut heaviest_subtree_fork_choice,
&mut EpochSlotsFrozenSlots::default(),
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
&mut duplicate_confirmed_slots,
);

assert_eq!(*duplicate_confirmed_slots.get(&5).unwrap(), bank_hash_5);
assert!(heaviest_subtree_fork_choice
.is_duplicate_confirmed(&(5, bank_hash_5))
.unwrap_or(false));

// Mark 5 and 6 as duplicate confirmed, should succeed
let bank_hash_6 = bank_forks.read().unwrap().bank_hash(6).unwrap();
let confirmed_slots = [(5, bank_hash_5), (6, bank_hash_6)];

ReplayStage::mark_slots_duplicate_confirmed(
&confirmed_slots,
&blockstore,
&bank_forks,
&mut progress,
&mut DuplicateSlotsTracker::default(),
&mut heaviest_subtree_fork_choice,
&mut EpochSlotsFrozenSlots::default(),
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
&mut duplicate_confirmed_slots,
);

assert_eq!(*duplicate_confirmed_slots.get(&5).unwrap(), bank_hash_5);
assert!(heaviest_subtree_fork_choice
.is_duplicate_confirmed(&(5, bank_hash_5))
.unwrap_or(false));
assert_eq!(*duplicate_confirmed_slots.get(&6).unwrap(), bank_hash_6);
assert!(heaviest_subtree_fork_choice
.is_duplicate_confirmed(&(6, bank_hash_6))
.unwrap_or(false));

// Mark 6 as duplicate confirmed again with a different hash, should panic
let confirmed_slots = [(6, Hash::new_unique())];
ReplayStage::mark_slots_duplicate_confirmed(
&confirmed_slots,
&blockstore,
&bank_forks,
&mut progress,
&mut DuplicateSlotsTracker::default(),
&mut heaviest_subtree_fork_choice,
&mut EpochSlotsFrozenSlots::default(),
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
&mut duplicate_confirmed_slots,
);
}

#[test_case(true ; "same_batch")]
#[test_case(false ; "seperate_batches")]
#[should_panic(expected = "Additional duplicate confirmed notification for slot 6")]
fn test_process_duplicate_confirmed_slots(same_batch: bool) {
let generate_votes = |pubkeys: Vec<Pubkey>| {
pubkeys
.into_iter()
.zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2)))
.collect()
};
let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6)))));
let (vote_simulator, blockstore) =
setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes)));
let VoteSimulator {
bank_forks,
mut heaviest_subtree_fork_choice,
progress,
..
} = vote_simulator;

let (ancestor_hashes_replay_update_sender, _) = unbounded();
let (sender, receiver) = unbounded();
let mut duplicate_confirmed_slots = DuplicateConfirmedSlots::default();
let bank_hash_0 = bank_forks.read().unwrap().bank_hash(0).unwrap();
bank_forks
.write()
.unwrap()
.set_root(1, &AbsRequestSender::default(), None)
.unwrap();

// Mark 0 as duplicate confirmed, should fail as it is 0 < root
sender.send(vec![(0, bank_hash_0)]).unwrap();

ReplayStage::process_duplicate_confirmed_slots(
&receiver,
&blockstore,
&mut DuplicateSlotsTracker::default(),
&mut duplicate_confirmed_slots,
&mut EpochSlotsFrozenSlots::default(),
&bank_forks,
&progress,
&mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
);

assert!(!duplicate_confirmed_slots.contains_key(&0));

// Mark 5 as duplicate confirmed, should succed
let bank_hash_5 = bank_forks.read().unwrap().bank_hash(5).unwrap();
sender.send(vec![(5, bank_hash_5)]).unwrap();

ReplayStage::process_duplicate_confirmed_slots(
&receiver,
&blockstore,
&mut DuplicateSlotsTracker::default(),
&mut duplicate_confirmed_slots,
&mut EpochSlotsFrozenSlots::default(),
&bank_forks,
&progress,
&mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
);

assert_eq!(*duplicate_confirmed_slots.get(&5).unwrap(), bank_hash_5);
assert!(heaviest_subtree_fork_choice
.is_duplicate_confirmed(&(5, bank_hash_5))
.unwrap_or(false));

// Mark 5 and 6 as duplicate confirmed, should suceed
let bank_hash_6 = bank_forks.read().unwrap().bank_hash(6).unwrap();
if same_batch {
sender
.send(vec![(5, bank_hash_5), (6, bank_hash_6)])
.unwrap();
} else {
sender.send(vec![(5, bank_hash_5)]).unwrap();
sender.send(vec![(6, bank_hash_6)]).unwrap();
}

ReplayStage::process_duplicate_confirmed_slots(
&receiver,
&blockstore,
&mut DuplicateSlotsTracker::default(),
&mut duplicate_confirmed_slots,
&mut EpochSlotsFrozenSlots::default(),
&bank_forks,
&progress,
&mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
);

assert_eq!(*duplicate_confirmed_slots.get(&5).unwrap(), bank_hash_5);
assert!(heaviest_subtree_fork_choice
.is_duplicate_confirmed(&(5, bank_hash_5))
.unwrap_or(false));
assert_eq!(*duplicate_confirmed_slots.get(&6).unwrap(), bank_hash_6);
assert!(heaviest_subtree_fork_choice
.is_duplicate_confirmed(&(6, bank_hash_6))
.unwrap_or(false));

// Mark 6 as duplicate confirmed again with a different hash, should panic
sender.send(vec![(6, Hash::new_unique())]).unwrap();

ReplayStage::process_duplicate_confirmed_slots(
&receiver,
&blockstore,
&mut DuplicateSlotsTracker::default(),
&mut duplicate_confirmed_slots,
&mut EpochSlotsFrozenSlots::default(),
&bank_forks,
&progress,
&mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
);
}
}

0 comments on commit 1444baa

Please sign in to comment.