
Introduce API for directly importing the state into the database #5956

Open
wants to merge 18 commits into master
Conversation

liuchengxu
Contributor

@liuchengxu liuchengxu commented Oct 7, 2024

This new API commits the state directly to the underlying database, reducing the risk of OOM issues when importing large states into the node. More background on this can be found at #5862

This PR is an internal implementation change, replacing the `reset_storage()` function in `Client` with the new `commit_trie_changes()` API. The entire state still stays in memory; moving it out of memory will be tackled after this PR is merged.

I’ve successfully tested fast sync locally, and it works as expected. However, the unit test I added for this API has been failing since 3372d31. I’m submitting this PR to get early feedback on the overall approach and insights into the test failure before diving deeper into troubleshooting.

cc @bkchr

Closes #5862


Two fixes:
- Workaround to use the proper block hash before importing the target block's state.
- Set `commit_state` flag along with the `commit_trie_changes()`.

@liuchengxu liuchengxu marked this pull request as ready for review November 22, 2024 13:04
@liuchengxu
Contributor Author

@bkchr This PR is nearly complete, with only a few minor TODOs remaining. Could you please review it and provide any feedback? It’s now blocking further progress on state sync enhancement.

liuchengxu added a commit to liuchengxu/polkadot-sdk that referenced this pull request Nov 28, 2024
Comment on lines +2559 to +2560
storage: sp_runtime::Storage,
state_version: sp_runtime::StateVersion,
Member

As discussed in the issue, we need to forward the PrefixedMemoryDb here. It should be generated from the proofs, and then we put these values directly into the STATE column.

Contributor Author

Do you mean MemoryDb?

fn verify_range_proof(
&self,
root: Block::Hash,
proof: CompactProof,
start_key: &[Vec<u8>],
) -> sp_blockchain::Result<(KeyValueStates, usize)> {
let mut db = sp_state_machine::MemoryDB::<HashingFor<Block>>::new(&[]);

From the compact proof in the state response, it seems we can derive the MemoryDb and consolidate all the individual MemoryDb instances from the responses to form the final state db. This could then be merged into the underlying database.

Is this the idea?
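For illustration, the consolidation step described above can be sketched with std types only. This is a hypothetical stand-in for `sp_state_machine::MemoryDB` (which maps a node hash to its encoded value plus a reference count, in the `memory-db` crate); only the merge semantics are shown, not the real type:

```rust
use std::collections::HashMap;

// Stand-in for `MemoryDB`: node hash -> (encoded node, reference count).
type NodeMap = HashMap<Vec<u8>, (Vec<u8>, i32)>;

// Merge `other` into `acc`, summing reference counts, mirroring what
// `MemoryDB::consolidate` does. Identical hashes imply identical node
// values, so only the counts need combining.
fn consolidate(acc: &mut NodeMap, other: NodeMap) {
    for (key, (value, rc)) in other {
        let entry = acc.entry(key).or_insert((value, 0));
        entry.1 += rc;
    }
}
```

Each per-response db derived from a compact proof would be consolidated into one accumulator this way before being written to the underlying database.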

Member

Yes, the compact proof can be converted to a MemoryDB; this db contains all the intermediate nodes. We can just add these nodes to the STATE column. This would be the "merging process".

Member

If we do it this way, we don't need to keep all the instances in memory, we just apply them to the db as we have them downloaded.

Contributor Author

Another problem arises: :code does not seem to exist in the storage after the state is imported in this way.

2024-11-30 14:33:09.388  INFO tokio-runtime-worker sync: State sync is complete (4 MiB), restarting block sync.    
2024-11-30 14:33:09.388 DEBUG tokio-runtime-worker sync: Starting from finalized state #24176    
2024-11-30 14:33:09.388 DEBUG tokio-runtime-worker sync: Restarted with 24176 (0xc0dc…4470)    
2024-11-30 14:33:09.388 DEBUG tokio-runtime-worker sync: New peer 12D3KooWP7P2zVLTt2Y6NuQ1bHovbCPzF62X9nqj1k2zbkXiHETp with known best hash 0x037c…5368 (24183).    
2024-11-30 14:33:09.389  WARN tokio-runtime-worker sc_service::client::client: Block prepare storage changes error: Error at calling runtime api: Runtime code error: `:code` hash not found    
2024-11-30 14:33:09.389 DEBUG tokio-runtime-worker sync::import-queue: Error importing block 24177: 0xabb726ac16dde078f49c269a2d9f72964d28c34a2cf044eead27a41aa1f9a605: Import failed: Error at calling runtime api: Runtime code error: `:code` hash not found    
2024-11-30 14:33:09.390  WARN tokio-runtime-worker sync: 💔 Error importing block 0xabb726ac16dde078f49c269a2d9f72964d28c34a2cf044eead27a41aa1f9a605: consensus error: Import failed: Error at calling runtime api: Runtime code error: `:code` hash not found    
2024-11-30 14:33:09.390 DEBUG tokio-runtime-worker sync: Starting from finalized state #24176    
2024-11-30 14:33:09.390 DEBUG tokio-runtime-worker sync: Restarted with 24176 (0xc0dc…4470)    
2024-11-30 14:33:09.390 DEBUG tokio-runtime-worker sync: New peer 12D3KooWP7P2zVLTt2Y6NuQ1bHovbCPzF62X9nqj1k2zbkXiHETp with known best hash 0x037c…5368 (24183).    
2024-11-30 14:33:09.390  WARN tokio-runtime-worker sc_service::client::client: Block prepare storage changes error: Error at calling runtime api: Runtime code error: `:code` hash not found    
2024-11-30 14:33:09.390 DEBUG tokio-runtime-worker sync::import-queue: Error importing block 24177: 0xabb726ac16dde078f49c269a2d9f72964d28c34a2cf044eead27a41aa1f9a605: Import failed: Error at calling runtime api: Runtime code error: `:code` hash not found    
2024-11-30 14:33:09.390  WARN tokio-runtime-worker sync: 💔 Error importing block 0xabb726ac16dde078f49c269a2d9f72964d28c34a2cf044eead27a41aa1f9a605: consensus error: Import failed: Error at calling runtime api: Runtime code error: `:code` hash not found    
diff --git a/Cargo.lock b/Cargo.lock
index 51da65ad5e..9841003c5a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -23031,6 +23031,7 @@ dependencies = [
  "sp-consensus-grandpa 13.0.0",
  "sp-core 28.0.0",
  "sp-runtime 31.0.1",
+ "sp-state-machine 0.35.0",
  "sp-test-primitives",
  "sp-tracing 16.0.0",
  "substrate-prometheus-endpoint",
diff --git a/substrate/client/api/src/backend.rs b/substrate/client/api/src/backend.rs
index 06e5593242..f24d09b144 100644
--- a/substrate/client/api/src/backend.rs
+++ b/substrate/client/api/src/backend.rs
@@ -614,6 +614,8 @@ pub trait Backend<Block: BlockT>: AuxStore + Send + Sync {
                state_version: sp_runtime::StateVersion,
        ) -> sp_blockchain::Result<Block::Hash>;

+       fn import_state_db(&self, state_db: sp_state_machine::MemoryDB<HashingFor<Block>>);
+
        /// Attempts to revert the chain by `n` blocks. If `revert_finalized` is set it will attempt to
        /// revert past any finalized block, this is unsafe and can potentially leave the node in an
        /// inconsistent state. All blocks higher than the best block are also reverted and not counting
diff --git a/substrate/client/api/src/in_mem.rs b/substrate/client/api/src/in_mem.rs
index 63954f0f40..ccebda0927 100644
--- a/substrate/client/api/src/in_mem.rs
+++ b/substrate/client/api/src/in_mem.rs
@@ -757,6 +757,10 @@ impl<Block: BlockT> backend::Backend<Block> for Backend<Block> {
                unimplemented!("Not needed for in-mem backend")
        }

+       fn import_state_db(&self, _state_db: sp_state_machine::MemoryDB<HashingFor<Block>>) {
+               unimplemented!("Not needed for in-mem backend")
+       }
+
        fn revert(
                &self,
                _n: NumberFor<Block>,
diff --git a/substrate/client/api/src/proof_provider.rs b/substrate/client/api/src/proof_provider.rs
index 7f60f856ae..5102cb39ff 100644
--- a/substrate/client/api/src/proof_provider.rs
+++ b/substrate/client/api/src/proof_provider.rs
@@ -18,7 +18,7 @@

 //! Proof utilities
 use crate::{CompactProof, StorageProof};
-use sp_runtime::traits::Block as BlockT;
+use sp_runtime::traits::{Block as BlockT, HashingFor};
 use sp_state_machine::{KeyValueStates, KeyValueStorageLevel};
 use sp_storage::ChildInfo;

@@ -89,5 +89,8 @@ pub trait ProofProvider<Block: BlockT> {
                root: Block::Hash,
                proof: CompactProof,
                start_keys: &[Vec<u8>],
-       ) -> sp_blockchain::Result<(KeyValueStates, usize)>;
+       ) -> sp_blockchain::Result<(
+               (KeyValueStates, usize),
+               sp_state_machine::MemoryDB<HashingFor<Block>>,
+       )>;
 }
diff --git a/substrate/client/consensus/common/src/block_import.rs b/substrate/client/consensus/common/src/block_import.rs
index 0fcf96a963..4c1e048cbf 100644
--- a/substrate/client/consensus/common/src/block_import.rs
+++ b/substrate/client/consensus/common/src/block_import.rs
@@ -133,6 +133,7 @@ pub struct ImportedState<B: BlockT> {
        pub block: B::Hash,
        /// State keys and values.
        pub state: sp_state_machine::KeyValueStates,
+       pub state_db: sp_state_machine::MemoryDB<HashingFor<B>>,
 }

 impl<B: BlockT> std::fmt::Debug for ImportedState<B> {
diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs
index 8fe1a6a326..e16ae8404b 100644
--- a/substrate/client/db/src/lib.rs
+++ b/substrate/client/db/src/lib.rs
@@ -2676,6 +2676,37 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
                Ok(state_root)
        }

+       fn import_state_db(&self, mut state_db: sp_state_machine::MemoryDB<HashingFor<Block>>) {
+               use sp_database::{Change, Database, Transaction};
+
+               let mut inserts = vec![];
+               for (key, (val, rc)) in state_db.drain() {
+                       let mut key = key.as_ref().to_vec();
+                       self.storage.db.sanitize_key(&mut key);
+                       log::info!("============== key: {key:?}, value: {val:?}");
+                       assert!(rc > 0);
+                       if rc > 0 {
+                               if rc == 1 {
+                                       inserts.push((key, val.to_vec()));
+                               } else {
+                                       inserts.push((key.clone(), val.to_vec()));
+                                       for _ in 0..rc - 1 {
+                                               inserts.push((key.clone(), Default::default()));
+                                       }
+                               }
+                       }
+               }
+
+               let tx = Transaction(
+                       inserts
+                               .into_iter()
+                               .map(|(k, v)| Change::Set(columns::STATE, k, v))
+                               .collect::<Vec<_>>(),
+               );
+
+               self.storage.db.commit(tx).expect("Failed to commit state db");
+       }
+
        fn get_import_lock(&self) -> &RwLock<()> {
                &self.import_lock
        }
diff --git a/substrate/client/db/src/state_importer.rs b/substrate/client/db/src/state_importer.rs
index 7c78263d10..2c24c40489 100644
--- a/substrate/client/db/src/state_importer.rs
+++ b/substrate/client/db/src/state_importer.rs
@@ -94,6 +94,7 @@ impl<'a, S: 'a + TrieBackendStorage<H>, H: Hasher> hash_db::HashDB<H, DBValue>

        fn emplace(&mut self, key: H::Out, prefix: Prefix, value: DBValue) {
                let key = sp_trie::prefixed_key::<H>(&key, prefix);
+               log::info!("==================== [emplace] key: {key:?}, value: {value:?}");
                let tx = Transaction(vec![Change::Set(columns::STATE, key, value)]);
                // TODO: better error handling?
                self.trie_database
diff --git a/substrate/client/network/sync/Cargo.toml b/substrate/client/network/sync/Cargo.toml
index 378b7c12e9..2d764962eb 100644
--- a/substrate/client/network/sync/Cargo.toml
+++ b/substrate/client/network/sync/Cargo.toml
@@ -47,6 +47,7 @@ sp-consensus = { workspace = true, default-features = true }
 sp-core = { workspace = true, default-features = true }
 sp-consensus-grandpa = { workspace = true, default-features = true }
 sp-runtime = { workspace = true, default-features = true }
+sp-state-machine = { workspace = true, default-features = true }

 [dev-dependencies]
 mockall = { workspace = true }
diff --git a/substrate/client/network/sync/src/strategy/state_sync.rs b/substrate/client/network/sync/src/strategy/state_sync.rs
index 47d859a1b7..db8cdf90fc 100644
--- a/substrate/client/network/sync/src/strategy/state_sync.rs
+++ b/substrate/client/network/sync/src/strategy/state_sync.rs
@@ -29,7 +29,7 @@ use sc_consensus::ImportedState;
 use smallvec::SmallVec;
 use sp_core::storage::well_known_keys;
 use sp_runtime::{
-       traits::{Block as BlockT, Header, NumberFor},
+       traits::{Block as BlockT, HashingFor, Header, NumberFor},
        Justifications,
 };
 use std::{collections::HashMap, fmt, sync::Arc};
@@ -142,6 +142,7 @@ impl<B: BlockT> StateSyncMetadata<B> {
 pub struct StateSync<B: BlockT, Client> {
        metadata: StateSyncMetadata<B>,
        state: HashMap<Vec<u8>, (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>)>,
+       state_db: sp_state_machine::MemoryDB<HashingFor<B>>,
        client: Arc<Client>,
 }

@@ -170,6 +171,7 @@ where
                                skip_proof,
                        },
                        state: HashMap::default(),
+                       state_db: Default::default(),
                }
        }

@@ -257,11 +259,11 @@ where
        fn import(&mut self, response: StateResponse) -> ImportResult<B> {
                if response.entries.is_empty() && response.proof.is_empty() {
                        debug!(target: LOG_TARGET, "Bad state response");
-                       return ImportResult::BadResponse
+                       return ImportResult::BadResponse;
                }
                if !self.metadata.skip_proof && response.proof.is_empty() {
                        debug!(target: LOG_TARGET, "Missing proof");
-                       return ImportResult::BadResponse
+                       return ImportResult::BadResponse;
                }
                let complete = if !self.metadata.skip_proof {
                        debug!(target: LOG_TARGET, "Importing state from {} trie nodes", response.proof.len());
@@ -270,10 +272,10 @@ where
                                Ok(proof) => proof,
                                Err(e) => {
                                        debug!(target: LOG_TARGET, "Error decoding proof: {:?}", e);
-                                       return ImportResult::BadResponse
+                                       return ImportResult::BadResponse;
                                },
                        };
-                       let (values, completed) = match self.client.verify_range_proof(
+                       let ((values, completed), db) = match self.client.verify_range_proof(
                                self.metadata.target_root(),
                                proof,
                                self.metadata.last_key.as_slice(),
@@ -284,7 +286,7 @@ where
                                                "StateResponse failed proof verification: {}",
                                                e,
                                        );
-                                       return ImportResult::BadResponse
+                                       return ImportResult::BadResponse;
                                },
                                Ok(values) => values,
                        };
@@ -295,6 +297,8 @@ where
                                debug!(target: LOG_TARGET, "Error updating key cursor, depth: {}", completed);
                        };

+                       self.state_db.consolidate(db);
+
                        self.process_state_verified(values);
                        self.metadata.imported_bytes += proof_size;
                        complete
@@ -307,7 +311,11 @@ where
                        ImportResult::Import(
                                target_hash,
                                self.metadata.target_header.clone(),
-                               ImportedState { block: target_hash, state: std::mem::take(&mut self.state).into() },
+                               ImportedState {
+                                       block: target_hash,
+                                       state: std::mem::take(&mut self.state).into(),
+                                       state_db: std::mem::take(&mut self.state_db),
+                               },
                                self.metadata.target_body.clone(),
                                self.metadata.target_justifications.clone(),
                        )
diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs
index 46de27b454..03d2cbeb22 100644
--- a/substrate/client/service/src/client/client.rs
+++ b/substrate/client/service/src/client/client.rs
@@ -698,14 +698,16 @@ where
                                                } else {
                                                        info.genesis_hash
                                                };
-                                               let state_root =
-                                                       self.backend.import_state(block_hash, storage, state_version)?;
+                                               self.backend.import_state_db(changes.state_db);
+                                               /*
+                                               let state_root = self.backend.import_state(block_hash, storage, state_version)?;
                                                if state_root != *import_headers.post().state_root() {
                                                        // State root mismatch when importing state. This should not happen in
                                                        // safe fast sync mode, but may happen in unsafe mode.
                                                        warn!("Error importing state: State root mismatch.");
                                                        return Err(Error::InvalidStateRoot);
                                                }
+                                               */
                                                None
                                        },
                                };
@@ -1419,7 +1421,10 @@ where
                root: Block::Hash,
                proof: CompactProof,
                start_key: &[Vec<u8>],
-       ) -> sp_blockchain::Result<(KeyValueStates, usize)> {
+       ) -> sp_blockchain::Result<(
+               (KeyValueStates, usize),
+               sp_state_machine::MemoryDB<HashingFor<Block>>,
+       )> {
                let mut db = sp_state_machine::MemoryDB::<HashingFor<Block>>::new(&[]);
                // Compact encoding
                let _ = sp_trie::decode_compact::<sp_state_machine::LayoutV0<HashingFor<Block>>, _, _>(
@@ -1428,13 +1433,13 @@ where
                        Some(&root),
                )
                .map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?;
-               let proving_backend = sp_state_machine::TrieBackendBuilder::new(db, root).build();
+               let proving_backend = sp_state_machine::TrieBackendBuilder::new(db.clone(), root).build();
                let state = read_range_proof_check_with_child_on_proving_backend::<HashingFor<Block>>(
                        &proving_backend,
                        start_key,
                )?;

-               Ok(state)
+               Ok((state, db))
        }
 }

:code is not in the MemoryDb.
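The drain loop in `import_state_db` from the diff above can be sketched std-only (`sanitize_key` and the actual database transaction are omitted, and the map type is a stand-in for a drained `MemoryDB`): a node referenced `rc` times is written once with its value and `rc - 1` more times with an empty value, matching the diff's handling of the backend's per-`Set` reference counting.

```rust
use std::collections::HashMap;

// Turn drained nodes (hash -> (encoded node, reference count)) into the
// list of STATE-column inserts, expanding reference counts into extra
// empty-value writes as the diff above does.
fn expand_refcounts(drained: HashMap<Vec<u8>, (Vec<u8>, i32)>) -> Vec<(Vec<u8>, Vec<u8>)> {
    let mut inserts = Vec::new();
    for (key, (value, rc)) in drained {
        if rc <= 0 {
            // Non-positive counts would mean pending removals; the diff
            // asserts this never happens for freshly downloaded state.
            continue;
        }
        inserts.push((key.clone(), value));
        for _ in 1..rc {
            inserts.push((key.clone(), Vec::new()));
        }
    }
    inserts
}
```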

Contributor Author

@liuchengxu liuchengxu Dec 13, 2024

Basically, the idea is to accumulate the trie nodes from the verified proof: liuchengxu@81fac33#diff-8b07860de32e2cad312835e8f8953664deba85205215d5b5e8061c752c96588fR302.

This is still a work in progress. I haven't tested it yet with a large chain state, but I believe we should avoid storing the trie nodes in the changeset within BlockImportOperation. Instead, we should commit them directly to the database when calculating storage changes in the Client.

In my local tests, keeping them in BlockImportOperation results in extremely high memory usage for the large chain state, which is unacceptable. I expect that step 1 will help mitigate the risk of running into OOM issues, although it may not completely eliminate them.

Contributor Author

@liuchengxu liuchengxu Dec 13, 2024

Unfortunately, I ran into another issue while using the approach of accumulating trie nodes when syncing to a relatively higher block:

return Err(Error::ExtraneousChildNode)

I'm using this branch https://github.com/liuchengxu/polkadot-sdk/tree/state-import-part-1. This approach worked fine when syncing from a lower block. However, when syncing to higher blocks, it seems to break. Additionally, our node doesn’t use child tries, so I’m not sure what’s causing the issue.

I suspect that I may be misusing the trie nodes in the CompactProof, or there could be a deeper issue in Substrate. One possibility I'm considering is that the trie nodes across the state responses might overlap or be duplicated.

Any insights or suggestions? @bkchr

Contributor Author

@cheme Your input would also be highly valuable, given your extensive work on the trie. My main question is how to merge the verified CompactProof from state responses into the final state db of the state sync target in the form of PrefixedMemoryDB.

Member

Looking at your code, you will have a lot of duplicated nodes (each proof starts from the root for example). Which probably leads to the problems you are seeing. Especially as you are using CompactProof which is expecting some special order of the keys. I'm a little bit surprised that this works at all :D

Contributor Author

diff --git a/substrate/client/network/sync/src/strategy/state_sync.rs b/substrate/client/network/sync/src/strategy/state_sync.rs
index 813e0ac336..f72378dabe 100644
--- a/substrate/client/network/sync/src/strategy/state_sync.rs
+++ b/substrate/client/network/sync/src/strategy/state_sync.rs
@@ -299,7 +299,15 @@ where
                                debug!(target: LOG_TARGET, "Error updating key cursor, depth: {}", completed);
                        };

-                       self.trie_nodes.extend(proof.encoded_nodes);
+                       // self.trie_nodes.extend(proof.encoded_nodes);
+                       use std::collections::HashSet;
+                       let seen: HashSet<Vec<u8>> = self.trie_nodes.iter().cloned().collect();
+                       for node in proof.encoded_nodes {
+                               if !seen.contains(&node) {
+                                       self.trie_nodes.push(node);
+                               }
+                       }
+
                        self.metadata.imported_bytes += proof_size;
                        complete
                } else {

That makes sense. I ran a quick experiment to deduplicate nodes, but the ExtraneousChildNode issue persists. Here are the results from a fast sync at the same block:

  • Before deduplication: Total remaining nodes: 134,777
  • After deduplication: Total remaining nodes: 115,475

The approaches involving PrefixedMemoryDB or trie nodes are proving to be quite tricky. While the existing unit tests pass and early blocks sync fine, various issues surface as the sync progresses. This direction is starting to feel like a bit of a rabbit hole. (Note that the StateImporter in this PR appears to work well so far and has been tested on the Astar network.) I'd appreciate your advice on the next steps, thank you! @bkchr
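As an aside on the bookkeeping only: the deduplication experiment above rebuilds the `seen` set from `trie_nodes` on every response, which is quadratic overall. A hypothetical std-only sketch keeping one `HashSet` alive across all state responses makes each node check O(1). As noted, deduplication alone does not resolve the `ExtraneousChildNode` error; this only illustrates the accumulator.

```rust
use std::collections::HashSet;

// Accumulates encoded trie nodes across state responses, skipping
// duplicates (each proof re-sends the nodes near the root).
struct NodeAccumulator {
    seen: HashSet<Vec<u8>>,
    trie_nodes: Vec<Vec<u8>>,
}

impl NodeAccumulator {
    fn new() -> Self {
        Self { seen: HashSet::new(), trie_nodes: Vec::new() }
    }

    // Append only encoded nodes not seen in any earlier response.
    fn extend_dedup(&mut self, encoded_nodes: Vec<Vec<u8>>) {
        for node in encoded_nodes {
            if self.seen.insert(node.clone()) {
                self.trie_nodes.push(node);
            }
        }
    }
}
```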

Successfully merging this pull request may close these issues.

Support updating the trie changes directly into the database