diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 6e396201..5f7db1f3 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -1,40 +1,173 @@ -mod state_sync_session; - -use std::pin::Pin; +use std::{ + collections::{BTreeMap, BTreeSet}, + fmt, +}; use grovedb_merk::{ ed::Encode, + merk::restore::Restorer, proofs::{Decoder, Op}, tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash}, ChunkProducer, }; use grovedb_path::SubtreePath; use grovedb_storage::rocksdb_storage::RocksDbStorage; +#[rustfmt::skip] +use grovedb_storage::rocksdb_storage::storage_context::context_immediate::PrefixedRocksDbImmediateStorageContext; -pub use self::state_sync_session::MultiStateSyncSession; -use self::state_sync_session::SubtreesMetadata; -use crate::{Error, GroveDb, TransactionArg}; +use crate::{replication, Error, GroveDb, Transaction, TransactionArg}; + +pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; -#[cfg(feature = "full")] -impl GroveDb { - pub fn start_new_session(&self) -> Pin> { - MultiStateSyncSession::new(self.start_transaction()) +#[derive(Default)] +struct SubtreeStateSyncInfo<'db> { + // Current Chunk restorer + restorer: Option>>, + // Set of global chunk ids requested to be fetched and pending for processing. For the + // description of global chunk id check fetch_chunk(). 
+ pending_chunks: BTreeSet>, + // Number of processed chunks in current prefix (Path digest) + num_processed_chunks: usize, +} + +// Struct governing state sync +pub struct MultiStateSyncInfo<'db> { + // Map of current processing subtrees + // SubtreePrefix (Path digest) -> SubtreeStateSyncInfo + current_prefixes: BTreeMap>, + // Set of processed prefixes (Path digests) + processed_prefixes: BTreeSet, + // Root app_hash + app_hash: [u8; 32], + // Version of state sync protocol, + version: u16, +} + +impl<'db> Default for MultiStateSyncInfo<'db> { + fn default() -> Self { + Self { + current_prefixes: BTreeMap::new(), + processed_prefixes: BTreeSet::new(), + app_hash: [0; 32], + version: CURRENT_STATE_SYNC_VERSION, + } + } +} + +// Struct containing information about current subtrees found in GroveDB +pub struct SubtreesMetadata { + // Map of Prefix (Path digest) -> (Actual path, Parent Subtree actual_value_hash, Parent + // Subtree elem_value_hash) Note: Parent Subtree actual_value_hash, Parent Subtree + // elem_value_hash are needed when verifying the new constructed subtree after wards. 
+ pub data: BTreeMap>, CryptoHash, CryptoHash)>, +} + +impl SubtreesMetadata { + pub fn new() -> SubtreesMetadata { + SubtreesMetadata { + data: BTreeMap::new(), + } + } +} + +impl Default for SubtreesMetadata { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for SubtreesMetadata { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + for (prefix, metadata) in self.data.iter() { + let metadata_path = &metadata.0; + let metadata_path_str = util_path_to_string(metadata_path); + writeln!( + f, + " prefix:{:?} -> path:{:?}", + hex::encode(prefix), + metadata_path_str + )?; + } + Ok(()) + } +} + +// Converts a path into a human-readable string (for debugging) +pub fn util_path_to_string(path: &[Vec]) -> Vec { + let mut subtree_path_str: Vec = vec![]; + for subtree in path { + let string = std::str::from_utf8(subtree).expect("should be able to convert path"); + subtree_path_str.push( + string + .parse() + .expect("should be able to parse path to string"), + ); } + subtree_path_str +} - pub fn commit_session(&self, session: Pin>) { - // we do not care about the cost - let _ = self.commit_transaction(session.into_transaction()); +// Splits the given global chunk id into [SUBTREE_PREFIX:CHUNK_ID] +pub fn util_split_global_chunk_id( + global_chunk_id: &[u8], + app_hash: &[u8], +) -> Result<(crate::SubtreePrefix, Vec), Error> { + let chunk_prefix_length: usize = 32; + if global_chunk_id.len() < chunk_prefix_length { + return Err(Error::CorruptedData( + "expected global chunk id of at least 32 length".to_string(), + )); } + if global_chunk_id == app_hash { + let array_of_zeros: [u8; 32] = [0; 32]; + let root_chunk_prefix_key: crate::SubtreePrefix = array_of_zeros; + return Ok((root_chunk_prefix_key, vec![])); + } + + let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); + let mut array = [0u8; 32]; + array.copy_from_slice(chunk_prefix); + let chunk_prefix_key: crate::SubtreePrefix = array; + Ok((chunk_prefix_key, chunk_id.to_vec())) 
+} + +pub fn util_encode_vec_ops(chunk: Vec) -> Result, Error> { + let mut res = vec![]; + for op in chunk { + op.encode_into(&mut res) + .map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?; + } + Ok(res) +} + +pub fn util_decode_vec_ops(chunk: Vec) -> Result, Error> { + let decoder = Decoder::new(&chunk); + let mut res = vec![]; + for op in decoder { + match op { + Ok(op) => res.push(op), + Err(e) => { + return Err(Error::CorruptedData(format!( + "unable to decode chunk: {}", + e + ))); + } + } + } + Ok(res) +} + +#[cfg(feature = "full")] +impl GroveDb { // Returns the discovered subtrees found recursively along with their associated // metadata Params: // tx: Transaction. Function returns the data by opening merks at given tx. // TODO: Add a SubTreePath as param and start searching from that path instead // of root (as it is now) pub fn get_subtrees_metadata(&self, tx: TransactionArg) -> Result { - let mut subtrees_metadata = SubtreesMetadata::new(); + let mut subtrees_metadata = crate::replication::SubtreesMetadata::new(); let subtrees_root = self.find_subtrees(&SubtreePath::empty(), tx).value?; for subtree in subtrees_root.into_iter() { @@ -123,22 +256,13 @@ impl GroveDb { )); } - let chunk_prefix_length: usize = 32; - if global_chunk_id.len() < chunk_prefix_length { - return Err(Error::CorruptedData( - "expected global chunk id of at least 32 length".to_string(), - )); - } - - let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); - - let mut array = [0u8; 32]; - array.copy_from_slice(chunk_prefix); - let chunk_prefix_key: crate::SubtreePrefix = array; + let root_app_hash = self.root_hash(tx).value?; + let (chunk_prefix, chunk_id) = + replication::util_split_global_chunk_id(global_chunk_id, &root_app_hash)?; let subtrees_metadata = self.get_subtrees_metadata(tx)?; - match subtrees_metadata.data.get(&chunk_prefix_key) { + match subtrees_metadata.data.get(&chunk_prefix) { Some(path_data) => { let subtree = 
&path_data.0; let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect(); @@ -157,7 +281,7 @@ impl GroveDb { let chunk_producer_res = ChunkProducer::new(&merk); match chunk_producer_res { Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(chunk_id); + let chunk_res = chunk_producer.chunk(&chunk_id); match chunk_res { Ok((chunk, _)) => match util_encode_vec_ops(chunk) { Ok(op_bytes) => Ok(op_bytes), @@ -187,7 +311,7 @@ impl GroveDb { let chunk_producer_res = ChunkProducer::new(&merk); match chunk_producer_res { Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(chunk_id); + let chunk_res = chunk_producer.chunk(&chunk_id); match chunk_res { Ok((chunk, _)) => match util_encode_vec_ops(chunk) { Ok(op_bytes) => Ok(op_bytes), @@ -211,87 +335,269 @@ impl GroveDb { } } - /// Starts a state sync process of a snapshot with `app_hash` root hash, - /// should be called by ABCI when OfferSnapshot method is called. - /// Returns the first set of global chunk ids that can be fetched from - /// sources and a new sync session. 
+ // Starts a state sync process (should be called by ABCI when OfferSnapshot + // method is called) Params: + // state_sync_info: Consumed StateSyncInfo + // app_hash: Snapshot's AppHash + // tx: Transaction for the state sync + // Returns the StateSyncInfo transferring ownership back to the caller) pub fn start_snapshot_syncing<'db>( &'db self, + mut state_sync_info: MultiStateSyncInfo<'db>, app_hash: CryptoHash, + tx: &'db Transaction, version: u16, - ) -> Result<(Vec>, Pin>>), Error> { + ) -> Result { // For now, only CURRENT_STATE_SYNC_VERSION is supported if version != CURRENT_STATE_SYNC_VERSION { return Err(Error::CorruptedData( "Unsupported state sync protocol version".to_string(), )); } + if version != state_sync_info.version { + return Err(Error::CorruptedData( + "Unsupported state sync protocol version".to_string(), + )); + } - println!(" starting:{:?}...", util_path_to_string(&[])); + if !state_sync_info.current_prefixes.is_empty() + || !state_sync_info.processed_prefixes.is_empty() + { + return Err(Error::InternalError( + "GroveDB has already started a snapshot syncing", + )); + } - let root_prefix = [0u8; 32]; + println!( + " starting:{:?}...", + replication::util_path_to_string(&[]) + ); - let mut session = MultiStateSyncSession::new(self.start_transaction()); - session.add_subtree_sync_info(self, SubtreePath::empty(), app_hash, None, root_prefix)?; + let mut root_prefix_state_sync_info = SubtreeStateSyncInfo::default(); + let root_prefix = [0u8; 32]; + if let Ok(merk) = self.open_merk_for_replication(SubtreePath::empty(), tx) { + let restorer = Restorer::new(merk, app_hash, None); + root_prefix_state_sync_info.restorer = Some(restorer); + root_prefix_state_sync_info.pending_chunks.insert(vec![]); + state_sync_info + .current_prefixes + .insert(root_prefix, root_prefix_state_sync_info); + state_sync_info.app_hash = app_hash; + } else { + return Err(Error::InternalError("Unable to open merk for replication")); + } - Ok((vec![root_prefix.to_vec()], 
session)) + Ok(state_sync_info) } -} -// Converts a path into a human-readable string (for debugging) -pub fn util_path_to_string(path: &[Vec]) -> Vec { - let mut subtree_path_str: Vec = vec![]; - for subtree in path { - let string = std::str::from_utf8(subtree).expect("should be able to convert path"); - subtree_path_str.push( - string - .parse() - .expect("should be able to parse path to string"), - ); - } - subtree_path_str -} + // Apply a chunk (should be called by ABCI when ApplySnapshotChunk method is + // called) Params: + // state_sync_info: Consumed MultiStateSyncInfo + // global_chunk_id: Global chunk id + // chunk: Chunk proof operators encoded in bytes + // tx: Transaction for the state sync + // Returns the next set of global chunk ids that can be fetched from sources (+ + // the MultiStateSyncInfo transferring ownership back to the caller) + pub fn apply_chunk<'db>( + &'db self, + mut state_sync_info: MultiStateSyncInfo<'db>, + global_chunk_id: &[u8], + chunk: Vec, + tx: &'db Transaction, + version: u16, + ) -> Result<(Vec>, MultiStateSyncInfo), Error> { + // For now, only CURRENT_STATE_SYNC_VERSION is supported + if version != CURRENT_STATE_SYNC_VERSION { + return Err(Error::CorruptedData( + "Unsupported state sync protocol version".to_string(), + )); + } + if version != state_sync_info.version { + return Err(Error::CorruptedData( + "Unsupported state sync protocol version".to_string(), + )); + } -// Splits the given global chunk id into [SUBTREE_PREFIX:CHUNK_ID] -pub fn util_split_global_chunk_id( - global_chunk_id: &[u8], -) -> Result<(crate::SubtreePrefix, Vec), Error> { - let chunk_prefix_length: usize = 32; - if global_chunk_id.len() < chunk_prefix_length { - return Err(Error::CorruptedData( - "expected global chunk id of at least 32 length".to_string(), - )); + let mut next_chunk_ids = vec![]; + + let (chunk_prefix, chunk_id) = + replication::util_split_global_chunk_id(global_chunk_id, &state_sync_info.app_hash)?; + + if 
state_sync_info.current_prefixes.is_empty() { + return Err(Error::InternalError("GroveDB is not in syncing mode")); + } + if let Some(subtree_state_sync) = state_sync_info.current_prefixes.remove(&chunk_prefix) { + if let Ok((res, mut new_subtree_state_sync)) = + self.apply_inner_chunk(subtree_state_sync, &chunk_id, chunk) + { + if !res.is_empty() { + for local_chunk_id in res.iter() { + let mut next_global_chunk_id = chunk_prefix.to_vec(); + next_global_chunk_id.extend(local_chunk_id.to_vec()); + next_chunk_ids.push(next_global_chunk_id); + } + + // re-insert subtree_state_sync in state_sync_info + state_sync_info + .current_prefixes + .insert(chunk_prefix, new_subtree_state_sync); + Ok((next_chunk_ids, state_sync_info)) + } else { + if !new_subtree_state_sync.pending_chunks.is_empty() { + // re-insert subtree_state_sync in state_sync_info + state_sync_info + .current_prefixes + .insert(chunk_prefix, new_subtree_state_sync); + return Ok((vec![], state_sync_info)); + } + + // Subtree is finished. We can save it. + match new_subtree_state_sync.restorer.take() { + None => Err(Error::InternalError("Unable to finalize subtree")), + Some(restorer) => { + if (new_subtree_state_sync.num_processed_chunks > 0) + && (restorer.finalize().is_err()) + { + return Err(Error::InternalError("Unable to finalize Merk")); + } + state_sync_info.processed_prefixes.insert(chunk_prefix); + + // Subtree was successfully save. 
Time to discover new subtrees that + // need to be processed + let subtrees_metadata = self.get_subtrees_metadata(Some(tx))?; + if let Some(value) = subtrees_metadata.data.get(&chunk_prefix) { + println!( + " path:{:?} done (num_processed_chunks:{:?})", + replication::util_path_to_string(&value.0), + new_subtree_state_sync.num_processed_chunks + ); + } + + if let Ok((res, new_state_sync_info)) = + self.discover_subtrees(state_sync_info, subtrees_metadata, tx) + { + next_chunk_ids.extend(res); + Ok((next_chunk_ids, new_state_sync_info)) + } else { + Err(Error::InternalError("Unable to discover Subtrees")) + } + } + } + } + } else { + Err(Error::InternalError("Unable to process incoming chunk")) + } + } else { + Err(Error::InternalError("Invalid incoming prefix")) + } } - let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); - let mut array = [0u8; 32]; - array.copy_from_slice(chunk_prefix); - let chunk_prefix_key: crate::SubtreePrefix = array; - Ok((chunk_prefix_key, chunk_id.to_vec())) -} + // Apply a chunk using the given SubtreeStateSyncInfo + // state_sync_info: Consumed SubtreeStateSyncInfo + // chunk_id: Local chunk id + // chunk_data: Chunk proof operators encoded in bytes + // Returns the next set of global chunk ids that can be fetched from sources (+ + // the SubtreeStateSyncInfo transferring ownership back to the caller) + fn apply_inner_chunk<'db>( + &'db self, + mut state_sync_info: SubtreeStateSyncInfo<'db>, + chunk_id: &[u8], + chunk_data: Vec, + ) -> Result<(Vec>, SubtreeStateSyncInfo), Error> { + let mut res = vec![]; -pub fn util_encode_vec_ops(chunk: Vec) -> Result, Error> { - let mut res = vec![]; - for op in chunk { - op.encode_into(&mut res) - .map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?; + match &mut state_sync_info.restorer { + Some(restorer) => { + if !state_sync_info.pending_chunks.contains(chunk_id) { + return Err(Error::InternalError( + "Incoming global_chunk_id not expected", + )); 
+ } + state_sync_info.pending_chunks.remove(chunk_id); + if !chunk_data.is_empty() { + match util_decode_vec_ops(chunk_data) { + Ok(ops) => { + match restorer.process_chunk(chunk_id, ops) { + Ok(next_chunk_ids) => { + state_sync_info.num_processed_chunks += 1; + for next_chunk_id in next_chunk_ids { + state_sync_info + .pending_chunks + .insert(next_chunk_id.clone()); + res.push(next_chunk_id); + } + } + _ => { + return Err(Error::InternalError( + "Unable to process incoming chunk", + )); + } + }; + } + Err(_) => { + return Err(Error::CorruptedData( + "Unable to decode incoming chunk".to_string(), + )); + } + } + } + } + _ => { + return Err(Error::InternalError("Invalid internal state (restorer")); + } + } + + Ok((res, state_sync_info)) } - Ok(res) -} -pub fn util_decode_vec_ops(chunk: Vec) -> Result, Error> { - let decoder = Decoder::new(&chunk); - let mut res = vec![]; - for op in decoder { - match op { - Ok(op) => res.push(op), - Err(e) => { - return Err(Error::CorruptedData(format!( - "unable to decode chunk: {}", - e - ))); + // Prepares SubtreeStateSyncInfos for the freshly discovered subtrees in + // subtrees_metadata and returns the root global chunk ids for all of those + // new subtrees. 
state_sync_info: Consumed MultiStateSyncInfo + // subtrees_metadata: Metadata about discovered subtrees + // chunk_data: Chunk proof operators + // Returns the next set of global chunk ids that can be fetched from sources (+ + // the MultiStateSyncInfo transferring ownership back to the caller) + fn discover_subtrees<'db>( + &'db self, + mut state_sync_info: MultiStateSyncInfo<'db>, + subtrees_metadata: SubtreesMetadata, + tx: &'db Transaction, + ) -> Result<(Vec>, MultiStateSyncInfo), Error> { + let mut res = vec![]; + + for (prefix, prefix_metadata) in &subtrees_metadata.data { + if !state_sync_info.processed_prefixes.contains(prefix) + && !state_sync_info.current_prefixes.contains_key(prefix) + { + let (current_path, s_actual_value_hash, s_elem_value_hash) = &prefix_metadata; + + let subtree_path: Vec<&[u8]> = + current_path.iter().map(|vec| vec.as_slice()).collect(); + let path: &[&[u8]] = &subtree_path; + println!( + " path:{:?} starting...", + replication::util_path_to_string(&prefix_metadata.0) + ); + + let mut subtree_state_sync_info = SubtreeStateSyncInfo::default(); + if let Ok(merk) = self.open_merk_for_replication(path.into(), tx) { + let restorer = + Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash)); + subtree_state_sync_info.restorer = Some(restorer); + subtree_state_sync_info.pending_chunks.insert(vec![]); + + state_sync_info + .current_prefixes + .insert(*prefix, subtree_state_sync_info); + + let root_chunk_prefix = prefix.to_vec(); + res.push(root_chunk_prefix.to_vec()); + } else { + return Err(Error::InternalError("Unable to open Merk for replication")); + } } } + + Ok((res, state_sync_info)) } - Ok(res) } diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs deleted file mode 100644 index 1a8ed7ef..00000000 --- a/grovedb/src/replication/state_sync_session.rs +++ /dev/null @@ -1,368 +0,0 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt, - marker::PhantomPinned, - 
pin::Pin, -}; - -use grovedb_merk::{CryptoHash, Restorer}; -use grovedb_path::SubtreePath; -use grovedb_storage::rocksdb_storage::PrefixedRocksDbImmediateStorageContext; - -use super::{util_decode_vec_ops, util_split_global_chunk_id, CURRENT_STATE_SYNC_VERSION}; -use crate::{replication::util_path_to_string, Error, GroveDb, Transaction}; - -pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; - -struct SubtreeStateSyncInfo<'db> { - /// Current Chunk restorer - restorer: Restorer>, - /// Set of global chunk ids requested to be fetched and pending for - /// processing. For the description of global chunk id check - /// fetch_chunk(). - pending_chunks: BTreeSet>, - /// Number of processed chunks in current prefix (Path digest) - num_processed_chunks: usize, -} - -impl<'db> SubtreeStateSyncInfo<'db> { - // Apply a chunk using the given SubtreeStateSyncInfo - // state_sync_info: Consumed SubtreeStateSyncInfo - // chunk_id: Local chunk id - // chunk_data: Chunk proof operators encoded in bytes - // Returns the next set of global chunk ids that can be fetched from sources (+ - // the SubtreeStateSyncInfo transferring ownership back to the caller) - fn apply_inner_chunk( - &mut self, - chunk_id: &[u8], - chunk_data: Vec, - ) -> Result>, Error> { - let mut res = vec![]; - - if !self.pending_chunks.contains(chunk_id) { - return Err(Error::InternalError( - "Incoming global_chunk_id not expected", - )); - } - self.pending_chunks.remove(chunk_id); - if !chunk_data.is_empty() { - match util_decode_vec_ops(chunk_data) { - Ok(ops) => { - match self.restorer.process_chunk(chunk_id, ops) { - Ok(next_chunk_ids) => { - self.num_processed_chunks += 1; - for next_chunk_id in next_chunk_ids { - self.pending_chunks.insert(next_chunk_id.clone()); - res.push(next_chunk_id); - } - } - _ => { - return Err(Error::InternalError("Unable to process incoming chunk")); - } - }; - } - Err(_) => { - return Err(Error::CorruptedData( - "Unable to decode incoming chunk".to_string(), - )); - } - } - } 
- - Ok(res) - } -} - -impl<'tx> SubtreeStateSyncInfo<'tx> { - pub fn new(restorer: Restorer>) -> Self { - SubtreeStateSyncInfo { - restorer, - pending_chunks: Default::default(), - num_processed_chunks: 0, - } - } -} - -// Struct governing state sync -pub struct MultiStateSyncSession<'db> { - // Map of current processing subtrees - // SubtreePrefix (Path digest) -> SubtreeStateSyncInfo - current_prefixes: BTreeMap>, - // Set of processed prefixes (Path digests) - processed_prefixes: BTreeSet, - // Version of state sync protocol, - pub(crate) version: u16, - // Transaction goes last to be dropped last as well - transaction: Transaction<'db>, - _pin: PhantomPinned, -} - -impl<'db> MultiStateSyncSession<'db> { - /// Initializes a new state sync session. - pub fn new(transaction: Transaction<'db>) -> Pin> { - Box::pin(MultiStateSyncSession { - transaction, - current_prefixes: Default::default(), - processed_prefixes: Default::default(), - version: CURRENT_STATE_SYNC_VERSION, - _pin: PhantomPinned, - }) - } - - pub fn is_empty(&self) -> bool { - self.current_prefixes.is_empty() - } - - pub fn into_transaction(self: Pin>) -> Transaction<'db> { - // SAFETY: the struct isn't used anymore and no one will refer to transaction - // address again - unsafe { Pin::into_inner_unchecked(self) }.transaction - } - - pub fn add_subtree_sync_info<'b, B: AsRef<[u8]>>( - self: &mut Pin>>, - db: &'db GroveDb, - path: SubtreePath<'b, B>, - hash: CryptoHash, - actual_hash: Option, - chunk_prefix: [u8; 32], - ) -> Result<(), Error> { - // SAFETY: we get an immutable reference of a transaction that stays behind - // `Pin` so this reference shall remain valid for the whole session - // object lifetime. 
- let transaction_ref: &'db Transaction<'db> = unsafe { - let tx: &mut Transaction<'db> = - &mut Pin::into_inner_unchecked(self.as_mut()).transaction; - &*(tx as *mut _) - }; - - if let Ok(merk) = db.open_merk_for_replication(path, transaction_ref) { - let restorer = Restorer::new(merk, hash, actual_hash); - let mut sync_info = SubtreeStateSyncInfo::new(restorer); - sync_info.pending_chunks.insert(vec![]); - self.as_mut() - .current_prefixes() - .insert(chunk_prefix, sync_info); - Ok(()) - } else { - Err(Error::InternalError("Unable to open merk for replication")) - } - } - - fn current_prefixes( - self: Pin<&mut MultiStateSyncSession<'db>>, - ) -> &mut BTreeMap> { - // SAFETY: no memory-sensitive assumptions are made about fields except the - // `transaciton` so it will be safe to modify them - &mut unsafe { self.get_unchecked_mut() }.current_prefixes - } - - fn processed_prefixes( - self: Pin<&mut MultiStateSyncSession<'db>>, - ) -> &mut BTreeSet { - // SAFETY: no memory-sensitive assumptions are made about fields except the - // `transaciton` so it will be safe to modify them - &mut unsafe { self.get_unchecked_mut() }.processed_prefixes - } - - /// Applies a chunk, shuold be called by ABCI when `ApplySnapshotChunk` - /// method is called. `chunk` is a pair of global chunk id and an - /// encoded proof. 
- pub fn apply_chunk( - self: &mut Pin>>, - db: &'db GroveDb, - chunk: (&[u8], Vec), - version: u16, - ) -> Result>, Error> { - // For now, only CURRENT_STATE_SYNC_VERSION is supported - if version != CURRENT_STATE_SYNC_VERSION { - return Err(Error::CorruptedData( - "Unsupported state sync protocol version".to_string(), - )); - } - if version != self.version { - return Err(Error::CorruptedData( - "Unsupported state sync protocol version".to_string(), - )); - } - - let mut next_chunk_ids = vec![]; - - let (global_chunk_id, chunk_data) = chunk; - let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id)?; - - if self.is_empty() { - return Err(Error::InternalError("GroveDB is not in syncing mode")); - } - - let current_prefixes = self.as_mut().current_prefixes(); - let Some(subtree_state_sync) = current_prefixes.get_mut(&chunk_prefix) else { - return Err(Error::InternalError("Unable to process incoming chunk")); - }; - let Ok(res) = subtree_state_sync.apply_inner_chunk(&chunk_id, chunk_data) else { - return Err(Error::InternalError("Invalid incoming prefix")); - }; - - if !res.is_empty() { - for local_chunk_id in res.iter() { - let mut next_global_chunk_id = chunk_prefix.to_vec(); - next_global_chunk_id.extend(local_chunk_id.to_vec()); - next_chunk_ids.push(next_global_chunk_id); - } - - Ok(next_chunk_ids) - } else { - if !subtree_state_sync.pending_chunks.is_empty() { - return Ok(vec![]); - } - - // Subtree is finished. We can save it. - if (subtree_state_sync.num_processed_chunks > 0) - && (current_prefixes - .remove(&chunk_prefix) - .expect("prefix exists") - .restorer - .finalize() - .is_err()) - { - return Err(Error::InternalError("Unable to finalize Merk")); - } - self.as_mut().processed_prefixes().insert(chunk_prefix); - - // // Subtree was successfully save. 
Time to discover new subtrees that - // // need to be processed - // if let Some(value) = subtrees_metadata.data.get(&chunk_prefix) { - // println!( - // " path:{:?} done (num_processed_chunks:{:?})", - // util_path_to_string(&value.0), - // subtree_state_sync.num_processed_chunks - // ); - // } - - if let Ok(res) = self.discover_subtrees(db) { - next_chunk_ids.extend(res); - Ok(next_chunk_ids) - } else { - Err(Error::InternalError("Unable to discover Subtrees")) - } - } - } - - /// Prepares sync session for the freshly discovered subtrees and returns - /// global chunk ids of those new subtrees. - fn discover_subtrees( - self: &mut Pin>>, - db: &'db GroveDb, - ) -> Result>, Error> { - let subtrees_metadata = db.get_subtrees_metadata(Some(&self.transaction))?; - - let mut res = vec![]; - - for (prefix, prefix_metadata) in &subtrees_metadata.data { - if !self.processed_prefixes.contains(prefix) - && !self.current_prefixes.contains_key(prefix) - { - let (current_path, actual_value_hash, elem_value_hash) = &prefix_metadata; - - let subtree_path: Vec<&[u8]> = - current_path.iter().map(|vec| vec.as_slice()).collect(); - let path: &[&[u8]] = &subtree_path; - println!( - " path:{:?} starting...", - util_path_to_string(&prefix_metadata.0) - ); - - self.add_subtree_sync_info( - db, - path.into(), - elem_value_hash.clone(), - Some(actual_value_hash.clone()), - prefix.clone(), - )?; - res.push(prefix.to_vec()); - } - } - - Ok(res) - } -} - -// impl<'db> Default for MultiStateSyncInfo<'db> { -// fn default() -> Self { -// Self { -// current_prefixes: BTreeMap::new(), -// processed_prefixes: BTreeSet::new(), -// version: CURRENT_STATE_SYNC_VERSION, -// } -// } -// } - -// fn lol(db: &GroveDb) -> MultiStateSyncSession { -// let mut sync = MultiStateSyncSession { -// transaction: db.start_transaction(), -// current_prefixes: Default::default(), -// processed_prefixes: Default::default(), -// version: 0, -// }; - -// sync.current_prefixes.insert( -// 
b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_owned(), -// SubtreeStateSyncInfo { -// restorer: Some(Restorer::new( -// db.open_merk_for_replication(SubtreePath::empty(), -// &sync.transaction) .unwrap(), -// b"11111111111111111111111111111111".to_owned(), -// None, -// )), -// pending_chunks: Default::default(), -// num_processed_chunks: 0, -// }, -// ); - -// let ass: Option<&mut SubtreeStateSyncInfo> = -// sync.current_prefixes.values_mut().next(); - -// let ass2: &mut SubtreeStateSyncInfo = ass.unwrap(); - -// ass2.apply_inner_chunk(b"a", vec![]).unwrap(); - -// sync -// } - -// Struct containing information about current subtrees found in GroveDB -pub struct SubtreesMetadata { - // Map of Prefix (Path digest) -> (Actual path, Parent Subtree actual_value_hash, Parent - // Subtree elem_value_hash) Note: Parent Subtree actual_value_hash, Parent Subtree - // elem_value_hash are needed when verifying the new constructed subtree after wards. - pub data: BTreeMap>, CryptoHash, CryptoHash)>, -} - -impl SubtreesMetadata { - pub fn new() -> SubtreesMetadata { - SubtreesMetadata { - data: BTreeMap::new(), - } - } -} - -impl Default for SubtreesMetadata { - fn default() -> Self { - Self::new() - } -} - -impl fmt::Debug for SubtreesMetadata { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - for (prefix, metadata) in self.data.iter() { - let metadata_path = &metadata.0; - let metadata_path_str = util_path_to_string(metadata_path); - writeln!( - f, - " prefix:{:?} -> path:{:?}", - hex::encode(prefix), - metadata_path_str - )?; - } - Ok(()) - } -} diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index d5d030d5..bfdc1782 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -5,7 +5,7 @@ use grovedb::reference_path::ReferencePathType; use rand::{distributions::Alphanumeric, Rng, }; use grovedb::element::SumValue; use grovedb::replication::CURRENT_STATE_SYNC_VERSION; -use 
grovedb::replication::MultiStateSyncSession; +use grovedb::replication::MultiStateSyncInfo; const MAIN_ΚΕΥ: &[u8] = b"key_main"; const MAIN_ΚΕΥ_EMPTY: &[u8] = b"key_main_empty"; @@ -101,7 +101,10 @@ fn main() { println!("{:?}", subtrees_metadata_source); println!("\n######### db_checkpoint_0 -> db_destination state sync"); - sync_db_demo(&db_checkpoint_0, &db_destination, /*&mut state_sync_session*/).unwrap(); + let state_info = MultiStateSyncInfo::default(); + let tx = db_destination.start_transaction(); + sync_db_demo(&db_checkpoint_0, &db_destination, state_info, &tx).unwrap(); + db_destination.commit_transaction(tx).unwrap().expect("expected to commit transaction"); println!("\n######### verify db_destination"); let incorrect_hashes = db_destination.verify_grovedb(None).unwrap(); @@ -238,22 +241,24 @@ fn query_db(db: &GroveDb, path: &[&[u8]], key: Vec) { fn sync_db_demo( source_db: &GroveDb, target_db: &GroveDb, + state_sync_info: MultiStateSyncInfo, + target_tx: &Transaction, ) -> Result<(), grovedb::Error> { let app_hash = source_db.root_hash(None).value.unwrap(); - let (chunk_ids, mut session) = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION)?; + let mut state_sync_info = target_db.start_snapshot_syncing(state_sync_info, app_hash, target_tx, CURRENT_STATE_SYNC_VERSION)?; let mut chunk_queue : VecDeque> = VecDeque::new(); - chunk_queue.extend(chunk_ids); + // The very first chunk to fetch is always identified by the root app_hash + chunk_queue.push_back(app_hash.to_vec()); while let Some(chunk_id) = chunk_queue.pop_front() { let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; - let more_chunks = session.apply_chunk(&target_db, (chunk_id.as_slice(), ops), CURRENT_STATE_SYNC_VERSION)?; + let (more_chunks, new_state_sync_info) = target_db.apply_chunk(state_sync_info, chunk_id.as_slice(), ops, target_tx, CURRENT_STATE_SYNC_VERSION)?; + state_sync_info = new_state_sync_info; 
chunk_queue.extend(more_chunks); } - let _ = target_db.commit_transaction(session.into_transaction()); - Ok(()) }