Skip to content

Commit

Permalink
Add insert_or_replace methods (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
gsserge authored Oct 22, 2024
1 parent 6410c3e commit b84885e
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "vart"
publish = true
version = "0.6.0"
version = "0.6.1"
edition = "2021"
license = "Apache-2.0"
readme = "README.md"
Expand Down
97 changes: 88 additions & 9 deletions src/art.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ impl<P: KeyTrait, V: Clone> Node<P, V> {
commit_version: u64,
ts: u64,
depth: usize,
replace: bool,
) -> NodeArc<P, V> {
// Obtain the current node's prefix and its length.
let cur_node_prefix = cur_node.prefix().clone();
Expand All @@ -714,7 +715,14 @@ impl<P: KeyTrait, V: Clone> Node<P, V> {
// update the existing value in the Twig node.
if let NodeType::Twig(ref twig) = &cur_node.node_type {
if is_prefix_match && cur_node_prefix.len() == key_prefix.len() {
let new_twig = twig.insert(value, commit_version, ts);
let new_twig = if replace {
// Create a replacement Twig node with the new value only.
let mut new_twig = TwigNode::new(twig.prefix.clone(), twig.key.clone());
new_twig.insert_mut(value, commit_version, ts);
new_twig
} else {
twig.insert(value, commit_version, ts)
};
return Arc::new(Node {
node_type: NodeType::Twig(new_twig),
});
Expand Down Expand Up @@ -752,6 +760,7 @@ impl<P: KeyTrait, V: Clone> Node<P, V> {
commit_version,
ts,
depth + longest_common_prefix,
replace,
);
let new_node = cur_node.replace_child(k, new_child);
return Arc::new(new_node);
Expand All @@ -777,6 +786,7 @@ impl<P: KeyTrait, V: Clone> Node<P, V> {
commit_version: u64,
ts: u64,
depth: usize,
replace: bool,
) {
// Obtain the current node's prefix and its length.
let cur_node_prefix = cur_node.prefix().clone();
Expand All @@ -788,7 +798,17 @@ impl<P: KeyTrait, V: Clone> Node<P, V> {
// update the existing value in the Twig node.
if let NodeType::Twig(ref mut twig) = &mut cur_node.node_type {
if is_prefix_match && cur_node_prefix.len() == key_prefix.len() {
twig.insert_mut(value.clone(), commit_version, ts);
if replace {
// Only replace if the provided value is more recent than
// the existing ones. This is important because this method
// is used for loading the index in SurrealKV and thus must
// be able to handle segments left by an unfinished compaction
// where older versions can end up in more recent segments
// after the newer versions.
twig.replace_if_newer_mut(value, commit_version, ts);
} else {
twig.insert_mut(value, commit_version, ts);
}
return;
}
}
Expand All @@ -807,7 +827,7 @@ impl<P: KeyTrait, V: Clone> Node<P, V> {
let new_twig = Node::new_twig(
key_prefix[longest_common_prefix..].into(),
key.as_slice().into(),
value.clone(),
value,
commit_version,
ts,
);
Expand All @@ -824,10 +844,11 @@ impl<P: KeyTrait, V: Clone> Node<P, V> {
Node::insert_recurse_mut(
child,
key,
value.clone(),
value,
commit_version,
ts,
depth + longest_common_prefix,
replace,
);
cur_node.update_version();
return;
Expand All @@ -837,7 +858,7 @@ impl<P: KeyTrait, V: Clone> Node<P, V> {
let new_twig = Node::new_twig(
key_prefix[longest_common_prefix..].into(),
key.as_slice().into(),
value.clone(),
value,
commit_version,
ts,
);
Expand Down Expand Up @@ -1027,6 +1048,7 @@ impl<P: KeyTrait, V: Clone> Tree<P, V> {
version: u64,
ts: u64,
check_version: bool,
replace: bool,
) -> Result<(), TrieError> {
let new_root = match &self.root {
None => {
Expand All @@ -1047,7 +1069,7 @@ impl<P: KeyTrait, V: Clone> Tree<P, V> {
} else if check_version && curr_version > version {
return Err(TrieError::VersionIsOld);
}
Node::insert_recurse(root, key, value, commit_version, ts, 0)
Node::insert_recurse(root, key, value, commit_version, ts, 0, replace)
}
};

Expand All @@ -1063,6 +1085,7 @@ impl<P: KeyTrait, V: Clone> Tree<P, V> {
version: u64,
ts: u64,
check_version: bool,
replace: bool,
) -> Result<(), TrieError> {
if let Some(root_arc) = self.root.as_mut() {
let curr_version = root_arc.version();
Expand All @@ -1073,7 +1096,7 @@ impl<P: KeyTrait, V: Clone> Tree<P, V> {
return Err(TrieError::VersionIsOld);
}
if let Some(root) = Arc::get_mut(root_arc) {
Node::insert_recurse_mut(root, key, value, commit_version, ts, 0)
Node::insert_recurse_mut(root, key, value, commit_version, ts, 0, replace)
} else {
return Err(TrieError::RootIsNotUniquelyOwned);
}
Expand Down Expand Up @@ -1116,7 +1139,17 @@ impl<P: KeyTrait, V: Clone> Tree<P, V> {
/// Returns an error if the given version is older than the root's current version.
///
pub fn insert(&mut self, key: &P, value: V, version: u64, ts: u64) -> Result<(), TrieError> {
self.insert_common(key, value, version, ts, true)
self.insert_common(key, value, version, ts, true, false)
}

pub fn insert_or_replace(
&mut self,
key: &P,
value: V,
version: u64,
ts: u64,
) -> Result<(), TrieError> {
self.insert_common(key, value, version, ts, true, true)
}

pub fn insert_unchecked(
Expand All @@ -1126,7 +1159,17 @@ impl<P: KeyTrait, V: Clone> Tree<P, V> {
version: u64,
ts: u64,
) -> Result<(), TrieError> {
self.insert_common_mut(key, value, version, ts, false)
self.insert_common_mut(key, value, version, ts, false, false)
}

pub fn insert_or_replace_unchecked(
&mut self,
key: &P,
value: V,
version: u64,
ts: u64,
) -> Result<(), TrieError> {
self.insert_common_mut(key, value, version, ts, false, true)
}

pub(crate) fn remove_from_node(
Expand Down Expand Up @@ -2666,4 +2709,40 @@ mod tests {
assert_eq!(result, (20, 2, 150));
assert_eq!(tree.version(), 3);
}

#[test]
fn test_insert_or_replace() {
let mut tree: Tree<VariableSizeKey, i32> = Tree::new();
let key = VariableSizeKey::from_str("key").unwrap();

tree.insert_or_replace(&key, 1, 10, 100).unwrap();
tree.insert_or_replace(&key, 2, 20, 200).unwrap();

let history = tree.get_version_history(&key).unwrap();
assert_eq!(history.len(), 1);
assert_eq!(history[0], (2, 20, 200));
}

#[test]
fn test_insert_or_replace_unchecked() {
let mut tree: Tree<VariableSizeKey, i32> = Tree::new();
let key = VariableSizeKey::from_str("key").unwrap();

// Scenario 1: the second value is more recent than the first one.
tree.insert_or_replace_unchecked(&key, 1, 10, 100).unwrap();
tree.insert_or_replace_unchecked(&key, 2, 20, 200).unwrap();

let history = tree.get_version_history(&key).unwrap();
assert_eq!(history.len(), 1);
assert_eq!(history[0], (2, 20, 200));

// Scenario 2: the new value has the smaller version and hence
// is older than the one already in the tree. Discard the new
// value.
tree.insert_or_replace_unchecked(&key, 1, 1, 1).unwrap();

let history = tree.get_version_history(&key).unwrap();
assert_eq!(history.len(), 1);
assert_eq!(history[0], (2, 20, 200));
}
}
7 changes: 7 additions & 0 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ impl<K: KeyTrait, V: Clone> TwigNode<K, V> {
self.version = self.version(); // Update LeafNode's version
}

pub(crate) fn replace_if_newer_mut(&mut self, value: V, version: u64, ts: u64) {
if version > self.version {
self.values.clear();
self.insert_mut(value, version, ts);
}
}

pub(crate) fn iter(&self) -> impl DoubleEndedIterator<Item = &Arc<LeafValue<V>>> {
self.values.iter()
}
Expand Down
2 changes: 1 addition & 1 deletion src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl<P: KeyTrait, V: Clone> Snapshot<P, V> {
// Insert the key-value pair into the root node using a recursive function
match &self.root {
Some(root) => {
let new_node = Node::insert_recurse(root, key, value, self.ts, ts, 0);
let new_node = Node::insert_recurse(root, key, value, self.ts, ts, 0, false);
// Update the root node with the new node after insertion
self.root = Some(new_node);
}
Expand Down

0 comments on commit b84885e

Please sign in to comment.