From 1ba631ba9581973e7c6cadeea92cfe1802aceb4a Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 10 Oct 2024 19:32:15 +0200 Subject: [PATCH] feat: store safe block num as well (#11648) --- crates/blockchain-tree/src/externals.rs | 2 +- crates/chain-state/src/chain_info.rs | 8 +++- crates/chain-state/src/in_memory.rs | 17 ++++++--- crates/cli/commands/src/stage/unwind.rs | 2 +- .../consensus/beacon/src/engine/test_utils.rs | 9 ++++- crates/engine/tree/src/persistence.rs | 18 ++++++++- crates/engine/tree/src/tree/mod.rs | 13 +++++-- crates/stages/api/src/pipeline/mod.rs | 4 +- crates/storage/db/src/tables/mod.rs | 11 ++++-- .../src/providers/blockchain_provider.rs | 20 ++++++++-- .../src/providers/database/provider.rs | 37 ++++++++++++++----- crates/storage/provider/src/providers/mod.rs | 16 ++++++-- .../provider/src/traits/finalized_block.rs | 15 ++++++-- crates/storage/provider/src/traits/mod.rs | 2 +- 14 files changed, 130 insertions(+), 44 deletions(-) diff --git a/crates/blockchain-tree/src/externals.rs b/crates/blockchain-tree/src/externals.rs index a4f72f6d33d7..719852c12ac0 100644 --- a/crates/blockchain-tree/src/externals.rs +++ b/crates/blockchain-tree/src/externals.rs @@ -7,7 +7,7 @@ use reth_db_api::{cursor::DbCursorRO, transaction::DbTx}; use reth_node_types::NodeTypesWithDB; use reth_primitives::StaticFileSegment; use reth_provider::{ - providers::ProviderNodeTypes, FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory, + providers::ProviderNodeTypes, ChainStateBlockReader, ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory, StatsReader, }; use reth_storage_errors::provider::ProviderResult; diff --git a/crates/chain-state/src/chain_info.rs b/crates/chain-state/src/chain_info.rs index d9e2c03e2738..d36d9f47e438 100644 --- a/crates/chain-state/src/chain_info.rs +++ b/crates/chain-state/src/chain_info.rs @@ -21,9 +21,13 @@ pub struct ChainInfoTracker { impl ChainInfoTracker { /// Create a new chain info container for the given canonical head and finalized header if it /// exists. - pub fn new(head: SealedHeader, finalized: Option) -> Self { + pub fn new( + head: SealedHeader, + finalized: Option, + safe: Option, + ) -> Self { let (finalized_block, _) = watch::channel(finalized); - let (safe_block, _) = watch::channel(None); + let (safe_block, _) = watch::channel(safe); Self { inner: Arc::new(ChainInfoInner { diff --git a/crates/chain-state/src/in_memory.rs b/crates/chain-state/src/in_memory.rs index 07120cf8ee31..fb67608ebda7 100644 --- a/crates/chain-state/src/in_memory.rs +++ b/crates/chain-state/src/in_memory.rs @@ -173,12 +173,13 @@ impl CanonicalInMemoryState { numbers: BTreeMap, pending: Option, finalized: Option, + safe: Option, ) -> Self { let in_memory_state = InMemoryState::new(blocks, numbers, pending); let header = in_memory_state .head_state() .map_or_else(SealedHeader::default, |state| state.block_ref().block().header.clone()); - let chain_info_tracker = ChainInfoTracker::new(header, finalized); + let chain_info_tracker = ChainInfoTracker::new(header, finalized, safe); let (canon_state_notification_sender, _) = broadcast::channel(CANON_STATE_NOTIFICATION_CHANNEL_SIZE); @@ -193,13 +194,17 @@ impl CanonicalInMemoryState { /// Create an empty state. pub fn empty() -> Self { - Self::new(HashMap::default(), BTreeMap::new(), None, None) + Self::new(HashMap::default(), BTreeMap::new(), None, None, None) } /// Create a new in memory state with the given local head and finalized header /// if it exists. - pub fn with_head(head: SealedHeader, finalized: Option) -> Self { - let chain_info_tracker = ChainInfoTracker::new(head, finalized); + pub fn with_head( + head: SealedHeader, + finalized: Option, + safe: Option, + ) -> Self { + let chain_info_tracker = ChainInfoTracker::new(head, finalized, safe); let in_memory_state = InMemoryState::default(); let (canon_state_notification_sender, _) = broadcast::channel(CANON_STATE_NOTIFICATION_CHANNEL_SIZE); @@ -1255,7 +1260,7 @@ mod tests { numbers.insert(2, block2.block().hash()); numbers.insert(3, block3.block().hash()); - let canonical_state = CanonicalInMemoryState::new(blocks, numbers, None, None); + let canonical_state = CanonicalInMemoryState::new(blocks, numbers, None, None, None); let historical: StateProviderBox = Box::new(MockStateProvider); @@ -1297,7 +1302,7 @@ mod tests { let mut numbers = BTreeMap::new(); numbers.insert(1, hash); - let state = CanonicalInMemoryState::new(blocks, numbers, None, None); + let state = CanonicalInMemoryState::new(blocks, numbers, None, None, None); let chain: Vec<_> = state.canonical_chain().collect(); assert_eq!(chain.len(), 1); diff --git a/crates/cli/commands/src/stage/unwind.rs b/crates/cli/commands/src/stage/unwind.rs index 19305554eaa3..a5c9956c95b2 100644 --- a/crates/cli/commands/src/stage/unwind.rs +++ b/crates/cli/commands/src/stage/unwind.rs @@ -17,7 +17,7 @@ use reth_node_builder::{NodeTypesWithDB, NodeTypesWithEngine}; use reth_node_core::args::NetworkArgs; use reth_provider::{ providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, ChainSpecProvider, - FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory, StaticFileProviderFactory, + ChainStateBlockReader, ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory, }; use reth_prune::PruneModes; use reth_stages::{ diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index 4dfd9c87d321..633ae03d8ad9 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -398,8 +398,13 @@ where let (header, seal) = sealed.into_parts(); let genesis_block = SealedHeader::new(header, seal); - let blockchain_provider = - BlockchainProvider::with_blocks(provider_factory.clone(), tree, genesis_block, None); + let blockchain_provider = BlockchainProvider::with_blocks( + provider_factory.clone(), + tree, + genesis_block, + None, + None, + ); let pruner = Pruner::new_with_factory( provider_factory.clone(), diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index dfddfcfaa89b..25c1f0ed7030 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -4,7 +4,7 @@ use reth_chain_state::ExecutedBlock; use reth_errors::ProviderError; use reth_provider::{ providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, - DatabaseProviderFactory, FinalizedBlockWriter, ProviderFactory, StaticFileProviderFactory, + ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory, }; use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory}; use reth_stages_api::{MetricEvent, MetricEventsSender}; @@ -97,6 +97,11 @@ impl PersistenceService { provider.save_finalized_block_number(finalized_block)?; provider.commit()?; } + PersistenceAction::SaveSafeBlock(safe_block) => { + let provider = self.provider.database_provider_rw()?; + provider.save_safe_block_number(safe_block)?; + provider.commit()?; + } } } Ok(()) @@ -176,6 +181,9 @@ pub enum PersistenceAction { /// Update the persisted finalized block on disk SaveFinalizedBlock(u64), + + /// Update the persisted safe block on disk + SaveSafeBlock(u64), } /// A handle to the persistence service @@ -251,6 +259,14 @@ impl PersistenceHandle { self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block)) } + /// Persists the finalized block number on disk. + pub fn save_safe_block_number( + &self, + safe_block: u64, + ) -> Result<(), SendError> { + self.send_action(PersistenceAction::SaveSafeBlock(safe_block)) + } + /// Tells the persistence service to remove blocks above a certain block number. The removed /// blocks are returned by the service. /// diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index bc1da6369459..3eadbbd522db 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -2401,8 +2401,13 @@ where // if the safe block is not known, we can't update the safe block return Err(OnForkChoiceUpdated::invalid_state()) } - Ok(Some(finalized)) => { - self.canonical_in_memory_state.set_safe(finalized); + Ok(Some(safe)) => { + if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() { + // we're also persisting the safe block on disk so we can reload it on + // restart this is required by optimism which queries the safe block: + let _ = self.persistence.save_safe_block_number(safe.number); + self.canonical_in_memory_state.set_safe(safe); + } } Err(err) => { error!(target: "engine::tree", %err, "Failed to fetch safe block header"); @@ -2680,7 +2685,7 @@ mod tests { let (header, seal) = sealed.into_parts(); let header = SealedHeader::new(header, seal); let engine_api_tree_state = EngineApiTreeState::new(10, 10, header.num_hash()); - let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None); + let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None); let (to_payload_service, _payload_command_rx) = unbounded_channel(); let payload_builder = PayloadBuilderHandle::new(to_payload_service); @@ -2744,7 +2749,7 @@ mod tests { let last_executed_block = blocks.last().unwrap().clone(); let pending = Some(BlockState::new(last_executed_block)); self.tree.canonical_in_memory_state = - CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None); + CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None, None); self.blocks = blocks.clone(); self.persist_blocks( diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs index 19b68b384853..1f6d9341ad6d 100644 --- a/crates/stages/api/src/pipeline/mod.rs +++ b/crates/stages/api/src/pipeline/mod.rs @@ -7,8 +7,8 @@ pub use event::*; use futures_util::Future; use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH; use reth_provider::{ - providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory, - FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory, StageCheckpointReader, + providers::ProviderNodeTypes, writer::UnifiedStorageWriter, ChainStateBlockReader, + ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory, }; use reth_prune::PrunerBuilder; diff --git a/crates/storage/db/src/tables/mod.rs b/crates/storage/db/src/tables/mod.rs index 384139618163..83a063903e08 100644 --- a/crates/storage/db/src/tables/mod.rs +++ b/crates/storage/db/src/tables/mod.rs @@ -416,6 +416,8 @@ tables! { pub enum ChainStateKey { /// Last finalized block key LastFinalizedBlock, + /// Last finalized block key + LastSafeBlockBlock, } impl Encode for ChainStateKey { @@ -424,16 +426,17 @@ impl Encode for ChainStateKey { fn encode(self) -> Self::Encoded { match self { Self::LastFinalizedBlock => [0], + Self::LastSafeBlockBlock => [1], } } } impl Decode for ChainStateKey { fn decode(value: &[u8]) -> Result { - if value == [0] { - Ok(Self::LastFinalizedBlock) - } else { - Err(reth_db_api::DatabaseError::Decode) + match value { + [0] => Ok(Self::LastFinalizedBlock), + [1] => Ok(Self::LastSafeBlockBlock), + _ => Err(reth_db_api::DatabaseError::Decode), } } } diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 54f28b77b912..a0811db2aee2 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -1,8 +1,8 @@ use crate::{ providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, BlockSource, CanonChainTracker, CanonStateNotifications, - CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory, - DatabaseProviderRO, EvmEnvProvider, FinalizedBlockReader, HeaderProvider, ProviderError, + CanonStateSubscriptions, ChainSpecProvider, ChainStateBlockReader, ChangeSetReader, + DatabaseProviderFactory, DatabaseProviderRO, EvmEnvProvider, HeaderProvider, ProviderError, ProviderFactory, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory, StateReader, StaticFileProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider, @@ -93,9 +93,23 @@ impl BlockchainProvider2 { .map(|num| provider.sealed_header(num)) .transpose()? .flatten(); + let safe_header = provider + .last_safe_block_number()? + .or_else(|| { + // for the purpose of this we can also use the finalized block if we don't have the + // safe block + provider.last_finalized_block_number().ok().flatten() + }) + .map(|num| provider.sealed_header(num)) + .transpose()? + .flatten(); Ok(Self { database, - canonical_in_memory_state: CanonicalInMemoryState::with_head(latest, finalized_header), + canonical_in_memory_state: CanonicalInMemoryState::with_head( + latest, + finalized_header, + safe_header, + ), }) } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 767f92db98f2..c9b0af3d33ca 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -7,13 +7,14 @@ use crate::{ }, writer::UnifiedStorageWriter, AccountReader, BlockExecutionReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, - BlockReader, BlockWriter, BundleStateInit, DBProvider, EvmEnvProvider, FinalizedBlockReader, - FinalizedBlockWriter, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, - HistoricalStateProvider, HistoryWriter, LatestStateProvider, OriginalValuesKnown, - ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RequestsProvider, RevertsInit, - StageCheckpointReader, StateChangeWriter, StateProviderBox, StateReader, StateWriter, - StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant, - TransactionsProvider, TransactionsProviderExt, TrieWriter, WithdrawalsProvider, + BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter, + DBProvider, EvmEnvProvider, HashingWriter, HeaderProvider, HeaderSyncGap, + HeaderSyncGapProvider, HistoricalStateProvider, HistoryWriter, LatestStateProvider, + OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, + RequestsProvider, RevertsInit, StageCheckpointReader, StateChangeWriter, StateProviderBox, + StateReader, StateWriter, StaticFileProviderFactory, StatsReader, StorageReader, + StorageTrieWriter, TransactionVariant, TransactionsProvider, TransactionsProviderExt, + TrieWriter, WithdrawalsProvider, }; use alloy_eips::BlockHashOrNumber; use alloy_primitives::{keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256}; @@ -3596,7 +3597,7 @@ impl StatsReader for DatabaseProvider { } } -impl FinalizedBlockReader for DatabaseProvider { +impl ChainStateBlockReader for DatabaseProvider { fn last_finalized_block_number(&self) -> ProviderResult> { let mut finalized_blocks = self .tx @@ -3608,14 +3609,32 @@ impl FinalizedBlockReader for DatabaseProvider ProviderResult> { + let mut finalized_blocks = self + .tx + .cursor_read::()? + .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))? + .take(1) + .collect::, _>>()?; + + let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1); + Ok(last_finalized_block_number) + } } -impl FinalizedBlockWriter for DatabaseProvider { +impl ChainStateBlockWriter for DatabaseProvider { fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> { Ok(self .tx .put::(tables::ChainStateKey::LastFinalizedBlock, block_number)?) } + + fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> { + Ok(self + .tx + .put::(tables::ChainStateKey::LastSafeBlockBlock, block_number)?) + } } impl DBProvider for DatabaseProvider { diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 5fed81c155d1..561e1d974362 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -1,9 +1,9 @@ use crate::{ AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, BlockSource, BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications, - CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory, - EvmEnvProvider, FinalizedBlockReader, FullExecutionDataProvider, HeaderProvider, ProviderError, - PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider, + CanonStateSubscriptions, ChainSpecProvider, ChainStateBlockReader, ChangeSetReader, + DatabaseProviderFactory, EvmEnvProvider, FullExecutionDataProvider, HeaderProvider, + ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory, StaticFileProviderFactory, TransactionVariant, TransactionsProvider, TreeViewer, WithdrawalsProvider, }; @@ -109,8 +109,9 @@ impl BlockchainProvider { tree: Arc, latest: SealedHeader, finalized: Option, + safe: Option, ) -> Self { - Self { database, tree, chain_info: ChainInfoTracker::new(latest, finalized) } + Self { database, tree, chain_info: ChainInfoTracker::new(latest, finalized, safe) } } /// Create a new provider using only the database and the tree, fetching the latest header from @@ -128,11 +129,18 @@ impl BlockchainProvider { .transpose()? .flatten(); + let safe_header = provider + .last_safe_block_number()? + .map(|num| provider.sealed_header(num)) + .transpose()? + .flatten(); + Ok(Self::with_blocks( database, tree, SealedHeader::new(latest_header, best.best_hash), finalized_header, + safe_header, )) } diff --git a/crates/storage/provider/src/traits/finalized_block.rs b/crates/storage/provider/src/traits/finalized_block.rs index 5509db0aa939..98a6d9d0e343 100644 --- a/crates/storage/provider/src/traits/finalized_block.rs +++ b/crates/storage/provider/src/traits/finalized_block.rs @@ -1,16 +1,23 @@ use alloy_primitives::BlockNumber; use reth_errors::ProviderResult; -/// Functionality to read the last known finalized block from the database. -pub trait FinalizedBlockReader: Send + Sync { +/// Functionality to read the last known chain blocks from the database. +pub trait ChainStateBlockReader: Send + Sync { /// Returns the last finalized block number. /// /// If no finalized block has been written yet, this returns `None`. fn last_finalized_block_number(&self) -> ProviderResult>; + /// Returns the last safe block number. + /// + /// If no safe block has been written yet, this returns `None`. + fn last_safe_block_number(&self) -> ProviderResult>; } -/// Functionality to write the last known finalized block to the database. -pub trait FinalizedBlockWriter: Send + Sync { +/// Functionality to write the last known chain blocks to the database. +pub trait ChainStateBlockWriter: Send + Sync { /// Saves the given finalized block number in the DB. fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()>; + + /// Saves the given safe block number in the DB. + fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()>; } diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index 8bae4f67ae85..c31c7c1e2f21 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -42,4 +42,4 @@ mod tree_viewer; pub use tree_viewer::TreeViewer; mod finalized_block; -pub use finalized_block::{FinalizedBlockReader, FinalizedBlockWriter}; +pub use finalized_block::{ChainStateBlockReader, ChainStateBlockWriter};