Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix fee catchup after restart #2160

Merged
merged 23 commits into main from jb/state-catchup-2
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
33acf0e
Add regression test for fee catchup after restart
jbearer Oct 4, 2024
57ad0ed
Add bulk endpoint for account catchup
jbearer Oct 4, 2024
4c98234
Enable account catchup from persisted decided storage
jbearer Oct 10, 2024
ffe5209
Cache fetched state in validated state map after fetching from database
jbearer Oct 10, 2024
1305906
Support migration from existing nodes populating leaf_hash column
jbearer Oct 10, 2024
ef41777
Update HotShot to enable caching of fetched account states
jbearer Oct 10, 2024
6cde9e7
Rename migration after merge
jbearer Oct 10, 2024
d56be55
Add proposal fetching task
jbearer Oct 11, 2024
cb7e122
Return leaf when fetching account state so we can always add it to th…
jbearer Oct 11, 2024
166b7ca
Fix deadlock in restart tests
jbearer Oct 11, 2024
bfae117
Update sequencer/api/catchup.toml
jbearer Oct 11, 2024
b0f03b9
Avoid dropping TaskList, to avoid blocking executor thread in drop handler
jbearer Oct 11, 2024
2129da3
Exit proposal fetching task if it becomes obsolete
jbearer Oct 17, 2024
fb7d7fd
Enable frontier catchup from storage (#2183)
jbearer Oct 17, 2024
1b34d09
Remove no-longer-necessary catchup impl for DB transactions
jbearer Oct 17, 2024
8f20c43
Completely disable catchup when not required (NullStateCatchup)
jbearer Oct 17, 2024
6af0cb3
Merge remote-tracking branch 'origin/main' into jb/state-catchup-2
jbearer Oct 17, 2024
cc1fe38
Fix backoff CLI parsing
jbearer Oct 17, 2024
7e82fc8
Use the proper chain config during state reconstruction
jbearer Oct 18, 2024
a237fac
Add comment
jbearer Oct 18, 2024
e8b2767
Fix migration conflict
jbearer Oct 20, 2024
c6a5169
Make migrations consistent with release-gambit
jbearer Oct 20, 2024
586427b
Merge remote-tracking branch 'origin/main' into jb/state-catchup-2
jbearer Oct 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions sequencer/api/catchup.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ the proof is a Merkle _non-membership_ proof.
```
"""

[route.accounts]
PATH = ["/:height/:view/accounts"]
":height" = "Integer"
":view" = "Integer"
METHOD = "POST"
DOC = """
Bulk version of `/:height/:view/account`. The request body should be a JSON array consisting of
TaggedBase64-encoded fee accounts.

The response is a `FeeMerkleTree` containing sub-trees for each of the requested accounts, which is
a more condensed way to represent the union of account proofs for each requested account. Individual
Merkle proofs for each account can be extracted from this tree.
"""

[route.blocks]
PATH = ["/:height/:view/blocks"]
":height" = "Integer"
Expand Down
2 changes: 2 additions & 0 deletions sequencer/api/migrations/V38__add_quorum_proposal_hash.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE quorum_proposals
ADD COLUMN leaf_hash VARCHAR;
165 changes: 131 additions & 34 deletions sequencer/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use anyhow::{bail, Context};
use async_once_cell::Lazy;
use async_std::sync::{Arc, RwLock};
use async_trait::async_trait;
use committable::Commitment;
use committable::{Commitment, Committable};
use data_source::{CatchupDataSource, SubmitDataSource};
use derivative::Derivative;
use espresso_types::{
v0::traits::SequencerPersistence, v0_3::ChainConfig, AccountQueryData, BlockMerkleTree,
FeeAccountProof, MockSequencerVersions, NodeState, PubKey, Transaction,
retain_accounts, v0::traits::SequencerPersistence, v0_3::ChainConfig, AccountQueryData,
BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleTree, MockSequencerVersions, NodeState,
PubKey, Transaction, ValidatedState,
};
use ethers::prelude::Address;
use futures::{
future::{BoxFuture, Future, FutureExt},
stream::BoxStream,
Expand All @@ -26,14 +26,17 @@ use hotshot_types::{
event::Event,
light_client::StateSignatureRequestBody,
network::NetworkConfig,
traits::{network::ConnectedNetwork, node_implementation::Versions},
traits::{network::ConnectedNetwork, node_implementation::Versions, ValidatedState as _},
utils::{View, ViewInner},
};
use jf_merkle_tree::MerkleTreeScheme;

use self::data_source::{HotShotConfigDataSource, PublicNetworkConfig, StateSignatureDataSource};
use self::data_source::{
HotShotConfigDataSource, NodeStateDataSource, PublicNetworkConfig, StateSignatureDataSource,
};
use crate::{
context::Consensus, network, state_signature::StateSigner, SeqTypes, SequencerApiVersion,
SequencerContext,
catchup::CatchupStorage, context::Consensus, network, state_signature::StateSigner, SeqTypes,
SequencerApiVersion, SequencerContext,
};

pub mod data_source;
Expand Down Expand Up @@ -105,10 +108,6 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> ApiState
Arc::clone(&self.consensus.as_ref().get().await.get_ref().handle)
}

async fn node_state(&self) -> &NodeState {
&self.consensus.as_ref().get().await.get_ref().node_state
}

async fn network_config(&self) -> NetworkConfig<PubKey> {
self.consensus.as_ref().get().await.get_ref().config.clone()
}
Expand Down Expand Up @@ -189,44 +188,128 @@ impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> SubmitDa
}
}

impl<N, P, D, V> NodeStateDataSource for StorageState<N, P, D, V>
where
N: ConnectedNetwork<PubKey>,
V: Versions,
P: SequencerPersistence,
D: Sync,
{
async fn node_state(&self) -> &NodeState {
self.as_ref().node_state().await
}
}

impl<
N: ConnectedNetwork<PubKey>,
V: Versions,
P: SequencerPersistence,
D: CatchupDataSource + Send + Sync,
D: CatchupStorage + Send + Sync,
> CatchupDataSource for StorageState<N, P, D, V>
{
#[tracing::instrument(skip(self))]
async fn get_account(
#[tracing::instrument(skip(self, instance))]
async fn get_accounts(
&self,
instance: &NodeState,
height: u64,
view: ViewNumber,
account: Address,
) -> anyhow::Result<AccountQueryData> {
accounts: &[FeeAccount],
) -> anyhow::Result<FeeMerkleTree> {
// Check if we have the desired state in memory.
match self.as_ref().get_account(height, view, account).await {
Ok(account) => return Ok(account),
match self
.as_ref()
.get_accounts(instance, height, view, accounts)
.await
{
Ok(accounts) => return Ok(accounts),
Err(err) => {
tracing::info!("account is not in memory, trying storage: {err:#}");
tracing::info!("accounts not in memory, trying storage: {err:#}");
}
}

// Try storage.
self.inner().get_account(height, view, account).await
let (tree, leaf) = self
.inner()
.get_accounts(instance, height, view, accounts)
.await
.context("accounts not in memory, and could not fetch from storage")?;
// If we successfully fetched accounts from storage, try to add them back into the in-memory
// state.
let handle = self.as_ref().consensus().await;
let handle = handle.read().await;
let consensus = handle.consensus();
let mut consensus = consensus.write().await;
let (state, delta, leaf_commit) = match consensus.validated_state_map().get(&view) {
Some(View {
view_inner: ViewInner::Leaf { state, delta, leaf },
}) => {
let mut state = (**state).clone();

// Add the fetched accounts to the state.
for account in accounts {
if let Some((proof, _)) = FeeAccountProof::prove(&tree, (*account).into()) {
if let Err(err) = proof.remember(&mut state.fee_merkle_tree) {
tracing::warn!(
?view,
%account,
"cannot update fetched account state: {err:#}"
);
}
} else {
tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
};
}

(Arc::new(state), delta.clone(), *leaf)
}
_ => {
// If we don't already have a leaf for this view, or if we don't have the view
// at all, we can create a new view based on the recovered leaf and add it to
// our state map. In this case, we must also add the leaf to the saved leaves
// map to ensure consistency.
let mut state = ValidatedState::from_header(leaf.block_header());
state.fee_merkle_tree = tree.clone();
let res = (Arc::new(state), None, Committable::commit(&leaf));
consensus
.update_saved_leaves(leaf, &handle.hotshot.upgrade_lock)
.await;
res
}
};
if let Err(err) = consensus.update_validated_state_map(
view,
View {
view_inner: ViewInner::Leaf {
state,
delta,
leaf: leaf_commit,
},
},
) {
tracing::warn!(?view, "cannot update fetched account state: {err:#}");
}
tracing::info!(?view, "updated with fetched account state");

Ok(tree)
}

#[tracing::instrument(skip(self))]
async fn get_frontier(&self, height: u64, view: ViewNumber) -> anyhow::Result<BlocksFrontier> {
async fn get_frontier(
&self,
instance: &NodeState,
height: u64,
view: ViewNumber,
) -> anyhow::Result<BlocksFrontier> {
// Check if we have the desired state in memory.
match self.as_ref().get_frontier(height, view).await {
match self.as_ref().get_frontier(instance, height, view).await {
Ok(frontier) => return Ok(frontier),
Err(err) => {
tracing::info!("frontier is not in memory, trying storage: {err:#}");
}
}

// Try storage.
self.inner().get_frontier(height, view).await
self.inner().get_frontier(instance, height, view).await
}

async fn get_chain_config(
Expand Down Expand Up @@ -265,16 +348,28 @@ impl<
// }
// }

impl<N, V, P> NodeStateDataSource for ApiState<N, P, V>
where
N: ConnectedNetwork<PubKey>,
V: Versions,
P: SequencerPersistence,
{
async fn node_state(&self) -> &NodeState {
&self.consensus.as_ref().get().await.get_ref().node_state
}
}

impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> CatchupDataSource
for ApiState<N, P, V>
{
#[tracing::instrument(skip(self))]
async fn get_account(
#[tracing::instrument(skip(self, _instance))]
async fn get_accounts(
&self,
_instance: &NodeState,
height: u64,
view: ViewNumber,
account: Address,
) -> anyhow::Result<AccountQueryData> {
accounts: &[FeeAccount],
) -> anyhow::Result<FeeMerkleTree> {
let state = self
.consensus()
.await
Expand All @@ -285,14 +380,16 @@ impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> CatchupD
.context(format!(
"state not available for height {height}, view {view:?}"
))?;
let (proof, balance) = FeeAccountProof::prove(&state.fee_merkle_tree, account).context(
format!("account {account} not available for height {height}, view {view:?}"),
)?;
Ok(AccountQueryData { balance, proof })
retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
}

#[tracing::instrument(skip(self))]
async fn get_frontier(&self, height: u64, view: ViewNumber) -> anyhow::Result<BlocksFrontier> {
async fn get_frontier(
&self,
_instance: &NodeState,
height: u64,
view: ViewNumber,
) -> anyhow::Result<BlocksFrontier> {
let state = self
.consensus()
.await
Expand Down Expand Up @@ -1925,7 +2022,7 @@ mod test {

// Fetch the config from node 1, a different node than the one running the service.
let validator = ValidatorConfig::generated_from_seed_indexed([0; 32], 1, 1, false);
let mut config = peers.fetch_config(validator.clone()).await;
let mut config = peers.fetch_config(validator.clone()).await.unwrap();

// Check the node-specific information in the recovered config is correct.
assert_eq!(config.node_index, 1);
Expand Down
Loading