Skip to content

Commit

Permalink
Ensure consistency of decide events
Browse files Browse the repository at this point in the history
The logic for moving data from consensus storae to archive storage
was sometimes generating an invalid decide event, because it made a
chain of non-consecutive leaves originating from multiple separate
events. This fixes the issue and prevents it from happening again by
generating a separate decide event for each leaf in consensus storage.

Also adds a regression test.
  • Loading branch information
jbearer committed Nov 1, 2024
1 parent a982dd7 commit 1df8311
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 105 deletions.
140 changes: 135 additions & 5 deletions sequencer/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,13 +1020,23 @@ mod api_tests {
use data_source::testing::TestableSequencerDataSource;
use endpoints::NamespaceProofQueryData;

use espresso_types::{Header, NamespaceId};
use espresso_types::{
traits::{EventConsumer, PersistenceOptions},
Header, Leaf, NamespaceId,
};
use ethers::utils::Anvil;
use futures::stream::StreamExt;
use hotshot_query_service::availability::{BlockQueryData, VidCommonQueryData};
use futures::{future, stream::StreamExt};
use hotshot_query_service::availability::{
AvailabilityDataSource, BlockQueryData, UpdateAvailabilityData, VidCommonQueryData,
};
use hotshot_types::{
data::QuorumProposal, event::LeafInfo, simple_certificate::QuorumCertificate,
traits::node_implementation::ConsensusTime,
};

use portpicker::pick_unused_port;
use sequencer_utils::test_utils::setup_test;
use std::fmt::Debug;
use surf_disco::Client;
use test_helpers::{
catchup_test_helper, state_signature_test_helper, status_test_helper, submit_test_helper,
Expand All @@ -1035,8 +1045,11 @@ mod api_tests {
use tide_disco::error::ServerError;
use vbs::version::StaticVersion;

use super::*;
use crate::testing::{wait_for_decide_on_handle, TestConfigBuilder};
use super::{update::ApiEventConsumer, *};
use crate::{
persistence::no_storage::NoStorage,
testing::{wait_for_decide_on_handle, TestConfigBuilder},
};

#[async_std::test]
pub(crate) async fn submit_test_with_query_module<D: TestableSequencerDataSource>() {
Expand Down Expand Up @@ -1159,6 +1172,123 @@ mod api_tests {
let storage = D::create_storage().await;
catchup_test_helper(|opt| D::options(&storage, opt)).await
}

#[async_std::test]
pub async fn test_non_consecutive_decide_with_failing_event_consumer<D>()
where
D: TestableSequencerDataSource + Debug + 'static,
for<'a> D::Transaction<'a>: UpdateAvailabilityData<SeqTypes>,
{
#[derive(Clone, Copy, Debug)]
struct FailConsumer;

#[async_trait]
impl EventConsumer for FailConsumer {
async fn handle_event(&self, _: &Event<SeqTypes>) -> anyhow::Result<()> {
bail!("mock error injection");
}
}

setup_test();

let storage = D::create_storage().await;
let persistence = D::persistence_options(&storage).create().await.unwrap();
let data_source: Arc<StorageState<network::Memory, NoStorage, _, MockSequencerVersions>> =
Arc::new(StorageState::new(
D::create(D::persistence_options(&storage), Default::default(), false)
.await
.unwrap(),
ApiState::new(future::pending()),
));

// Create two non-consecutive leaf chains.
let mut chain1 = vec![];

let mut quorum_proposal = QuorumProposal::<SeqTypes> {
block_header: Leaf::genesis(&Default::default(), &NodeState::mock())
.await
.block_header()
.clone(),
view_number: ViewNumber::genesis(),
justify_qc: QuorumCertificate::genesis::<MockSequencerVersions>(
&ValidatedState::default(),
&NodeState::mock(),
)
.await,
upgrade_certificate: None,
proposal_certificate: None,
};
let mut qc = QuorumCertificate::genesis::<MockSequencerVersions>(
&ValidatedState::default(),
&NodeState::mock(),
)
.await;

let mut justify_qc = qc.clone();
for i in 0..5 {
*quorum_proposal.block_header.height_mut() = i;
quorum_proposal.view_number = ViewNumber::new(i);
quorum_proposal.justify_qc = justify_qc;
let leaf = Leaf::from_quorum_proposal(&quorum_proposal);
qc.view_number = leaf.view_number();
qc.data.leaf_commit = Committable::commit(&leaf);
justify_qc = qc.clone();
chain1.push((leaf.clone(), qc.clone()));
}
// Split into two chains.
let mut chain2 = chain1.split_off(2);
// Make non-consecutive (i.e. we skip a leaf).
chain2.remove(0);

// Decide 2 leaves, but fail in event processing.
let leaf_chain = chain1
.iter()
.map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
.collect::<Vec<_>>();
tracing::info!("decide with event handling failure");
persistence
.append_decided_leaves(
ViewNumber::new(1),
leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
&FailConsumer,
)
.await
.unwrap();

// Now decide remaining leaves successfully. We should now process a decide event for all
// the leaves.
let consumer = ApiEventConsumer::from(data_source.clone());
let leaf_chain = chain2
.iter()
.map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
.collect::<Vec<_>>();
tracing::info!("decide successfully");
persistence
.append_decided_leaves(
ViewNumber::new(4),
leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
&consumer,
)
.await
.unwrap();

// Check that the leaves were moved to archive storage.
for (leaf, qc) in chain1.iter().chain(&chain2) {
tracing::info!(height = leaf.height(), "check archive");
let qd = data_source.get_leaf(leaf.height() as usize).await.await;
assert_eq!(qd.leaf(), leaf);
assert_eq!(qd.qc(), qc);
}
}

fn leaf_info(leaf: Leaf) -> LeafInfo<SeqTypes> {
LeafInfo {
leaf,
vid_share: None,
state: Default::default(),
delta: None,
}
}
}

#[cfg(test)]
Expand Down
36 changes: 19 additions & 17 deletions sequencer/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ mod persistence_tests {
events: Arc<RwLock<Vec<Event>>>,
}

impl EventCollector {
async fn leaf_chain(&self) -> Vec<LeafInfo<SeqTypes>> {
self.events
.read()
.await
.iter()
.flat_map(|event| {
let EventType::Decide { leaf_chain, .. } = &event.event else {
panic!("expected decide event, got {event:?}");
};
leaf_chain.iter().cloned().rev()
})
.collect::<Vec<_>>()
}
}

#[async_trait]
impl EventConsumer for EventCollector {
async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
Expand Down Expand Up @@ -420,15 +436,7 @@ mod persistence_tests {
);

// A decide event should have been processed.
let events = consumer.events.read().await;
assert_eq!(events.len(), 1);
assert_eq!(events[0].view_number, ViewNumber::new(2));
let EventType::Decide { qc, leaf_chain, .. } = &events[0].event else {
panic!("expected decide event, got {:?}", events[0]);
};
assert_eq!(**qc, qcs[2]);
assert_eq!(leaf_chain.len(), 3, "{leaf_chain:#?}");
for (leaf, info) in leaves.iter().zip(leaf_chain.iter().rev()) {
for (leaf, info) in leaves.iter().zip(consumer.leaf_chain().await.iter()) {
assert_eq!(info.leaf, *leaf);
let decided_vid_share = info.vid_share.as_ref().unwrap();
assert_eq!(decided_vid_share.view_number, leaf.view_number());
Expand Down Expand Up @@ -714,15 +722,9 @@ mod persistence_tests {

// Check decide event.
tracing::info!("check decide event");
let events = consumer.events.read().await;
assert_eq!(events.len(), 1);
assert_eq!(events[0].view_number, ViewNumber::new(3));
let EventType::Decide { qc, leaf_chain, .. } = &events[0].event else {
panic!("expected decide event, got {:?}", events[0]);
};
assert_eq!(**qc, chain[3].1);
let leaf_chain = consumer.leaf_chain().await;
assert_eq!(leaf_chain.len(), 4, "{leaf_chain:#?}");
for ((leaf, _, _, _), info) in chain.iter().zip(leaf_chain.iter().rev()) {
for ((leaf, _, _, _), info) in chain.iter().zip(leaf_chain.iter()) {
assert_eq!(info.leaf, *leaf);
let decided_vid_share = info.vid_share.as_ref().unwrap();
assert_eq!(decided_vid_share.view_number, leaf.view_number());
Expand Down
59 changes: 26 additions & 33 deletions sequencer/src/persistence/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,15 @@ impl Inner {
Ok(())
}

fn decide_event(&self, view: ViewNumber) -> anyhow::Result<Event<SeqTypes>> {
// Construct a chain of all decided leaves up to `view` which have not yet been garbage
// collected.
async fn generate_decide_events(
&self,
view: ViewNumber,
consumer: &impl EventConsumer,
) -> anyhow::Result<()> {
// Generate a decide event for each leaf, to be processed by the event consumer. We make a
// separate event for each leaf because it is possible we have non-consecutive leaves in our
// storage, which would not be valid as a single decide with a single leaf chain.
let mut leaves = BTreeMap::new();
let mut high_qc: Option<QuorumCertificate<SeqTypes>> = None;

for entry in fs::read_dir(self.decided_leaf_path())? {
let entry = entry?;
let path = entry.path();
Expand Down Expand Up @@ -261,14 +264,7 @@ impl Inner {
delta: Default::default(),
};

leaves.insert(v, info);
if let Some(high_qc) = &mut high_qc {
if v > high_qc.view_number.u64() {
*high_qc = qc;
}
} else {
high_qc = Some(qc);
}
leaves.insert(v, (info, qc));
}

// The invariant is that the oldest existing leaf in the `anchor_leaf` table -- if there is
Expand All @@ -282,15 +278,20 @@ impl Inner {
}
}

let high_qc = high_qc.context("no new leaves at decide event")?;
Ok(Event {
view_number: view,
event: EventType::Decide {
qc: Arc::new(high_qc),
block_size: None,
leaf_chain: Arc::new(leaves.into_values().rev().collect()),
},
})
for (view, (leaf, qc)) in leaves {
consumer
.handle_event(&Event {
view_number: ViewNumber::new(view),
event: EventType::Decide {
qc: Arc::new(qc),
leaf_chain: Arc::new(vec![leaf]),
block_size: None,
},
})
.await?;
}

Ok(())
}

fn load_da_proposal(
Expand Down Expand Up @@ -466,17 +467,9 @@ impl SequencerPersistence for Persistence {
// persist the decided leaves successfully, and the event processing will just run again at
// the next decide. If there is an error here, we just log it and return early with success
// to prevent GC from running before the decided leaves are processed.
match inner.decide_event(view) {
Ok(event) => {
if let Err(err) = consumer.handle_event(&event).await {
tracing::warn!(?view, "event processing failed: {err:#}");
return Ok(());
}
}
Err(err) => {
tracing::warn!(?view, "event creation: {err:#}");
return Ok(());
}
if let Err(err) = inner.generate_decide_events(view, consumer).await {
tracing::warn!(?view, "event processing failed: {err:#}");
return Ok(());
}

if let Err(err) = inner.collect_garbage(view) {
Expand Down
Loading

0 comments on commit 1df8311

Please sign in to comment.