diff --git a/src/art.rs b/src/art.rs index 1273944..1705473 100644 --- a/src/art.rs +++ b/src/art.rs @@ -703,6 +703,7 @@ impl Node { commit_version: u64, ts: u64, depth: usize, + replace: bool, ) -> NodeArc { // Obtain the current node's prefix and its length. let cur_node_prefix = cur_node.prefix().clone(); @@ -714,7 +715,14 @@ impl Node { // update the existing value in the Twig node. if let NodeType::Twig(ref twig) = &cur_node.node_type { if is_prefix_match && cur_node_prefix.len() == key_prefix.len() { - let new_twig = twig.insert(value, commit_version, ts); + let new_twig = if replace { + // Create a replacement Twig node with the new value only. + let mut new_twig = TwigNode::new(twig.prefix.clone(), twig.key.clone()); + new_twig.insert_mut(value, commit_version, ts); + new_twig + } else { + twig.insert(value, commit_version, ts) + }; return Arc::new(Node { node_type: NodeType::Twig(new_twig), }); @@ -752,6 +760,7 @@ impl Node { commit_version, ts, depth + longest_common_prefix, + replace, ); let new_node = cur_node.replace_child(k, new_child); return Arc::new(new_node); @@ -777,6 +786,7 @@ impl Node { commit_version: u64, ts: u64, depth: usize, + replace: bool, ) { // Obtain the current node's prefix and its length. let cur_node_prefix = cur_node.prefix().clone(); @@ -788,7 +798,21 @@ impl Node { // update the existing value in the Twig node. if let NodeType::Twig(ref mut twig) = &mut cur_node.node_type { if is_prefix_match && cur_node_prefix.len() == key_prefix.len() { - twig.insert_mut(value.clone(), commit_version, ts); + if replace { + if commit_version > twig.version { + // Only replace if the provided value is more recent than + // the existing ones, i.e. its commit_version is greater than + // the max version of this Twig node. This is important + // because this method is used for loading the index in + // SurrealKV and thus must be able to handle segments left + // by an unfinished compaction where older versions can + // end up in more recent segments after the newer versions. + twig.values.clear(); + twig.insert_mut(value.clone(), commit_version, ts); + } + } else { + twig.insert_mut(value.clone(), commit_version, ts); + } return; } } @@ -828,6 +852,7 @@ impl Node { commit_version, ts, depth + longest_common_prefix, + replace, ); cur_node.update_version(); return; @@ -1027,6 +1052,7 @@ impl Tree { version: u64, ts: u64, check_version: bool, + replace: bool, ) -> Result<(), TrieError> { let new_root = match &self.root { None => { @@ -1047,7 +1073,7 @@ impl Tree { } else if check_version && curr_version > version { return Err(TrieError::VersionIsOld); } - Node::insert_recurse(root, key, value, commit_version, ts, 0) + Node::insert_recurse(root, key, value, commit_version, ts, 0, replace) } }; @@ -1063,6 +1089,7 @@ impl Tree { version: u64, ts: u64, check_version: bool, + replace: bool, ) -> Result<(), TrieError> { if let Some(root_arc) = self.root.as_mut() { let curr_version = root_arc.version(); @@ -1073,7 +1100,7 @@ impl Tree { return Err(TrieError::VersionIsOld); } if let Some(root) = Arc::get_mut(root_arc) { - Node::insert_recurse_mut(root, key, value, commit_version, ts, 0) + Node::insert_recurse_mut(root, key, value, commit_version, ts, 0, replace) } else { return Err(TrieError::RootIsNotUniquelyOwned); } @@ -1116,7 +1143,17 @@ impl Tree { /// Returns an error if the given version is older than the root's current version. /// pub fn insert(&mut self, key: &P, value: V, version: u64, ts: u64) -> Result<(), TrieError> { - self.insert_common(key, value, version, ts, true) + self.insert_common(key, value, version, ts, true, false) + } + + pub fn insert_or_replace( + &mut self, + key: &P, + value: V, + version: u64, + ts: u64, + ) -> Result<(), TrieError> { + self.insert_common(key, value, version, ts, true, true) } pub fn insert_unchecked( @@ -1126,7 +1163,17 @@ impl Tree { version: u64, ts: u64, ) -> Result<(), TrieError> { - self.insert_common_mut(key, value, version, ts, false) + self.insert_common_mut(key, value, version, ts, false, false) + } + + pub fn insert_or_replace_unchecked( + &mut self, + key: &P, + value: V, + version: u64, + ts: u64, + ) -> Result<(), TrieError> { + self.insert_common_mut(key, value, version, ts, false, true) } pub(crate) fn remove_from_node( @@ -2666,4 +2713,40 @@ mod tests { assert_eq!(result, (20, 2, 150)); assert_eq!(tree.version(), 3); } + + #[test] + fn test_insert_or_replace() { + let mut tree: Tree = Tree::new(); + let key = VariableSizeKey::from_str("key").unwrap(); + + tree.insert_or_replace(&key, 1, 10, 100).unwrap(); + tree.insert_or_replace(&key, 2, 20, 200).unwrap(); + + let history = tree.get_version_history(&key).unwrap(); + assert_eq!(history.len(), 1); + assert_eq!(history[0], (2, 20, 200)); + } + + #[test] + fn test_insert_or_replace_unchecked() { + let mut tree: Tree = Tree::new(); + let key = VariableSizeKey::from_str("key").unwrap(); + + // Scenario 1: the second value is more recent than the first one. + tree.insert_or_replace_unchecked(&key, 1, 10, 100).unwrap(); + tree.insert_or_replace_unchecked(&key, 2, 20, 200).unwrap(); + + let history = tree.get_version_history(&key).unwrap(); + assert_eq!(history.len(), 1); + assert_eq!(history[0], (2, 20, 200)); + + // Scenario 2: the new value has the smaller version and hence + // is older than the one already in the tree. Discard the new + // value. + tree.insert_or_replace_unchecked(&key, 1, 1, 1).unwrap(); + + let history = tree.get_version_history(&key).unwrap(); + assert_eq!(history.len(), 1); + assert_eq!(history[0], (2, 20, 200)); + } } diff --git a/src/snapshot.rs b/src/snapshot.rs index ba1061e..382de20 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -25,7 +25,7 @@ impl Snapshot { // Insert the key-value pair into the root node using a recursive function match &self.root { Some(root) => { - let new_node = Node::insert_recurse(root, key, value, self.ts, ts, 0); + let new_node = Node::insert_recurse(root, key, value, self.ts, ts, 0, false); // Update the root node with the new node after insertion self.root = Some(new_node); }