diff --git a/execution/executor-types/src/execution_output.rs b/execution/executor-types/src/execution_output.rs index 0708274f60c98..fbfba3c3c521d 100644 --- a/execution/executor-types/src/execution_output.rs +++ b/execution/executor-types/src/execution_output.rs @@ -10,7 +10,7 @@ use crate::{ }; use aptos_drop_helper::DropHelper; use aptos_storage_interface::state_store::{ - state_delta::StateDelta, state_view::cached_state_view::StateCache, + state::State, state_view::cached_state_view::ShardedStateCache, }; use aptos_types::{ contract_event::ContractEvent, @@ -36,14 +36,23 @@ impl ExecutionOutput { to_commit: TransactionsToKeep, to_discard: TransactionsWithOutput, to_retry: TransactionsWithOutput, - state_cache: StateCache, + last_checkpoint_state: Option, + result_state: State, + state_reads: ShardedStateCache, block_end_info: Option, next_epoch_state: Option, subscribable_events: Planned>, ) -> Self { + let next_version = first_version + to_commit.len() as Version; + assert_eq!(next_version, result_state.next_version()); if is_block { // If it's a block, ensure it ends with state checkpoint. assert!(to_commit.is_empty() || to_commit.ends_with_sole_checkpoint()); + assert!(last_checkpoint_state.is_some()); + assert!(last_checkpoint_state + .as_ref() + .unwrap() + .is_the_same(&result_state)); } else { // If it's not, there shouldn't be any transaction to be discarded or retried. assert!(to_discard.is_empty() && to_retry.is_empty()); @@ -56,22 +65,26 @@ impl ExecutionOutput { to_commit, to_discard, to_retry, - state_cache, + last_checkpoint_state, + result_state, + state_reads, block_end_info, next_epoch_state, subscribable_events, }) } - pub fn new_empty(state: Arc) -> Self { + pub fn new_empty(parent_state: State) -> Self { Self::new_impl(Inner { is_block: false, - first_version: state.next_version(), + first_version: parent_state.next_version(), statuses_for_input_txns: vec![], to_commit: TransactionsToKeep::new_empty(), to_discard: TransactionsWithOutput::new_empty(), to_retry: TransactionsWithOutput::new_empty(), - state_cache: StateCache::new_empty(state.current.clone()), + last_checkpoint_state: None, + result_state: parent_state, + state_reads: ShardedStateCache::default(), block_end_info: None, next_epoch_state: None, subscribable_events: Planned::ready(vec![]), @@ -88,7 +101,9 @@ impl ExecutionOutput { to_commit: TransactionsToKeep::new_dummy_success(txns), to_discard: TransactionsWithOutput::new_empty(), to_retry: TransactionsWithOutput::new_empty(), - state_cache: StateCache::new_dummy(), + last_checkpoint_state: None, + result_state: State::new_empty(), + state_reads: ShardedStateCache::default(), block_end_info: None, next_epoch_state: None, subscribable_events: Planned::ready(vec![]), @@ -107,7 +122,9 @@ impl ExecutionOutput { to_commit: TransactionsToKeep::new_empty(), to_discard: TransactionsWithOutput::new_empty(), to_retry: TransactionsWithOutput::new_empty(), - state_cache: StateCache::new_dummy(), + last_checkpoint_state: None, + result_state: self.result_state.clone(), + state_reads: ShardedStateCache::default(), block_end_info: None, next_epoch_state: self.next_epoch_state.clone(), subscribable_events: Planned::ready(vec![]), @@ -146,10 +163,12 @@ pub struct Inner { pub to_discard: TransactionsWithOutput, pub to_retry: TransactionsWithOutput, - /// Carries the frozen base state view, so all in-mem nodes involved won't drop before the - /// execution result is processed; as well as all the accounts touched during execution, together - /// with their proofs. 
- pub state_cache: StateCache, + pub last_checkpoint_state: Option<State>, + pub result_state: State, + /// State items read during execution, useful for calculating the state storage usage and + /// indices used by the db pruner. + pub state_reads: ShardedStateCache, + /// Optional StateCheckpoint payload pub block_end_info: Option, /// Optional EpochState payload. diff --git a/execution/executor-types/src/state_checkpoint_output.rs b/execution/executor-types/src/state_checkpoint_output.rs index 61f5bb597a8d2..85640934b0520 100644 --- a/execution/executor-types/src/state_checkpoint_output.rs +++ b/execution/executor-types/src/state_checkpoint_output.rs @@ -7,6 +7,7 @@ use aptos_crypto::HashValue; use aptos_drop_helper::DropHelper; use aptos_storage_interface::state_store::{ sharded_state_updates::ShardedStateUpdates, state_delta::StateDelta, + state_summary::StateSummary, }; use derive_more::Deref; use std::sync::Arc; @@ -32,17 +33,20 @@ impl StateCheckpointOutput { }) } - pub fn new_empty(state: Arc<StateDelta>) -> Self { + pub fn new_empty(_state_summary: StateSummary) -> Self { + todo!() + /* FIXME(aldenhu) Self::new_impl(Inner { parent_state: state.clone(), result_state: state, state_updates_before_last_checkpoint: None, state_checkpoint_hashes: vec![], }) + */ } pub fn new_dummy() -> Self { - Self::new_empty(Arc::new(StateDelta::new_empty())) + Self::new_empty(StateSummary::new_empty()) } fn new_impl(inner: Inner) -> Self { @@ -52,7 +56,10 @@ impl StateCheckpointOutput { } pub fn reconfig_suffix(&self) -> Self { + /* FIXME(aldenhu) Self::new_empty(self.result_state.clone()) + */ + todo!() } } diff --git a/execution/executor-types/src/state_compute_result.rs b/execution/executor-types/src/state_compute_result.rs index 5b350ccbfbe6e..b94989a39ef9d 100644 --- a/execution/executor-types/src/state_compute_result.rs +++ b/execution/executor-types/src/state_compute_result.rs @@ -156,6 +156,9 @@ impl StateComputeResult { } pub fn as_chunk_to_commit(&self) -> ChunkToCommit { + todo!() + + /* FIXME(aldenhu): sharded_state_cache ChunkToCommit { first_version: self.ledger_update_output.first_version(), transactions: &self.execution_output.to_commit.transactions, @@ -168,8 +171,9 @@ impl StateComputeResult { .state_checkpoint_output .state_updates_before_last_checkpoint .as_ref(), - sharded_state_cache: Some(&self.execution_output.state_cache.sharded_state_cache), + sharded_state_cache, is_reconfig: self.execution_output.next_epoch_state.is_some(), } + */ } } diff --git a/execution/executor/src/block_executor/block_tree/mod.rs b/execution/executor/src/block_executor/block_tree/mod.rs index 6c13788fd51ff..1dcbff414788a 100644 --- a/execution/executor/src/block_executor/block_tree/mod.rs +++ b/execution/executor/src/block_executor/block_tree/mod.rs @@ -222,10 +222,7 @@ impl BlockTree { ledger_info.consensus_block_id() }; - let output = PartialStateComputeResult::new_empty( - ledger_summary.state().clone(), - ledger_summary.txn_accumulator().clone(), - ); + let output = PartialStateComputeResult::new_empty(ledger_summary); block_lookup.fetch_or_add_block(id, output, None) } @@ -259,10 +256,12 @@ impl BlockTree { ); last_committed_block }; + /* FIXME(aldenhu) root.output .expect_result_state() .current .log_generation("block_tree_base"); + */ let old_root = std::mem::replace(&mut *self.root.lock(), root); // send old root to async task to drop it diff --git a/execution/executor/src/block_executor/block_tree/test.rs b/execution/executor/src/block_executor/block_tree/test.rs index 060cab8dd228e..0119ca4dcd8a7 100644 ---
a/execution/executor/src/block_executor/block_tree/test.rs +++ b/execution/executor/src/block_executor/block_tree/test.rs @@ -8,7 +8,6 @@ use crate::{ }; use aptos_crypto::{hash::PRE_GENESIS_BLOCK_ID, HashValue}; use aptos_infallible::Mutex; -use aptos_storage_interface::LedgerSummary; use aptos_types::{block_info::BlockInfo, epoch_state::EpochState, ledger_info::LedgerInfo}; use std::sync::Arc; @@ -39,11 +38,15 @@ fn id(index: u64) -> HashValue { } fn empty_block() -> PartialStateComputeResult { + todo!() + /* FIXME(aldenhu) let result_view = LedgerSummary::new_empty(); PartialStateComputeResult::new_empty( - result_view.state().clone(), + result_view.state.clone(), result_view.transaction_accumulator.clone(), ) + + */ } fn gen_ledger_info(block_id: HashValue, reconfig: bool) -> LedgerInfo { diff --git a/execution/executor/src/block_executor/mod.rs b/execution/executor/src/block_executor/mod.rs index 40c90751bcec7..f3dd14a685787 100644 --- a/execution/executor/src/block_executor/mod.rs +++ b/execution/executor/src/block_executor/mod.rs @@ -27,10 +27,7 @@ use aptos_infallible::RwLock; use aptos_logger::prelude::*; use aptos_metrics_core::{IntGaugeHelper, TimerHelper}; use aptos_storage_interface::{ - state_store::state_view::{ - async_proof_fetcher::AsyncProofFetcher, cached_state_view::CachedStateView, - }, - DbReaderWriter, + state_store::state_view::cached_state_view::CachedStateView, DbReaderWriter, }; use aptos_types::{ block_executor::{ @@ -224,9 +221,7 @@ where CachedStateView::new( StateViewId::BlockExecution { block_id }, Arc::clone(&self.db.reader), - parent_output.execution_output.next_version(), - parent_output.expect_result_state().current.clone(), - Arc::new(AsyncProofFetcher::new(self.db.reader.clone())), + parent_output.execution_output.result_state.clone(), )? 
}; @@ -255,7 +250,7 @@ where }); DoStateCheckpoint::run( &execution_output, - parent_output.expect_result_state(), + parent_output.expect_result_state_summary().clone(), Option::>::None, ) })?; diff --git a/execution/executor/src/chunk_executor/chunk_commit_queue.rs b/execution/executor/src/chunk_executor/chunk_commit_queue.rs index 14586ce250721..7f89702d77180 100644 --- a/execution/executor/src/chunk_executor/chunk_commit_queue.rs +++ b/execution/executor/src/chunk_executor/chunk_commit_queue.rs @@ -11,7 +11,10 @@ use crate::{ }, }; use anyhow::{anyhow, ensure, Result}; -use aptos_storage_interface::{state_store::state_delta::StateDelta, DbReader, LedgerSummary}; +use aptos_storage_interface::{ + state_store::{state_delta::StateDelta, state_summary::StateSummary}, + DbReader, +}; use aptos_types::{proof::accumulator::InMemoryTransactionAccumulator, transaction::Version}; use std::{collections::VecDeque, sync::Arc}; @@ -40,35 +43,48 @@ pub struct ChunkCommitQueue { } impl ChunkCommitQueue { - pub(crate) fn new_from_db(db: &Arc) -> Result { + pub(crate) fn new_from_db(_db: &Arc) -> Result { + todo!() + /* FIXME(aldenhu) let LedgerSummary { state, + state_summary, transaction_accumulator, } = db.get_pre_committed_ledger_summary()?; + Ok(Self { latest_state: state, latest_txn_accumulator: transaction_accumulator, to_commit: VecDeque::new(), to_update_ledger: VecDeque::new(), }) + */ } pub(crate) fn latest_state(&self) -> Arc { self.latest_state.clone() } + pub(crate) fn latest_state_summary(&self) -> StateSummary { + // FIXME(aldenhu) + todo!() + } + pub(crate) fn expecting_version(&self) -> Version { self.latest_state.next_version() } pub(crate) fn enqueue_for_ledger_update( &mut self, - chunk_to_update_ledger: ChunkToUpdateLedger, + _chunk_to_update_ledger: ChunkToUpdateLedger, ) -> Result<()> { + /* FIXME(aldenhu) self.latest_state = chunk_to_update_ledger.output.expect_result_state().clone(); self.to_update_ledger .push_back(Some(chunk_to_update_ledger)); Ok(()) + */ + todo!() } pub(crate) fn next_chunk_to_update_ledger( diff --git a/execution/executor/src/chunk_executor/mod.rs b/execution/executor/src/chunk_executor/mod.rs index 290e630757cf8..92f8e84eb036c 100644 --- a/execution/executor/src/chunk_executor/mod.rs +++ b/execution/executor/src/chunk_executor/mod.rs @@ -24,10 +24,7 @@ use aptos_infallible::{Mutex, RwLock}; use aptos_logger::prelude::*; use aptos_metrics_core::{IntGaugeHelper, TimerHelper}; use aptos_storage_interface::{ - state_store::{ - state_delta::StateDelta, - state_view::{async_proof_fetcher::AsyncProofFetcher, cached_state_view::CachedStateView}, - }, + state_store::{state_delta::StateDelta, state_view::cached_state_view::CachedStateView}, DbReaderWriter, }; use aptos_types::{ @@ -244,13 +241,12 @@ impl ChunkExecutorInner { } fn latest_state_view(&self, latest_state: &StateDelta) -> Result { + // FIXME(aldenhu): check let first_version = latest_state.next_version(); Ok(CachedStateView::new( StateViewId::ChunkExecution { first_version }, self.db.reader.clone(), - first_version, latest_state.current.clone(), - Arc::new(AsyncProofFetcher::new(self.db.reader.clone())), )?) 
} @@ -312,7 +308,7 @@ impl ChunkExecutorInner { // Calculate state snapshot let state_checkpoint_output = DoStateCheckpoint::run( &execution_output, - &self.commit_queue.lock().latest_state(), + self.commit_queue.lock().latest_state_summary(), Some( chunk_verifier .transaction_infos() diff --git a/execution/executor/src/db_bootstrapper/mod.rs b/execution/executor/src/db_bootstrapper/mod.rs index 577d9b5997f5b..19d7782b14cf7 100644 --- a/execution/executor/src/db_bootstrapper/mod.rs +++ b/execution/executor/src/db_bootstrapper/mod.rs @@ -152,7 +152,7 @@ pub fn calculate_genesis( "Genesis txn didn't output reconfig event." ); - let output = ApplyExecutionOutput::run(execution_output, &ledger_summary)?; + let output = ApplyExecutionOutput::run(execution_output, ledger_summary)?; let timestamp_usecs = if genesis_version == 0 { // TODO(aldenhu): fix existing tests before using real timestamp and check on-chain epoch. GENESIS_TIMESTAMP_USECS @@ -160,9 +160,7 @@ pub fn calculate_genesis( let state_view = CachedStateView::new( StateViewId::Miscellaneous, Arc::clone(&db.reader), - output.execution_output.next_version(), - output.expect_result_state().current.clone(), - Arc::new(AsyncProofFetcher::new(db.reader.clone())), + output.execution_output.result_state.clone(), )?; let next_epoch = epoch .checked_add(1) diff --git a/execution/executor/src/tests/mod.rs b/execution/executor/src/tests/mod.rs index d27c2fed3629d..8c331115f2749 100644 --- a/execution/executor/src/tests/mod.rs +++ b/execution/executor/src/tests/mod.rs @@ -496,7 +496,7 @@ fn apply_transaction_by_writeset( let chunk_output = DoGetExecutionOutput::by_transaction_output(txns, txn_outs, state_view).unwrap(); - let output = ApplyExecutionOutput::run(chunk_output, &ledger_summary).unwrap(); + let output = ApplyExecutionOutput::run(chunk_output, ledger_summary).unwrap(); db.writer .save_transactions( @@ -696,7 +696,7 @@ fn run_transactions_naive( TransactionSliceMetadata::unknown(), ) .unwrap(); - let output = ApplyExecutionOutput::run(out, &ledger_summary).unwrap(); + let output = ApplyExecutionOutput::run(out, ledger_summary).unwrap(); db.writer .save_transactions( output.expect_complete_result().as_chunk_to_commit(), diff --git a/execution/executor/src/types/in_memory_state_calculator_v2.rs b/execution/executor/src/types/in_memory_state_calculator_v2.rs index f8907d9c97e65..7f67ccbdd51e4 100644 --- a/execution/executor/src/types/in_memory_state_calculator_v2.rs +++ b/execution/executor/src/types/in_memory_state_calculator_v2.rs @@ -1,16 +1,14 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::metrics::OTHER_TIMERS; -use anyhow::{ensure, Result}; -use aptos_crypto::{hash::CryptoHash, HashValue}; -use aptos_drop_helper::DropHelper; +#![allow(dead_code)] + +use anyhow::Result; +use aptos_crypto::HashValue; use aptos_executor_types::{ execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput, transactions_with_output::TransactionsWithOutput, ProofReader, }; -use aptos_logger::info; -use aptos_metrics_core::TimerHelper; use aptos_scratchpad::FrozenSparseMerkleTree; use aptos_storage_interface::state_store::{ sharded_state_update_refs::ShardedStateUpdateRefs, @@ -24,19 +22,19 @@ use aptos_types::{ }, write_set::WriteSet, }; -use itertools::{zip_eq, Itertools}; -use rayon::prelude::*; -use std::{ops::Deref, sync::Arc}; +use std::sync::Arc; /// Helper class for calculating state changes after a block of transactions are executed. 
pub struct InMemoryStateCalculatorV2 {} impl InMemoryStateCalculatorV2 { pub fn calculate_for_transactions( - execution_output: &ExecutionOutput, - parent_state: &Arc, - known_state_checkpoints: Option>>, + _execution_output: &ExecutionOutput, + _parent_state: &Arc, + _known_state_checkpoints: Option>>, ) -> Result { + todo!() + /* if execution_output.is_block { Self::validate_input_for_block(parent_state, &execution_output.to_commit)?; } @@ -49,6 +47,7 @@ impl InMemoryStateCalculatorV2 { execution_output.is_block, known_state_checkpoints, ) + */ } pub fn calculate_for_write_sets_after_snapshot( @@ -71,13 +70,14 @@ impl InMemoryStateCalculatorV2 { } fn calculate_impl( - parent_state: &Arc, - state_cache: &StateCache, - state_update_refs: &ShardedStateUpdateRefs, - last_checkpoint_index: Option, - is_block: bool, - known_state_checkpoints: Option>>, + _parent_state: &Arc, + _state_cache: &StateCache, + _state_update_refs: &ShardedStateUpdateRefs, + _last_checkpoint_index: Option, + _is_block: bool, + _known_state_checkpoints: Option>>, ) -> Result { + /* let StateCache { // This makes sure all in-mem nodes seen while proofs were fetched stays in mem during the // calculation @@ -206,12 +206,16 @@ impl InMemoryStateCalculatorV2 { last_checkpoint_index.map(|_| updates_before_last_checkpoint), state_checkpoint_hashes, )) + FIXME(aldenhu) + */ + todo!() } fn calculate_updates( - state_update_refs: &ShardedStateUpdateRefs, - last_checkpoint_index: Option, + _state_update_refs: &ShardedStateUpdateRefs, + _last_checkpoint_index: Option, ) -> (ShardedStateUpdates, ShardedStateUpdates) { + /* let _timer = OTHER_TIMERS.timer_with(&["calculate_updates"]); let mut shard_iters = state_update_refs @@ -248,6 +252,9 @@ impl InMemoryStateCalculatorV2 { } (before_last_checkpoint, after_last_checkpoint) + + */ + todo!() } fn add_to_delta( @@ -276,10 +283,11 @@ impl InMemoryStateCalculatorV2 { } fn calculate_usage( - old_usage: StateStorageUsage, - sharded_state_cache: &ShardedStateCache, - updates: &ShardedStateUpdates, + _old_usage: StateStorageUsage, + _sharded_state_cache: &ShardedStateCache, + _updates: &ShardedStateUpdates, ) -> StateStorageUsage { + /* let _timer = OTHER_TIMERS.timer_with(&["calculate_usage"]); if old_usage.is_untracked() { @@ -310,14 +318,18 @@ impl InMemoryStateCalculatorV2 { (old_usage.items() as i64 + items_delta) as usize, (old_usage.bytes() as i64 + bytes_delta) as usize, ) + + */ + todo!() } fn make_checkpoint( - latest_checkpoint: FrozenSparseMerkleTree, - updates: &ShardedStateUpdates, - usage: StateStorageUsage, - proof_reader: &ProofReader, + _latest_checkpoint: FrozenSparseMerkleTree, + _updates: &ShardedStateUpdates, + _usage: StateStorageUsage, + _proof_reader: &ProofReader, ) -> Result> { + /* let _timer = OTHER_TIMERS.timer_with(&["make_checkpoint"]); // Update SMT. @@ -337,12 +349,18 @@ impl InMemoryStateCalculatorV2 { latest_checkpoint.batch_update(smt_updates, usage, proof_reader)? 
}; Ok(new_checkpoint) + + */ + todo!() } fn validate_input_for_block( - base: &StateDelta, - to_commit: &TransactionsWithOutput, + _base: &StateDelta, + _to_commit: &TransactionsWithOutput, ) -> Result<()> { + // FIXME(aldenhu): check equivalent is done + todo!() + /* let num_txns = to_commit.len(); ensure!(num_txns != 0, "Empty block is not allowed."); ensure!( @@ -357,5 +375,7 @@ impl InMemoryStateCalculatorV2 { ); Ok(()) + + */ } } diff --git a/execution/executor/src/types/partial_state_compute_result.rs b/execution/executor/src/types/partial_state_compute_result.rs index 2a62efdab6d0c..7eb2de5734593 100644 --- a/execution/executor/src/types/partial_state_compute_result.rs +++ b/execution/executor/src/types/partial_state_compute_result.rs @@ -7,10 +7,11 @@ use aptos_executor_types::{ execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput, state_compute_result::StateComputeResult, LedgerUpdateOutput, }; -use aptos_storage_interface::state_store::state_delta::StateDelta; -use aptos_types::proof::accumulator::InMemoryTransactionAccumulator; +use aptos_storage_interface::{ + state_store::{state::State, state_summary::StateSummary}, + LedgerSummary, +}; use once_cell::sync::OnceCell; -use std::sync::Arc; #[derive(Clone, Debug)] pub struct PartialStateComputeResult { @@ -28,18 +29,21 @@ impl PartialStateComputeResult { } } - pub fn new_empty( - state: Arc, - txn_accumulator: Arc, - ) -> Self { - let execution_output = ExecutionOutput::new_empty(state.clone()); + pub fn new_empty(ledger_summary: LedgerSummary) -> Self { + // Deliberately not reusing Self::new() here to make sure we don't leave + // any OnceCell unset. + let execution_output = ExecutionOutput::new_empty(ledger_summary.state); let ledger_update_output = OnceCell::new(); ledger_update_output - .set(LedgerUpdateOutput::new_empty(txn_accumulator)) + .set(LedgerUpdateOutput::new_empty( + ledger_summary.transaction_accumulator, + )) .expect("First set."); let state_checkpoint_output = OnceCell::new(); state_checkpoint_output - .set(StateCheckpointOutput::new_empty(state)) + .set(StateCheckpointOutput::new_empty( + ledger_summary.state_summary, + )) .expect("First set."); Self { @@ -63,8 +67,14 @@ impl PartialStateComputeResult { .expect("StateCheckpointOutput not set") } - pub fn expect_result_state(&self) -> &Arc { - &self.expect_state_checkpoint_output().result_state + pub fn expect_result_state(&self) -> &State { + &self.execution_output.result_state + } + + pub fn expect_result_state_summary(&self) -> &StateSummary { + // FIXME(aldenhu): + // &self.expect_state_checkpoint_output().result_state_summary + todo!() } pub fn set_state_checkpoint_output(&self, state_checkpoint_output: StateCheckpointOutput) { @@ -93,6 +103,7 @@ impl PartialStateComputeResult { self.ledger_update_output.get().map(|ledger_update_output| { StateComputeResult::new( self.execution_output.clone(), + // ledger_update_output is set in a later stage, so it's safe to `expect` here. 
self.expect_state_checkpoint_output().clone(), ledger_update_output.clone(), ) diff --git a/execution/executor/src/workflow/do_get_execution_output.rs b/execution/executor/src/workflow/do_get_execution_output.rs index 99be96683f624..896d4238f544a 100644 --- a/execution/executor/src/workflow/do_get_execution_output.rs +++ b/execution/executor/src/workflow/do_get_execution_output.rs @@ -23,8 +23,9 @@ use aptos_executor_types::{ use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_logger::prelude::*; use aptos_metrics_core::TimerHelper; -use aptos_storage_interface::state_store::state_view::cached_state_view::{ - CachedStateView, StateCache, +use aptos_storage_interface::state_store::{ + state::State, + state_view::cached_state_view::{CachedStateView, ShardedStateCache}, }; #[cfg(feature = "consensus-only-perf-test")] use aptos_types::transaction::ExecutionStatus; @@ -121,7 +122,7 @@ impl DoGetExecutionOutput { .map(|t| t.into_inner()) .collect(), transaction_outputs, - state_view.into_state_cache(), + state_view, block_end_info, append_state_checkpoint_to_block, ) @@ -152,7 +153,7 @@ impl DoGetExecutionOutput { .map(|t| t.into_txn().into_inner()) .collect(), transaction_outputs, - state_view.into_state_cache(), + state_view, None, // block end info append_state_checkpoint_to_block, ) @@ -176,7 +177,7 @@ impl DoGetExecutionOutput { state_view.next_version(), transactions, transaction_outputs, - state_view.into_state_cache(), + state_view, None, // block end info None, // append state checkpoint to block )?; @@ -287,7 +288,7 @@ impl Parser { first_version: Version, mut transactions: Vec, mut transaction_outputs: Vec, - state_cache: StateCache, + base_state_view: CachedStateView, block_end_info: Option, append_state_checkpoint_to_block: Option, ) -> Result { @@ -333,6 +334,10 @@ impl Parser { .transpose()? 
}; + let (base_state, state_reads) = base_state_view.finish(); + let (last_checkpoint_state, result_state) = + Self::update_state(&to_commit, &base_state, &state_reads); + let out = ExecutionOutput::new( is_block, first_version, @@ -340,7 +345,9 @@ impl Parser { to_commit, to_discard, to_retry, - state_cache, + last_checkpoint_state, + result_state, + state_reads, block_end_info, next_epoch_state, Planned::place_holder(), @@ -476,6 +483,15 @@ impl Parser { (&validator_set).into(), )) } + + fn update_state( + _to_commit: &TransactionsToKeep, + _base_state: &State, + _state_cache: &ShardedStateCache, + ) -> (Option, State) { + // FIXME(aldenhu): + todo!() + } } struct WriteSetStateView<'a> { @@ -502,7 +518,7 @@ impl<'a> TStateView for WriteSetStateView<'a> { #[cfg(test)] mod tests { use super::Parser; - use aptos_storage_interface::state_store::state_view::cached_state_view::StateCache; + use aptos_storage_interface::state_store::state_view::cached_state_view::CachedStateView; use aptos_types::{ contract_event::ContractEvent, transaction::{ @@ -541,7 +557,7 @@ mod tests { ), ]; let execution_output = - Parser::parse(0, txns, txn_outs, StateCache::new_dummy(), None, None).unwrap(); + Parser::parse(0, txns, txn_outs, CachedStateView::new_dummy(), None, None).unwrap(); assert_eq!( vec![event_0, event_2], *execution_output.subscribable_events diff --git a/execution/executor/src/workflow/do_state_checkpoint.rs b/execution/executor/src/workflow/do_state_checkpoint.rs index 1d4cfae66ec3a..a17ccfbef86b1 100644 --- a/execution/executor/src/workflow/do_state_checkpoint.rs +++ b/execution/executor/src/workflow/do_state_checkpoint.rs @@ -1,28 +1,28 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::types::in_memory_state_calculator_v2::InMemoryStateCalculatorV2; use anyhow::Result; use aptos_crypto::HashValue; use aptos_executor_types::{ execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput, }; -use aptos_storage_interface::state_store::state_delta::StateDelta; -use std::sync::Arc; +use aptos_storage_interface::state_store::state_summary::StateSummary; pub struct DoStateCheckpoint; impl DoStateCheckpoint { pub fn run( - execution_output: &ExecutionOutput, - parent_state: &Arc, - known_state_checkpoints: Option>>, + _execution_output: &ExecutionOutput, + _parent_state_summary: StateSummary, + _known_state_checkpoints: Option>>, ) -> Result { + /* FIXME(aldenhu): // Apply the write set, get the latest state. 
InMemoryStateCalculatorV2::calculate_for_transactions( execution_output, - parent_state, known_state_checkpoints, ) + */ + todo!() } } diff --git a/execution/executor/src/workflow/mod.rs b/execution/executor/src/workflow/mod.rs index 9ea524d1eb8b7..322359c17eded 100644 --- a/execution/executor/src/workflow/mod.rs +++ b/execution/executor/src/workflow/mod.rs @@ -20,17 +20,17 @@ pub struct ApplyExecutionOutput; impl ApplyExecutionOutput { pub fn run( execution_output: ExecutionOutput, - base_view: &LedgerSummary, + base_view: LedgerSummary, ) -> Result { let state_checkpoint_output = DoStateCheckpoint::run( &execution_output, - base_view.state(), + base_view.state_summary, Option::>::None, // known_state_checkpoint_hashes )?; let ledger_update_output = DoLedgerUpdate::run( &execution_output, &state_checkpoint_output, - base_view.txn_accumulator().clone(), + base_view.transaction_accumulator, )?; let output = PartialStateComputeResult::new(execution_output); output.set_state_checkpoint_output(state_checkpoint_output); diff --git a/storage/aptosdb/src/db/aptosdb_test.rs b/storage/aptosdb/src/db/aptosdb_test.rs index 68d32147edd3a..d74b9f8acd4f6 100644 --- a/storage/aptosdb/src/db/aptosdb_test.rs +++ b/storage/aptosdb/src/db/aptosdb_test.rs @@ -176,7 +176,7 @@ fn test_get_transaction_auxiliary_data() { } #[test] -fn test_get_latest_executed_trees() { +fn test_get_latest_ledger_summary() { let tmp_dir = TempPath::new(); let db = AptosDB::new_for_test(&tmp_dir); diff --git a/storage/aptosdb/src/db/include/aptosdb_reader.rs b/storage/aptosdb/src/db/include/aptosdb_reader.rs index f4cd864c909cf..857d04a9dbeba 100644 --- a/storage/aptosdb/src/db/include/aptosdb_reader.rs +++ b/storage/aptosdb/src/db/include/aptosdb_reader.rs @@ -63,7 +63,10 @@ impl DbReader for AptosDB { fn get_pre_committed_version(&self) -> Result> { gauged_api("get_pre_committed_version", || { - Ok(self.state_store.current_state().lock().current_version) + todo!() + + // FIXME(aldenhu) + // Ok(self.state_store.current_state().lock().current_version) }) } @@ -529,6 +532,9 @@ impl DbReader for AptosDB { fn get_pre_committed_ledger_summary(&self) -> Result { gauged_api("get_pre_committed_ledger_summary", || { + todo!() + /* + FIXME(aldenhu) let current_state = self.state_store.current_state_cloned(); let num_txns = current_state.next_version(); @@ -543,6 +549,7 @@ impl DbReader for AptosDB { transaction_accumulator, ); Ok(ledger_summary) + */ }) } @@ -638,12 +645,16 @@ impl DbReader for AptosDB { fn get_latest_state_checkpoint_version(&self) -> Result> { gauged_api("get_latest_state_checkpoint_version", || { + todo!() + /* Ok(self .state_store .current_state() .lock() .base_version ) + FIXME(aldenhu) + */ }) } diff --git a/storage/aptosdb/src/db/include/aptosdb_testonly.rs b/storage/aptosdb/src/db/include/aptosdb_testonly.rs index 77288df003b01..daa400aa68e99 100644 --- a/storage/aptosdb/src/db/include/aptosdb_testonly.rs +++ b/storage/aptosdb/src/db/include/aptosdb_testonly.rs @@ -1,6 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 + use aptos_config::config::{BUFFERED_STATE_TARGET_ITEMS_FOR_TEST, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD}; use std::default::Default; use aptos_storage_interface::state_store::state_view::cached_state_view::ShardedStateCache; @@ -203,11 +204,12 @@ impl ChunkToCommitOwned { } pub fn gather_state_updates_until_last_checkpoint( - first_version: Version, - latest_in_memory_state: &StateDelta, - state_update_refs: &ShardedStateUpdateRefs, - transaction_infos: 
&[TransactionInfo], + _first_version: Version, + _latest_in_memory_state: &StateDelta, + _state_update_refs: &ShardedStateUpdateRefs, + _transaction_infos: &[TransactionInfo], ) -> Option { + /* if let Some(latest_checkpoint_version) = latest_in_memory_state.base_version { if latest_checkpoint_version >= first_version { let idx = (latest_checkpoint_version - first_version) as usize; @@ -236,5 +238,7 @@ impl ChunkToCommitOwned { } None + */ + todo!() } } diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs index 19212ab188798..13f654cc420a1 100644 --- a/storage/aptosdb/src/db/include/aptosdb_writer.rs +++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs @@ -21,7 +21,8 @@ impl DbWriter for AptosDB { .expect("Concurrent committing detected."); let _timer = OTHER_TIMERS_SECONDS.timer_with(&["pre_commit_ledger"]); - chunk.latest_in_memory_state.current.log_generation("db_save"); + // FIXME(aldenhu) + // chunk.latest_in_memory_state.current.log_generation("db_save"); self.pre_commit_validation(&chunk)?; let _new_root_hash = self.calculate_and_commit_ledger_and_state_kv( @@ -230,16 +231,18 @@ impl AptosDB { !chunk.is_empty(), "chunk is empty, nothing to save.", ); + // FIXME(aldenhu): examine the check and the message ensure!( - Some(chunk.expect_last_version()) == chunk.latest_in_memory_state.current_version, + chunk.next_version() == chunk.latest_in_memory_state.next_version(), "the last_version {:?} to commit doesn't match the current_version {:?} in latest_in_memory_state", - chunk.expect_last_version(), - chunk.latest_in_memory_state.current_version.expect("Must exist"), + chunk.next_version(), + chunk.latest_in_memory_state.current.next_version(), ); { let current_state_guard = self.state_store.current_state(); let current_state = current_state_guard.lock(); + /* FIXME(aldenhu) ensure!( chunk.base_state_version == current_state.base_version, "base_state_version {:?} does not equal to the base_version {:?} in buffered state with current version {:?}", @@ -247,6 +250,7 @@ impl AptosDB { chunk.base_state_version, current_state.base_version, current_state.current_version, ); + */ // Ensure the incoming committing requests are always consecutive and the version in // buffered state is consistent with that in db. @@ -555,7 +559,7 @@ impl AptosDB { version_to_commit: Version, ) -> Result> { let old_committed_ver = self.ledger_db.metadata_db().get_synced_version()?; - let pre_committed_ver = self.state_store.current_state().lock().current_version; + let pre_committed_ver = self.state_store.current_state().version(); ensure!( old_committed_ver.is_none() || version_to_commit >= old_committed_ver.unwrap(), "Version too old to commit. Committed: {:?}; Trying to commit with LI: {}", diff --git a/storage/aptosdb/src/db/test_helper.rs b/storage/aptosdb/src/db/test_helper.rs index 6de6bce71bb76..32de8781d994f 100644 --- a/storage/aptosdb/src/db/test_helper.rs +++ b/storage/aptosdb/src/db/test_helper.rs @@ -3,6 +3,12 @@ // SPDX-License-Identifier: Apache-2.0 //! This module provides reusable helpers in tests.
+ +// FIXME(aldenhu) +#![allow(dead_code)] +#![allow(unused_imports)] +#![allow(unused_variables)] + #[cfg(test)] use crate::state_store::StateStore; #[cfg(test)] @@ -113,6 +119,7 @@ pub(crate) fn update_store( } pub fn update_in_memory_state(state: &mut StateDelta, txns_to_commit: &[TransactionToCommit]) { + /* FIXME(aldenhu): let mut next_version = state.current_version.map_or(0, |v| v + 1); for txn_to_commit in txns_to_commit { txn_to_commit @@ -165,6 +172,8 @@ pub fn update_in_memory_state(state: &mut StateDelta, txns_to_commit: &[Transact .unwrap(); state.current_version = next_version.checked_sub(1); } + */ + todo!() } prop_compose! { @@ -192,7 +201,9 @@ prop_compose! { for block_gen in block_gens { let (mut txns_to_commit, mut ledger_info) = block_gen.materialize(&mut universe); update_in_memory_state(&mut in_memory_state, &txns_to_commit); - let state_checkpoint_root_hash = in_memory_state.root_hash(); + // FIXME(aldenhu): + // let state_checkpoint_root_hash = in_memory_state.root_hash(); + let state_checkpoint_root_hash = aptos_crypto::HashValue::zero(); // make real txn_info's for txn in txns_to_commit.iter_mut() { @@ -915,6 +926,7 @@ pub(crate) fn put_transaction_auxiliary_data( } pub fn put_as_state_root(db: &AptosDB, version: Version, key: StateKey, value: StateValue) { + /* let leaf_node = Node::new_leaf(key.hash(), value.hash(), (key.clone(), version)); db.state_merkle_db() .metadata_db() @@ -938,12 +950,16 @@ pub fn put_as_state_root(db: &AptosDB, version: Version, key: StateKey, value: S .lock() .update(None, &in_memory_state, true) .unwrap(); + FIXME(aldenhu): + */ + todo!() } pub fn test_sync_transactions_impl( input: Vec<(Vec, LedgerInfoWithSignatures)>, snapshot_size_threshold: usize, ) { + /* let tmp_dir = TempPath::new(); let db = AptosDB::new_for_test_with_buffered_state_target_items(&tmp_dir, snapshot_size_threshold); @@ -1015,4 +1031,7 @@ pub fn test_sync_transactions_impl( .flat_map(|(txns_to_commit, _)| txns_to_commit.iter()) .collect(), ); + FIXME(aldenhu): + */ + todo!() } diff --git a/storage/aptosdb/src/lib.rs b/storage/aptosdb/src/lib.rs index d137c1c607b9a..00de0d6164bc6 100644 --- a/storage/aptosdb/src/lib.rs +++ b/storage/aptosdb/src/lib.rs @@ -3,6 +3,8 @@ // SPDX-License-Identifier: Apache-2.0 #![forbid(unsafe_code)] +// FIXME(aldenhu) +#![allow(dead_code)] //! This crate provides [`AptosDB`] which represents physical storage of the core Aptos data //! structures. diff --git a/storage/aptosdb/src/state_store/buffered_state.rs b/storage/aptosdb/src/state_store/buffered_state.rs index 89f1e305e6f51..8019fc63a7a2d 100644 --- a/storage/aptosdb/src/state_store/buffered_state.rs +++ b/storage/aptosdb/src/state_store/buffered_state.rs @@ -3,6 +3,11 @@ //! This file defines state store buffered state that has been committed. 
+// FIXME(aldenhu) +#![allow(dead_code)] +#![allow(unused_imports)] +#![allow(unused_variables)] + use crate::{ metrics::{LATEST_CHECKPOINT_VERSION, OTHER_TIMERS_SECONDS}, state_store::{state_snapshot_committer::StateSnapshotCommitter, CurrentState, StateDb}, @@ -55,10 +60,11 @@ pub(crate) enum CommitMessage { impl BufferedState { pub(crate) fn new( - state_db: &Arc, - state_after_checkpoint: StateDelta, - target_items: usize, + _state_db: &Arc, + _state_after_checkpoint: StateDelta, + _target_items: usize, ) -> (Self, SmtAncestors, CurrentState) { + /* let (state_commit_sender, state_commit_receiver) = mpsc::sync_channel(ASYNC_COMMIT_CHANNEL_BUFFER_SIZE as usize); let arc_state_db = Arc::clone(state_db); @@ -87,12 +93,16 @@ impl BufferedState { }; myself.report_latest_committed_version(); (myself, smt_ancestors, current_state) + FIXME(aldenhu) + */ + todo!() } /// This method checks whether a commit is needed based on the target_items value and the number of items in state_until_checkpoint. /// If a commit is needed, it sends a CommitMessage::Data message to the StateSnapshotCommitter thread to commit the data. /// If sync_commit is true, it also sends a CommitMessage::Sync message to ensure that the commit is completed before returning. fn maybe_commit(&mut self, sync_commit: bool) { + /* if sync_commit { let (commit_sync_sender, commit_sync_receiver) = mpsc::channel(); if let Some(to_commit) = self.state_until_checkpoint.take().map(Arc::from) { @@ -135,6 +145,9 @@ impl BufferedState { .unwrap(); } } + FIXME(aldenhu) + */ + todo!() } pub(crate) fn sync_commit(&mut self) { @@ -142,12 +155,16 @@ impl BufferedState { } fn report_latest_committed_version(&self) { + /* LATEST_CHECKPOINT_VERSION.set( self.state_after_checkpoint .lock() .base_version .map_or(-1, |v| v as i64), ); + FIXME(aldenhu) + */ + todo!() } /// This method updates the buffered state with new data. @@ -157,6 +174,7 @@ impl BufferedState { new_state_after_checkpoint: &StateDelta, sync_commit: bool, ) -> Result<()> { + /* { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["update_current_state"]); let mut state_after_checkpoint = self.state_after_checkpoint.lock(); @@ -204,6 +222,9 @@ impl BufferedState { self.maybe_commit(sync_commit); self.report_latest_committed_version(); Ok(()) + FIXME(aldenhu) + */ + todo!() } } diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index fe46527fb8742..fbd49baadbcd4 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -4,6 +4,11 @@ //! This file defines state store APIs that are related account state Merkle tree. 
+// FIXME(aldenhu) +#![allow(dead_code)] +#![allow(unused_imports)] +#![allow(unused_variables)] + use crate::{ ledger_db::LedgerDb, metrics::{OTHER_TIMERS_SECONDS, STATE_ITEMS, TOTAL_STATE_BYTES}, @@ -110,6 +115,15 @@ impl CurrentState { pub fn new(from_latest_checkpoint_to_current: StateDelta) -> Self { Self(Arc::new(Mutex::new(from_latest_checkpoint_to_current))) } + + pub fn next_version(&self) -> Version { + // FIXME(aldenhu) + todo!() + } + + pub fn version(&self) -> Option { + self.next_version().checked_sub(1) + } } pub(crate) struct StateDb { @@ -463,6 +477,7 @@ impl StateStore { state_merkle_db: Arc, state_kv_db: Arc, ) -> Result> { + /* use aptos_config::config::NO_OP_STORAGE_PRUNER_CONFIG; let state_merkle_pruner = StateMerklePrunerManager::new( @@ -492,6 +507,9 @@ impl StateStore { )?; let base_version = current_state.lock().base_version; Ok(base_version) + FIXME(aldenhu) + */ + todo!() } fn create_buffered_state_from_latest_snapshot( @@ -500,6 +518,7 @@ impl StateStore { hack_for_tests: bool, check_max_versions_after_snapshot: bool, ) -> Result<(BufferedState, SmtAncestors, CurrentState)> { + /* let num_transactions = state_db .ledger_db .metadata_db() @@ -621,6 +640,9 @@ impl StateStore { ); } Ok((buffered_state, smt_ancestors, current_state)) + FIXME(aldenhu) + */ + todo!() } pub fn reset(&self) { diff --git a/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs b/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs index 85e5c4dcbfcf3..f49abc8924825 100644 --- a/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs +++ b/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs @@ -3,6 +3,11 @@ //! This file defines the state merkle snapshot committer running in background thread. +// FIXME(aldenhu) +#![allow(dead_code)] +#![allow(unused_imports)] +#![allow(unused_variables)] + use crate::{ metrics::{LATEST_SNAPSHOT_VERSION, OTHER_TIMERS_SECONDS}, pruner::PrunerManager, @@ -47,6 +52,7 @@ impl StateMerkleBatchCommitter { } pub fn run(self) { + /* while let Ok(msg) = self.state_merkle_batch_receiver.recv() { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["batch_committer_work"]); match msg { @@ -109,9 +115,13 @@ impl StateMerkleBatchCommitter { } } trace!("State merkle batch committing thread exit.") + FIXME(aldenhu) + */ + todo!() } fn check_usage_consistency(&self, state_delta: &StateDelta) -> Result<()> { + /* let version = state_delta .current_version .ok_or_else(|| anyhow!("Committing without version."))?; @@ -143,5 +153,8 @@ impl StateMerkleBatchCommitter { } Ok(()) + FIXME(aldenhu) + */ + todo!() } } diff --git a/storage/aptosdb/src/state_store/state_snapshot_committer.rs b/storage/aptosdb/src/state_store/state_snapshot_committer.rs index b45c49fa784fd..d8f859b37d4d9 100644 --- a/storage/aptosdb/src/state_store/state_snapshot_committer.rs +++ b/storage/aptosdb/src/state_store/state_snapshot_committer.rs @@ -3,6 +3,11 @@ //! This file defines the state snapshot committer running in background thread within StateStore. 
+// FIXME(aldenhu) +#![allow(dead_code)] +#![allow(unused_imports)] +#![allow(unused_variables)] + use crate::{ metrics::OTHER_TIMERS_SECONDS, state_store::{ @@ -75,6 +80,7 @@ impl StateSnapshotCommitter { } pub fn run(self) { + /* while let Ok(msg) = self.state_snapshot_commit_receiver.recv() { match msg { CommitMessage::Data(delta_to_commit) => { @@ -167,6 +173,9 @@ impl StateSnapshotCommitter { } trace!("State snapshot committing thread exit.") } + FIXME(aldenhu) + */ + todo!() } } diff --git a/storage/backup/backup-cli/src/lib.rs b/storage/backup/backup-cli/src/lib.rs index cd8011d46f4e5..a9926bedb1e4f 100644 --- a/storage/backup/backup-cli/src/lib.rs +++ b/storage/backup/backup-cli/src/lib.rs @@ -3,6 +3,9 @@ // SPDX-License-Identifier: Apache-2.0 #![allow(clippy::arithmetic_side_effects)] +// FIXME(aldenhu) +#![allow(unused_variables)] +#![allow(unused_imports)] pub mod backup_types; pub mod coordinators; diff --git a/storage/backup/backup-cli/src/utils/test_utils.rs b/storage/backup/backup-cli/src/utils/test_utils.rs index a5a9a9102743f..dc9b6805330d5 100644 --- a/storage/backup/backup-cli/src/utils/test_utils.rs +++ b/storage/backup/backup-cli/src/utils/test_utils.rs @@ -33,13 +33,13 @@ pub fn tmp_db_with_random_content() -> ( Arc, Vec<(Vec, LedgerInfoWithSignatures)>, ) { + /* FIXME(aldenhu) let (tmpdir, db) = tmp_db_empty(); let mut cur_ver: Version = 0; let mut in_memory_state = db .get_pre_committed_ledger_summary() .unwrap() .state - .as_ref() .clone(); let _ancestor = in_memory_state.base.clone(); let blocks = ValueGenerator::new().generate(arb_blocks_to_commit()); @@ -58,6 +58,8 @@ pub fn tmp_db_with_random_content() -> ( } (tmpdir, db, blocks) + */ + todo!() } pub fn start_local_backup_service(db: Arc) -> (Runtime, u16) { diff --git a/storage/db-tool/src/bootstrap.rs b/storage/db-tool/src/bootstrap.rs index 9557ee076dceb..0cf63f1aabfdc 100644 --- a/storage/db-tool/src/bootstrap.rs +++ b/storage/db-tool/src/bootstrap.rs @@ -66,13 +66,13 @@ impl Command { .reader .get_pre_committed_ledger_summary() .with_context(|| format_err!("Failed to get latest tree state."))?; - println!("Db has {} transactions", ledger_summary.num_transactions()); + println!("Db has {} transactions", ledger_summary.next_version()); if let Some(waypoint) = self.waypoint_to_verify { ensure!( - waypoint.version() == ledger_summary.num_transactions(), + waypoint.version() == ledger_summary.next_version(), "Trying to generate waypoint at version {}, but DB has {} transactions.", waypoint.version(), - ledger_summary.num_transactions(), + ledger_summary.next_version(), ) } diff --git a/storage/storage-interface/src/ledger_summary.rs b/storage/storage-interface/src/ledger_summary.rs index db0daa5a848fe..b0f3d7adad5e7 100644 --- a/storage/storage-interface/src/ledger_summary.rs +++ b/storage/storage-interface/src/ledger_summary.rs @@ -3,7 +3,8 @@ use crate::{ state_store::{ - state_delta::StateDelta, + state::State, + state_summary::StateSummary, state_view::{async_proof_fetcher::AsyncProofFetcher, cached_state_view::CachedStateView}, }, DbReader, @@ -17,59 +18,44 @@ use aptos_types::{ }; use std::sync::Arc; -/// A wrapper of the in-memory state sparse merkle tree and the transaction accumulator that -/// represent a specific state collectively. Usually it is a state after executing a block. #[derive(Clone, Debug)] pub struct LedgerSummary { - /// The in-memory representation of state after execution. 
- pub state: Arc, - - /// The in-memory Merkle Accumulator representing a blockchain state consistent with the - /// `state_tree`. + pub state: State, + pub state_summary: StateSummary, pub transaction_accumulator: Arc, } impl LedgerSummary { - pub fn state(&self) -> &Arc { - &self.state - } - - pub fn txn_accumulator(&self) -> &Arc { - &self.transaction_accumulator - } - - pub fn version(&self) -> Option { - self.num_transactions().checked_sub(1) - } - - pub fn num_transactions(&self) -> u64 { - self.txn_accumulator().num_leaves() - } - - pub fn state_id(&self) -> HashValue { - self.txn_accumulator().root_hash() - } - pub fn new( - state: Arc, + state: State, + state_summary: StateSummary, transaction_accumulator: Arc, ) -> Self { - assert_eq!( - state.current_version.map_or(0, |v| v + 1), - transaction_accumulator.num_leaves() - ); + assert_eq!(state.next_version(), state_summary.next_version()); + assert_eq!(state.next_version(), transaction_accumulator.num_leaves()); Self { state, + state_summary, transaction_accumulator, } } + pub fn next_version(&self) -> Version { + self.state.next_version() + } + + pub fn version(&self) -> Option { + self.next_version().checked_sub(1) + } + pub fn new_at_state_checkpoint( - state_root_hash: HashValue, - state_usage: StateStorageUsage, - frozen_subtrees_in_accumulator: Vec, - num_leaves_in_accumulator: u64, + _state_root_hash: HashValue, + _state_usage: StateStorageUsage, + _frozen_subtrees_in_accumulator: Vec, + _num_leaves_in_accumulator: u64, ) -> Self { + todo!() + /* FIXME(aldenhu) let state = Arc::new(StateDelta::new_at_checkpoint( state_root_hash, state_usage, @@ -81,26 +67,34 @@ impl LedgerSummary { ); Self::new(state, transaction_accumulator) + */ } pub fn new_empty() -> Self { Self::new( - Arc::new(StateDelta::new_empty()), + State::new_empty(), + StateSummary::new_empty(), Arc::new(InMemoryAccumulator::new_empty()), ) } - pub fn is_same_view(&self, rhs: &Self) -> bool { - self.state().has_same_current_state(rhs.state()) + pub fn is_same_view(&self, _rhs: &Self) -> bool { + todo!() + /* FIXME(aldenhu) + self.state.has_same_current_state(rhs.state()) && self.transaction_accumulator.root_hash() == rhs.transaction_accumulator.root_hash() + + */ } pub fn verified_state_view( &self, - id: StateViewId, - reader: Arc, - proof_fetcher: Arc, + _id: StateViewId, + _reader: Arc, + _proof_fetcher: Arc, ) -> Result { + todo!() + /* FIXME(aldenhu) Ok(CachedStateView::new( id, reader, @@ -108,6 +102,7 @@ impl LedgerSummary { self.state.current.clone(), proof_fetcher, )?) 
+ */ } } diff --git a/storage/storage-interface/src/state_store/sharded_state_updates.rs b/storage/storage-interface/src/state_store/sharded_state_updates.rs index 4abf2c5e9be9a..cd9b022ff4896 100644 --- a/storage/storage-interface/src/state_store/sharded_state_updates.rs +++ b/storage/storage-interface/src/state_store/sharded_state_updates.rs @@ -1,6 +1,7 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +use crate::state_store::NUM_STATE_SHARDS; use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_types::state_store::{state_key::StateKey, state_value::StateValue}; use arr_macro::arr; @@ -10,9 +11,11 @@ use rayon::iter::{ }; use std::collections::HashMap; +// FIXME(aldenhu): rename DeduppedStateWrites +// FIXME(aldenhu): change to [LayeredMap; 16] #[derive(Clone, Debug)] pub struct ShardedStateUpdates { - pub shards: [HashMap>; 16], + pub shards: [HashMap>; NUM_STATE_SHARDS], } impl ShardedStateUpdates { diff --git a/storage/storage-interface/src/state_store/state.rs b/storage/storage-interface/src/state_store/state.rs index ed2d99dc00c7b..4630692b8e340 100644 --- a/storage/storage-interface/src/state_store/state.rs +++ b/storage/storage-interface/src/state_store/state.rs @@ -26,8 +26,11 @@ pub struct State { impl State { pub fn new_empty() -> Self { - // FIXME(aldenhu): check call site and implement - todo!() + Self { + next_version: 0, + shards: Arc::new(arr_macro::arr![MapLayer::new_family("state"); 16]), + usage: StateStorageUsage::zero(), + } } pub fn next_version(&self) -> Version { @@ -43,7 +46,12 @@ impl State { } pub fn into_delta(self, _base: State) -> StateDelta { - // FIXME(aldnehu) + // FIXME(aldenhu) + todo!() + } + + pub fn is_the_same(&self, _rhs: &Self) -> bool { + // FIXME(aldenhu) todo!() } } diff --git a/storage/storage-interface/src/state_store/state_delta.rs b/storage/storage-interface/src/state_store/state_delta.rs index 85f2365c508f7..ebaf1b6ae3162 100644 --- a/storage/storage-interface/src/state_store/state_delta.rs +++ b/storage/storage-interface/src/state_store/state_delta.rs @@ -1,14 +1,12 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::state_store::{sharded_state_updates::ShardedStateUpdates, state_update::StateWrite}; +use crate::state_store::{ + sharded_state_updates::ShardedStateUpdates, state::State, state_update::StateWrite, +}; use aptos_crypto::HashValue; -use aptos_drop_helper::DropHelper; -use aptos_scratchpad::SparseMerkleTree; use aptos_types::{ - state_store::{ - state_key::StateKey, state_storage_usage::StateStorageUsage, state_value::StateValue, - }, + state_store::{state_key::StateKey, state_storage_usage::StateStorageUsage}, transaction::Version, }; @@ -21,21 +19,15 @@ use aptos_types::{ /// when the next checkpoint is calculated. 
#[derive(Clone, Debug)] pub struct StateDelta { - pub base: SparseMerkleTree, - pub base_version: Option, - pub current: SparseMerkleTree, - pub current_version: Option, - pub updates_since_base: DropHelper, + pub base: State, + pub current: State, + pub updates: ShardedStateUpdates, } impl StateDelta { - pub fn new( - base: SparseMerkleTree, - base_version: Option, - current: SparseMerkleTree, - current_version: Option, - updates_since_base: ShardedStateUpdates, - ) -> Self { + pub fn new(_base: State, _current: State) -> Self { + todo!() + /* FIXME(aldenhu): assert!(base.is_family(¤t)); assert!(base_version.map_or(0, |v| v + 1) <= current_version.map_or(0, |v| v + 1)); Self { @@ -45,9 +37,11 @@ impl StateDelta { current_version, updates_since_base: DropHelper::new(updates_since_base), } + */ } - pub fn new_empty_with_version(version: Option) -> StateDelta { + pub fn new_empty_with_version(_version: Option) -> StateDelta { + /* FIXME(aldenhu): let smt = SparseMerkleTree::new_empty(); Self::new( smt.clone(), @@ -56,6 +50,8 @@ impl StateDelta { version, ShardedStateUpdates::new_empty(), ) + */ + todo!() } pub fn new_empty() -> Self { @@ -63,10 +59,11 @@ impl StateDelta { } pub fn new_at_checkpoint( - root_hash: HashValue, - usage: StateStorageUsage, - checkpoint_version: Option, + _root_hash: HashValue, + _usage: StateStorageUsage, + _checkpoint_version: Option, ) -> Self { + /* FIXME(aldenhu): let smt = SparseMerkleTree::new(root_hash, usage); Self::new( smt.clone(), @@ -75,46 +72,45 @@ impl StateDelta { checkpoint_version, ShardedStateUpdates::new_empty(), ) + + */ + todo!() } - pub fn merge(&mut self, other: StateDelta) { + pub fn merge(&mut self, _other: StateDelta) { + /* FIXME(aldenhu): assert!(other.follow(self)); self.current = other.current; self.current_version = other.current_version; self.updates_since_base .merge(other.updates_since_base.into_inner()); + + */ + todo!() } - pub fn follow(&self, other: &StateDelta) -> bool { + pub fn follow(&self, _other: &StateDelta) -> bool { + /* FIXME(aldenhu): self.base_version == other.current_version && other.current.has_same_root_hash(&self.base) + */ + todo!() } - pub fn has_same_current_state(&self, other: &StateDelta) -> bool { + pub fn has_same_current_state(&self, _other: &StateDelta) -> bool { + /* FIXME(aldenhu): self.current_version == other.current_version && self.current.has_same_root_hash(&other.current) - } - - pub fn base_root_hash(&self) -> HashValue { - self.base.root_hash() - } - - pub fn root_hash(&self) -> HashValue { - self.current.root_hash() + */ + todo!() } pub fn next_version(&self) -> Version { - self.current_version.map_or(0, |v| v + 1) - } - - pub fn replace_with(&mut self, mut rhs: Self) -> Self { - std::mem::swap(self, &mut rhs); - rhs + self.current.next_version() } pub fn parent_version(&self) -> Option { - // FIXME(aldenhu): update - self.base_version + self.base.next_version().checked_sub(1) } /// Get the state update for a given state key. 
@@ -123,11 +119,6 @@ impl StateDelta { // FIXME(aldenhu) todo!() } - - pub fn usage(&self) -> StateStorageUsage { - // FIXME(aldenhu): - todo!() - } } impl Default for StateDelta { diff --git a/storage/storage-interface/src/state_store/state_summary.rs b/storage/storage-interface/src/state_store/state_summary.rs index 433e3c40b2afa..9660e17ba9399 100644 --- a/storage/storage-interface/src/state_store/state_summary.rs +++ b/storage/storage-interface/src/state_store/state_summary.rs @@ -8,6 +8,7 @@ use aptos_types::{state_store::state_value::StateValue, transaction::Version}; /// The data structure through which the entire state at a given /// version can be summarized to a concise digest (the root hash). +#[derive(Clone, Debug)] pub struct StateSummary { /// The next version. If this is 0, the state is the "pre-genesis" empty state. next_version: Version, @@ -22,6 +23,13 @@ impl StateSummary { } } + pub fn new_empty() -> Self { + Self { + next_version: 0, + global_state_summary: SparseMerkleTree::new_empty(), + } + } + pub fn update(&self, _persisted: &StateSummary, _state_delta: &StateDelta) -> Self { // FIXME(aldenhu) todo!() diff --git a/storage/storage-interface/src/state_store/state_view/cached_state_view.rs b/storage/storage-interface/src/state_store/state_view/cached_state_view.rs index 89a1b70ca33fd..1396794211b2b 100644 --- a/storage/storage-interface/src/state_store/state_view/cached_state_view.rs +++ b/storage/storage-interface/src/state_store/state_view/cached_state_view.rs @@ -4,9 +4,8 @@ use crate::{ metrics::TIMER, state_store::{ - state_delta::StateDelta, - state_update::StateValueWithVersionOpt, - state_view::{async_proof_fetcher::AsyncProofFetcher, db_state_view::DbStateView}, + state::State, state_delta::StateDelta, state_update::StateValueWithVersionOpt, + state_view::db_state_view::DbStateView, }, DbReader, }; @@ -30,7 +29,6 @@ use std::{ fmt::{Debug, Formatter}, sync::Arc, }; - /* FIXME(aldenhu): remove static IO_POOL: Lazy = Lazy::new(|| { rayon::ThreadPoolBuilder::new() @@ -72,8 +70,9 @@ pub struct CachedStateView { /// For logging and debugging purpose, identifies what this view is for. id: StateViewId, - /// The persisted state, readable from the persist storage - persisted: Arc, + /// The persisted state is readable from the persist storage, at the version of + /// `self.speculative.parent_version()` + reader: Arc, /// The in-memory state on top of known persisted state speculative: StateDelta, @@ -95,20 +94,27 @@ impl CachedStateView { pub fn new( _id: StateViewId, _reader: Arc, - _next_version: Version, - _speculative_state: SparseMerkleTree, - _proof_fetcher: Arc, + _state: State, ) -> StateViewResult { + // FIXME(aldenhu): get persisted state from db and call `new_impl()` todo!() } pub fn new_impl( - _id: StateViewId, - _next_version: Version, - _snapshot: Option<(Version, HashValue)>, - _speculative_state: FrozenSparseMerkleTree, - _proof_fetcher: Arc, + id: StateViewId, + reader: Arc, + persisted_state: State, + state: State, ) -> Self { + Self { + id, + reader, + speculative: StateDelta::new(persisted_state, state), + memorized: ShardedStateCache::default(), + } + } + + pub fn new_dummy() -> Self { // FIXME(aldenhu) todo!() } @@ -142,6 +148,21 @@ impl CachedStateView { todo!() } + /// Consumes `Self` and returns the state and all the memorized state reads.
+ pub fn finish(self) -> (State, ShardedStateCache) { + todo!() + /* FIXME(aldenhu) + let Self { + id: _, + persisted: _, + speculative, + memorized, + } = self; + + (speculative.current, memorized) + */ + } + fn parent_version(&self) -> Option { self.speculative.parent_version() } @@ -152,7 +173,7 @@ impl CachedStateView { update.to_state_value_with_version() } else if let Some(parent_version) = self.parent_version() { StateValueWithVersionOpt::from_tuple_opt( - self.persisted + self.reader .get_state_value_with_version_by_version(state_key, parent_version)?, ) } else { @@ -167,7 +188,7 @@ impl CachedStateView { } } -// FIXME(aldenhu): remove unnecessary fields +// FIXME(aldenhu): remove unnecessary fields, probably remove entirely and use ShardedStateCache directly #[derive(Debug)] pub struct StateCache { pub frozen_base: FrozenSparseMerkleTree, @@ -217,7 +238,7 @@ impl TStateView for CachedStateView { } fn get_usage(&self) -> StateViewResult { - Ok(self.speculative.usage()) + Ok(self.speculative.current.usage()) } }
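
For orientation, here is a minimal sketch (not part of the diff) of how the reshaped `LedgerSummary` is meant to be consumed by callers such as `BlockTree::new_empty_root` and `calculate_genesis` above. It only uses constructors and accessors introduced in this diff; the crate-root re-export of `LedgerSummary` follows the `aptos_storage_interface` imports already shown, and error handling is elided.

use aptos_storage_interface::LedgerSummary;

/// Sketch: a pre-genesis ledger summary. `state`, `state_summary` and
/// `transaction_accumulator` are now carried together and must agree on the
/// next version, which `LedgerSummary::new()` asserts.
fn pre_genesis_ledger_summary() -> LedgerSummary {
    let summary = LedgerSummary::new_empty();
    // Empty state, empty SMT summary and empty accumulator all report
    // next_version() == 0, i.e. no transaction has been committed yet.
    assert_eq!(summary.next_version(), 0);
    assert_eq!(summary.version(), None);
    summary
}

Callers that previously took the state and the transaction accumulator as two separate arguments (e.g. `PartialStateComputeResult::new_empty`) now take the whole `LedgerSummary` by value and pull out `state`, `state_summary` and `transaction_accumulator` internally.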