From dc0785a907b2c2fe80e3e3130fe660c1f5ab4c01 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philipp=20Kr=C3=BCger?=
Date: Thu, 18 Apr 2024 15:11:04 +0200
Subject: [PATCH] feat: implement `PublicDirectory` conflict reconciliation
 (#426)

This implements three functions:
- `PublicDirectory::reconcile`, which takes two directories & reconciles all
  changes between them. Think of this as similar to `git pull`, but where
  conflicts in files are merged automatically via a simple tie-breaking
  mechanism on the file hash. The return value indicates whether any files
  were affected by the automatic merge and, if so, which ones.
- `PublicDirectory::merge`, the underlying function for `reconcile`, which
  doesn't take into account whether one of the directories may have been
  "ahead" of the other in history. Use it only if you know what you're doing;
  otherwise opt for `reconcile`.
- `PublicNode::causal_compare`, the underlying function in `reconcile` that
  figures out whether one version of a node is "ahead" of or behind another,
  or whether they're two conflicting versions that need to be merged.

---

* feat: Conflict reconciliation for PublicDirectory (first draft)
* fix: Use `async_recursion`
* refactor: Use more `prop_assert`
* chore: Write proptests for `causal_compare`
* chore: Write tests for `PublicDirectory::merge`
* chore: Write documentation
* fix: Consistently merge metadata between nodes
* fix: Test types
* chore: Remove unused imports
* fix: Bugs in merge implementation

Specifically:
- trivially merge exactly equal nodes without creating a history entry
- correctly reset `persisted_as` when creating a merge node
- always advance the history entry when creating a merge node

* fix: Don't merge equal files

---
 wnfs-common/src/metadata.rs                   |  21 +-
 wnfs-hamt/src/diff.rs                         |  34 +-
 wnfs-hamt/src/merge.rs                        |  16 +-
 wnfs-hamt/src/node.rs                         |  25 +-
 wnfs-hamt/src/pointer.rs                      |   2 +-
 wnfs-nameaccumulator/src/name.rs              |   2 +-
 wnfs-unixfs-file/src/balanced_tree.rs         |   1 -
 wnfs-unixfs-file/src/builder.rs               |   1 -
 wnfs-unixfs-file/src/protobufs.rs             |   1 -
 .../proptest-regressions/public/directory.txt |  11 +
 .../proptest-regressions/public/node/node.txt |   8 +
 wnfs/src/private/file.rs                      |  21 +-
 wnfs/src/public/directory.rs                  | 380 +++++++++++++++++-
 wnfs/src/public/file.rs                       |  26 ++
 wnfs/src/public/node/node.rs                  | 335 ++++++++++++++-
 15 files changed, 836 insertions(+), 48 deletions(-)
 create mode 100644 wnfs/proptest-regressions/public/directory.txt
 create mode 100644 wnfs/proptest-regressions/public/node/node.txt
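A quick usage sketch of the new API (not part of the diff; the store setup and
import paths are assumptions for illustration — `reconcile`, `merge`, and
`Reconciliation` are the APIs this patch adds):

    use anyhow::Result;
    use chrono::Utc;
    use wnfs::public::{PublicDirectory, Reconciliation};
    use wnfs_common::MemoryBlockStore;

    async fn sync_replicas() -> Result<()> {
        let store = &MemoryBlockStore::new();
        let time = Utc::now();

        // Two replicas; in practice these would have diverged from a common root.
        let mut ours = PublicDirectory::new_rc(time);
        let theirs = PublicDirectory::new_rc(time);

        match ours.reconcile(theirs, store).await? {
            // Their history contains ours: `ours` was updated to their state.
            Reconciliation::FastForward => println!("fast-forwarded"),
            // Our history already contains theirs: nothing changed.
            Reconciliation::AlreadyAhead => println!("already up to date"),
            // Concurrent versions: merged, tie-breaking the listed files (if any).
            Reconciliation::Merged { file_tie_breaks } => {
                for path in file_tie_breaks {
                    println!("tie-broken: {}", path.join("/"));
                }
            }
        }
        Ok(())
    }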
diff --git a/wnfs-common/src/metadata.rs b/wnfs-common/src/metadata.rs
index 02632cdf..bd12e05b 100644
--- a/wnfs-common/src/metadata.rs
+++ b/wnfs-common/src/metadata.rs
@@ -2,7 +2,8 @@
 use anyhow::{bail, Result};
 use chrono::{DateTime, TimeZone, Utc};
-use libipld::Ipld;
+use libipld::{Ipld, Multihash};
+use multihash::{Code, MultihashDigest};
 use serde::{
     de::{DeserializeOwned, Error as DeError},
     Deserialize, Deserializer, Serialize, Serializer,
@@ -223,6 +224,24 @@ impl Metadata {
             self.0.insert(key.clone(), value.clone());
         }
     }
+
+    pub(crate) fn hash(&self) -> Result<Multihash> {
+        let vec = serde_ipld_dagcbor::to_vec(self)?;
+        let hash = Code::Blake3_256.digest(&vec);
+        Ok(hash)
+    }
+
+    /// Tie-break this node with another one.
+    /// Used for conflict reconciliation. We don't merge the two metadata maps
+    /// together (yet); instead we compare their hashes. The one with the lower
+    /// hash survives.
+    pub fn tie_break_with(&mut self, other: &Self) -> Result<()> {
+        if self.hash()?.digest() > other.hash()?.digest() {
+            self.0 = other.0.clone();
+        }
+
+        Ok(())
+    }
 }

 impl TryFrom<&Ipld> for NodeType {
diff --git a/wnfs-hamt/src/diff.rs b/wnfs-hamt/src/diff.rs
index 2343b43d..eb0f4f83 100644
--- a/wnfs-hamt/src/diff.rs
+++ b/wnfs-hamt/src/diff.rs
@@ -605,6 +605,7 @@ mod proptests {
         ChangeType,
     };
     use async_std::task;
+    use proptest::{prop_assert, prop_assert_eq};
     use std::collections::HashSet;
     use test_strategy::proptest;
     use wnfs_common::{utils::Arc, Link, MemoryBlockStore};
@@ -638,19 +639,17 @@ mod proptests {
             .await
             .unwrap();

-            assert_eq!(strategy_changes.len(), changes.len());
+            prop_assert_eq!(strategy_changes.len(), changes.len());
             for strategy_change in strategy_changes {
-                assert!(changes.iter().any(|c| match &strategy_change {
+                let found_matching_change = changes.iter().any(|c| match &strategy_change {
                     Change::Add(k, _) => c.r#type == ChangeType::Add && &c.key == k,
-                    Change::Modify(k, _) => {
-                        c.r#type == ChangeType::Modify && &c.key == k
-                    }
-                    Change::Remove(k) => {
-                        c.r#type == ChangeType::Remove && &c.key == k
-                    }
-                }));
+                    Change::Modify(k, _) => c.r#type == ChangeType::Modify && &c.key == k,
+                    Change::Remove(k) => c.r#type == ChangeType::Remove && &c.key == k,
+                });
+                prop_assert!(found_matching_change);
             }
-        });
+            Ok(())
+        })?;
     }

     #[proptest(cases = 1000, max_shrink_iters = 40000)]
@@ -673,8 +672,9 @@ mod proptests {
             .map(|c| c.key.clone())
             .collect::<HashSet<_>>();

-            assert_eq!(change_set.len(), changes.len());
-        });
+            prop_assert_eq!(change_set.len(), changes.len());
+            Ok(())
+        })?;
     }

     #[proptest(cases = 100)]
@@ -700,14 +700,16 @@ mod proptests {
             .await
             .unwrap();

-            assert_eq!(changes.len(), flipped_changes.len());
+            prop_assert_eq!(changes.len(), flipped_changes.len());
             for change in changes {
-                assert!(flipped_changes.iter().any(|c| match change.r#type {
+                let found_matching_change = flipped_changes.iter().any(|c| match change.r#type {
                     ChangeType::Add => c.r#type == ChangeType::Remove && c.key == change.key,
                     ChangeType::Remove => c.r#type == ChangeType::Add && c.key == change.key,
                     ChangeType::Modify => c.r#type == ChangeType::Modify && c.key == change.key,
-                }));
+                });
+                prop_assert!(found_matching_change);
             }
-        });
+            Ok(())
+        })?;
     }
 }
diff --git a/wnfs-hamt/src/merge.rs b/wnfs-hamt/src/merge.rs
index b0f9d8b0..f0b49c52 100644
--- a/wnfs-hamt/src/merge.rs
+++ b/wnfs-hamt/src/merge.rs
@@ -69,6 +69,7 @@ where
 mod proptests {
     use crate::strategies::{self, generate_kvs};
     use async_std::task;
+    use proptest::prop_assert_eq;
     use std::cmp;
     use test_strategy::proptest;
     use wnfs_common::{utils::Arc, Link, MemoryBlockStore};
@@ -126,8 +127,9 @@ mod proptests {
             .unwrap()
         };

-        assert_eq!(merge_node_left_assoc, merge_node_right_assoc);
-    });
+        prop_assert_eq!(merge_node_left_assoc, merge_node_right_assoc);
+        Ok(())
+    })?;
     }

     #[proptest(cases = 100)]
@@ -159,8 +161,9 @@ mod proptests {
             .await
             .unwrap();

-        assert_eq!(merge_node_1, merge_node_2);
-    })
+        prop_assert_eq!(merge_node_1, merge_node_2);
+        Ok(())
+    })?;
     }

     #[proptest(cases = 100)]
@@ -192,7 +195,8 @@ mod proptests {
             .await
             .unwrap();

-        assert_eq!(merge_node_1, merge_node_2);
-    })
+        prop_assert_eq!(merge_node_1, merge_node_2);
+        Ok(())
+    })?;
     }
 }
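All of the test hunks above apply the same mechanical conversion: inside
`task::block_on`, `assert!`/`assert_eq!` become `prop_assert!`/`prop_assert_eq!`,
the async block returns `Ok(())`, and the result is propagated with `?`. The
payoff is that a failing assertion returns a `TestCaseError` instead of
panicking inside the executor, so proptest can still shrink the failing input.
Sketched in isolation (the strategy and assertion are placeholders):

    use proptest::prelude::*;
    use test_strategy::proptest;

    #[proptest(cases = 100)]
    fn example(#[strategy(0u64..1000)] n: u64) {
        async_std::task::block_on(async move {
            // On failure this returns Err(TestCaseError) from the async block
            // rather than panicking.
            prop_assert_eq!(n * 2 / 2, n);
            Ok(())
        })?;
    }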
diff --git a/wnfs-hamt/src/node.rs b/wnfs-hamt/src/node.rs
index 453e828e..4af93444 100644
--- a/wnfs-hamt/src/node.rs
+++ b/wnfs-hamt/src/node.rs
@@ -1236,8 +1236,9 @@ mod proptests {
         node.set(key, value, store).await.unwrap();
         let cid2 = node.store(store).await.unwrap();

-        assert_eq!(cid1, cid2);
-    })
+        prop_assert_eq!(cid1, cid2);
+        Ok(())
+    })?;
     }

     #[proptest(cases = 50)]
@@ -1258,8 +1259,9 @@ mod proptests {
         node.remove(&key, store).await.unwrap();
         let cid2 = node.store(store).await.unwrap();

-        assert_eq!(cid1, cid2);
-    })
+        prop_assert_eq!(cid1, cid2);
+        Ok(())
+    })?;
     }

     #[proptest(cases = 100)]
@@ -1276,8 +1278,9 @@ mod proptests {
         let node_cid = node.store(store).await.unwrap();
         let decoded_node = Node::<String, u64>::load(&node_cid, store).await.unwrap();

-        assert_eq!(*node, decoded_node);
-    })
+        prop_assert_eq!(node.as_ref(), &decoded_node);
+        Ok(())
+    })?;
     }

     #[proptest(cases = 1000, max_shrink_iters = 10_000)]
@@ -1298,8 +1301,9 @@ mod proptests {
         let cid1 = node1.store(store).await.unwrap();
         let cid2 = node2.store(store).await.unwrap();

-        assert_eq!(cid1, cid2);
-    })
+        prop_assert_eq!(cid1, cid2);
+        Ok(())
+    })?;
     }

     // This is sort of a "control group" for making sure that operations_and_shuffled is correct.
@@ -1332,8 +1336,9 @@ mod proptests {
         let map = HashMap::from(&operations);
         let map_result = node.to_hashmap(store).await.unwrap();

-        assert_eq!(map, map_result);
-    })
+        prop_assert_eq!(map, map_result);
+        Ok(())
+    })?;
     }
 }
diff --git a/wnfs-hamt/src/pointer.rs b/wnfs-hamt/src/pointer.rs
index 042589d9..21181dd2 100644
--- a/wnfs-hamt/src/pointer.rs
+++ b/wnfs-hamt/src/pointer.rs
@@ -227,7 +227,7 @@ where
 mod tests {
     use super::*;
     use testresult::TestResult;
-    use wnfs_common::{MemoryBlockStore, Storable};
+    use wnfs_common::MemoryBlockStore;

     #[async_std::test]
     async fn pointer_can_encode_decode_as_cbor() -> TestResult {
diff --git a/wnfs-nameaccumulator/src/name.rs b/wnfs-nameaccumulator/src/name.rs
index 1d73bfa1..9fd1c0b8 100644
--- a/wnfs-nameaccumulator/src/name.rs
+++ b/wnfs-nameaccumulator/src/name.rs
@@ -818,7 +818,7 @@ mod tests {
 #[cfg(test)]
 mod snapshot_tests {
     use super::*;
-    use crate::{BigNumDig, NameSegment};
+    use crate::BigNumDig;
     use rand_chacha::ChaCha12Rng;
     use rand_core::SeedableRng;
     use wnfs_common::utils::SnapshotBlockStore;
diff --git a/wnfs-unixfs-file/src/balanced_tree.rs b/wnfs-unixfs-file/src/balanced_tree.rs
index fb8bab63..5515e968 100644
--- a/wnfs-unixfs-file/src/balanced_tree.rs
+++ b/wnfs-unixfs-file/src/balanced_tree.rs
@@ -234,7 +234,6 @@ impl TreeNode {
 mod tests {
     use super::*;
     use bytes::BytesMut;
-    use futures::StreamExt;
     use wnfs_common::MemoryBlockStore;

     // chunks are just a single usize integer
diff --git a/wnfs-unixfs-file/src/builder.rs b/wnfs-unixfs-file/src/builder.rs
index d714521e..37519b3d 100644
--- a/wnfs-unixfs-file/src/builder.rs
+++ b/wnfs-unixfs-file/src/builder.rs
@@ -173,7 +173,6 @@ pub struct Config {
 mod tests {
     use super::*;
     use crate::chunker::DEFAULT_CHUNKS_SIZE;
-    use futures::TryStreamExt;
     use wnfs_common::MemoryBlockStore;

     #[tokio::test]
diff --git a/wnfs-unixfs-file/src/protobufs.rs b/wnfs-unixfs-file/src/protobufs.rs
index 6056d302..719be30a 100644
--- a/wnfs-unixfs-file/src/protobufs.rs
+++ b/wnfs-unixfs-file/src/protobufs.rs
@@ -91,7 +91,6 @@ pub struct Metadata {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use prost::Message;
     use testresult::TestResult;

     #[test]
diff --git a/wnfs/proptest-regressions/public/directory.txt b/wnfs/proptest-regressions/public/directory.txt
new file mode 100644
index 00000000..5aa948a5
--- /dev/null
+++ b/wnfs/proptest-regressions/public/directory.txt
@@ -0,0 +1,11 @@
+# Seeds for failure cases proptest has generated in the past. It is
+# automatically read and these particular cases re-run before any
+# novel cases are generated.
+#
+# It is recommended to check this file in to source control so that
+# everyone who runs the test benefits from these saved cases.
+cc 4ca9259264a7fb240270a7f3a3c9f702cc5f3f7ec8f8add28e774546901bb064 # shrinks to input = _TestMergeDirectoryPreferredArgs { path: [] }
+cc 52d317f93f0d815bfd054e6147b32492abc79b100274458f3fc266d1d9f40083 # shrinks to input = _TestMergeCommutativityArgs { ops0: [Write(["a"], "a"), Mkdir(["a"])], ops1: [] }
+cc 5d512e34a6b76473ff418d6cc7730003875ae30727a3155b2abc13d5f8313b58 # shrinks to input = _TestMergeCommutativityArgs { fs0: FileSystem { files: {}, dirs: {} }, fs1: FileSystem { files: {["a"]: "a"}, dirs: {["a"]} } }
+cc d4c4529fd972a2a6af4dcecd28a289d11451203600ae18e001dbdd42fe19e245 # shrinks to input = _TestMergeCommutativityArgs { fs0: FileSystem { files: {["b"]: "a", ["b", "a"]: "a"}, dirs: {} }, fs1: FileSystem { files: {}, dirs: {} } }
+cc e5c61f6ac3dec61974eedf0a7042fd1f801efa9f020e4b37473d5a11a7a7a7a4 # shrinks to input = _TestMergeAssociativityArgs { fs0: FileSystem { files: {}, dirs: {["e", "b"]} }, fs1: FileSystem { files: {}, dirs: {["e", "b"]} }, fs2: FileSystem { files: {}, dirs: {["e", "b"]} } }
diff --git a/wnfs/proptest-regressions/public/node/node.txt b/wnfs/proptest-regressions/public/node/node.txt
new file mode 100644
index 00000000..a4832080
--- /dev/null
+++ b/wnfs/proptest-regressions/public/node/node.txt
@@ -0,0 +1,8 @@
+# Seeds for failure cases proptest has generated in the past. It is
+# automatically read and these particular cases re-run before any
+# novel cases are generated.
+#
+# It is recommended to check this file in to source control so that
+# everyone who runs the test benefits from these saved cases.
+cc 964d01e6f0fa5c2e45a8d245bb705007a50c24fb53348cb96a528ec52e27a49a # shrinks to input = _TestTransitivityArgs { operations0: [Write(0), Write(0), Write(0), Write(0), Write(0)], operations1: [Write(0), Write(0), Write(0), Write(0)], operations2: [Write(0), Write(0), Write(15671), Fork(2303347135), Write(593116438)] }
+cc 60cd267331f207e164af9c65aaf9c1232633c97d9e258dcd01d1630292820569 # shrinks to input = _TestTransitivityArgs { operations0: [], operations1: [Write(0)], operations2: [Write(0)] }
diff --git a/wnfs/src/private/file.rs b/wnfs/src/private/file.rs
index 7b589384..1f707b54 100644
--- a/wnfs/src/private/file.rs
+++ b/wnfs/src/private/file.rs
@@ -1288,6 +1288,7 @@ mod proptests {
     use async_std::io::Cursor;
     use chrono::Utc;
     use futures::{future, StreamExt};
+    use proptest::{prop_assert, prop_assert_eq};
     use rand_chacha::ChaCha12Rng;
     use rand_core::SeedableRng;
     use test_strategy::proptest;
@@ -1319,8 +1320,9 @@ mod proptests {
         let collected_content = file.get_content(forest, store).await.unwrap();

-        assert_eq!(collected_content, content);
-    })
+        prop_assert_eq!(collected_content, content);
+        Ok(())
+    })?;
     }

     #[proptest(cases = 10)]
@@ -1352,8 +1354,9 @@ mod proptests {
             })
             .await;

-        assert_eq!(collected_content, content);
-    })
+        prop_assert_eq!(collected_content, content);
+        Ok(())
+    })?;
     }

     #[proptest(cases = 100)]
@@ -1384,8 +1387,9 @@ mod proptests {
         let error = error.downcast_ref::<BlockStoreError>().unwrap();

-        assert!(matches!(error, BlockStoreError::CIDNotFound(_)));
-    })
+        prop_assert!(matches!(error, BlockStoreError::CIDNotFound(_)));
+        Ok(())
+    })?;
     }

     #[proptest(cases = 10)]
@@ -1425,7 +1429,8 @@ mod proptests {
             .await
             .unwrap();

-        assert_eq!(source_content, wnfs_content);
-    })
+        prop_assert_eq!(source_content, wnfs_content);
+        Ok(())
+    })?;
     }
 }
diff --git a/wnfs/src/public/directory.rs b/wnfs/src/public/directory.rs
index 69bfea37..17e5a7fb 100644
--- a/wnfs/src/public/directory.rs
+++ b/wnfs/src/public/directory.rs
@@ -8,9 +8,13 @@ use crate::{
 };
 use anyhow::{bail, ensure, Result};
 use async_once_cell::OnceCell;
+use async_recursion::async_recursion;
 use chrono::{DateTime, Utc};
 use libipld_core::cid::Cid;
-use std::collections::{BTreeMap, BTreeSet};
+use std::{
+    cmp::Ordering,
+    collections::{btree_map::Entry, BTreeMap, BTreeSet},
+};
 use wnfs_common::{
     utils::{boxed_fut, error, Arc},
     BlockStore, Metadata, NodeType, Storable,
@@ -40,6 +44,21 @@ pub struct PublicDirectory {
     pub(crate) previous: BTreeSet<Cid>,
 }

+/// Different types of reconciliation results we can detect
+#[derive(Debug, Clone)]
+pub enum Reconciliation {
+    /// A merge was necessary; there was a conflict and we had to tie-break on the given list
+    /// of file paths. If the list of file paths is empty, then we were able to simply merge
+    /// the directories together and there were no destructive conflicts.
+    Merged {
+        file_tie_breaks: BTreeSet<Vec<String>>,
+    },
+    /// A merge wasn't necessary: We could update to the other node's state.
+    FastForward,
+    /// A merge wasn't necessary: The other node is already part of our history.
+    AlreadyAhead,
+}
+
 //--------------------------------------------------------------------------------------------------
 // Implementations
 //--------------------------------------------------------------------------------------------------
@@ -143,6 +162,32 @@ impl PublicDirectory {
         cloned
     }

+    /// Call this function to prepare this directory for conflict reconciliation merge changes.
+    /// Advances this node to the next revision, unless it's already a merge node.
+    /// Merge nodes preferably just grow in size. This allows them to combine more nicely
+    /// without causing further conflicts.
+    pub(crate) async fn prepare_next_merge<'a>(
+        self: &'a mut Arc<Self>,
+        store: &impl BlockStore,
+    ) -> Result<&'a mut Self> {
+        if self.previous.len() > 1 {
+            // This is a merge node
+            let cloned = Arc::make_mut(self);
+            cloned.persisted_as = OnceCell::new();
+            return Ok(cloned);
+        }
+
+        // This is not a merge node. We need to force a new revision.
+        // Otherwise we would turn a node that is possibly storing uncommitted
+        // new changes into a merge node, but merge nodes should have no changes
+        // besides the merge itself.
+        let previous_cid = self.store(store).await?;
+        let cloned = Arc::make_mut(self);
+        cloned.persisted_as = OnceCell::new();
+        cloned.previous = BTreeSet::from([previous_cid]);
+        Ok(cloned)
+    }
+
     async fn get_leaf_dir<'a>(
         &'a self,
         path_segments: &[String],
@@ -796,6 +841,155 @@ impl PublicDirectory {
         Ok(())
     }
+
+    /// Compares the merkle clocks of this directory and the other directory.
+    pub async fn causal_compare(
+        self: Arc<Self>,
+        other: Arc<Self>,
+        store: &impl BlockStore,
+    ) -> Result<Option<Ordering>> {
+        let causal_order = PublicNode::Dir(self)
+            .causal_compare(&PublicNode::Dir(other), store)
+            .await?;
+        Ok(causal_order)
+    }
+
+    /// Reconcile this node with another node.
+    ///
+    /// Use this function when you're informed about another version of this
+    /// public WNFS directory and want to merge any possibly new changes into
+    /// this directory.
+    ///
+    /// The return value gives information about what exactly happened.
+    /// See the documentation for the `Reconciliation` enum for more information.
+    pub async fn reconcile(
+        self: &mut Arc<Self>,
+        other: Arc<Self>,
+        store: &impl BlockStore,
+    ) -> Result<Reconciliation> {
+        let causal_order = self.clone().causal_compare(other.clone(), store).await?;
+
+        Ok(match causal_order {
+            Some(Ordering::Equal) => Reconciliation::AlreadyAhead,
+            Some(Ordering::Greater) => Reconciliation::AlreadyAhead,
+            Some(Ordering::Less) => {
+                *self = other;
+                Reconciliation::FastForward
+            }
+            None => {
+                let file_tie_breaks = self.merge(&other, store).await?;
+                Reconciliation::Merged { file_tie_breaks }
+            }
+        })
+    }
+
+    /// Merge this node with the given other node, ignoring whether the
+    /// other node is actually ahead in history or not.
+    ///
+    /// Prefer using `reconcile`, if you don't know what the difference is!
+    ///
+    /// Returns the set of file paths where tie-breaks were used to resolve
+    /// conflicts. This means that for each path there exists a file that has been
+    /// overwritten with another version.
+    ///
+    /// It's possible to walk the history backwards to find the overwritten version
+    /// of each such file, merge the two versions in an application-specific way,
+    /// and create another history entry.
+    pub async fn merge<'a>(
+        self: &'a mut Arc<Self>,
+        other: &'a Arc<Self>,
+        store: &'a impl BlockStore,
+    ) -> Result<BTreeSet<Vec<String>>> {
+        let mut file_tie_breaks = BTreeSet::new();
+        self.merge_helper(other, store, &[], &mut file_tie_breaks)
+            .await?;
+        Ok(file_tie_breaks)
+    }
+
+    #[cfg_attr(not(target_arch = "wasm32"), async_recursion)]
+    #[cfg_attr(target_arch = "wasm32", async_recursion(?Send))]
+    async fn merge_helper<'a>(
+        self: &'a mut Arc<Self>,
+        other: &'a Arc<Self>,
+        store: &'a impl BlockStore,
+        current_path: &[String],
+        file_tie_breaks: &mut BTreeSet<Vec<String>>,
+    ) -> Result<()> {
+        let our_cid = self.store(store).await?;
+        let other_cid = other.store(store).await?;
+        if our_cid == other_cid {
+            // We don't have to merge
+            return Ok(());
+        }
+
+        let dir = self.prepare_next_merge(store).await?;
+        if other.previous.len() > 1 {
+            // The other node is a merge node, we should merge the merge nodes directly:
+            dir.previous.extend(other.previous.iter().cloned());
+        } else {
+            // The other node is a 'normal' node - we need to merge it normally
+            dir.previous.insert(other.store(store).await?);
+        }
+        dir.metadata.tie_break_with(&other.metadata)?;
+
+        for (name, link) in other.userland.iter() {
+            let other_node = link.resolve_value(store).await?;
+            match dir.userland.entry(name.clone()) {
+                Entry::Vacant(vacant) => {
+                    vacant.insert(PublicLink::new(other_node.clone()));
+                }
+                Entry::Occupied(mut occupied) => {
+                    let our_node = occupied.get_mut().resolve_value_mut(store).await?;
+                    match (our_node, other_node) {
+                        (PublicNode::File(our_file), PublicNode::File(other_file)) => {
+                            let our_cid = our_file.store(store).await?;
+                            let other_cid = other_file.store(store).await?;
+                            if our_cid == other_cid {
+                                continue; // No need to merge, the files are equal
+                            }
+
+                            let mut path = current_path.to_vec();
+                            path.push(name.clone());
+                            file_tie_breaks.insert(path);
+
+                            let our_content_cid = our_file.userland.resolve_cid(store).await?;
+                            let other_content_cid = other_file.userland.resolve_cid(store).await?;
+
+                            let file = our_file.prepare_next_merge(store).await?;
+                            if other_file.previous.len() > 1 {
+                                // The other node is a merge node, we should merge the merge nodes directly:
+                                file.previous.extend(other_file.previous.iter().cloned());
+                            } else {
+                                // The other node is a 'normal' node - we need to merge it normally
+                                file.previous.insert(other_file.store(store).await?);
+                            }
+
+                            if our_content_cid.hash().digest() > other_content_cid.hash().digest() {
+                                file.userland = other_file.userland.clone();
+                                file.metadata = other_file.metadata.clone();
+                            }
+                        }
+                        (node @ PublicNode::File(_), PublicNode::Dir(other_dir)) => {
+                            // directories have priority
+                            // we don't add previous links
+                            *node = PublicNode::Dir(other_dir.clone());
+                        }
+                        (PublicNode::Dir(_), PublicNode::File(_)) => {
+                            // directories have priority, no changes necessary
+                        }
+                        (PublicNode::Dir(dir), PublicNode::Dir(other_dir)) => {
+                            let mut path = current_path.to_vec();
+                            path.push(name.clone());
+                            dir.merge_helper(other_dir, store, &path, file_tie_breaks)
+                                .await?;
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
 }

 impl Id for PublicDirectory {
@@ -1280,6 +1474,190 @@ mod tests {
     }
 }

+#[cfg(test)]
+mod proptests {
+    use super::*;
+    use proptest::{
+        collection::{btree_map, btree_set, vec},
+        prelude::*,
+    };
+    use test_strategy::proptest;
+    use wnfs_common::MemoryBlockStore;
+
+    #[derive(Debug, Clone)]
+    struct FileSystem {
+        files: BTreeMap<Vec<String>, String>,
+        dirs: BTreeSet<Vec<String>>,
+    }
+
+    fn file_system() -> impl Strategy<Value = FileSystem> {
+        (
+            btree_map(vec(simple_string(), 1..10), simple_string(), 0..40),
+            btree_set(vec(simple_string(), 1..10), 0..40),
+        )
+            .prop_map(|(mut files, dirs)| {
+                files = files
+                    .into_iter()
+                    .filter(|(file_path, _)| {
+                        !dirs
+                            .iter()
+                            .any(|dir_path| dir_path.starts_with(file_path))
+                    })
+                    .collect();
+                FileSystem { files, dirs }
+            })
+            .prop_filter("file overwritten by directory", valid_fs)
+    }
+
+    fn simple_string() -> impl Strategy<Value = String> {
+        (0..6u32).prop_map(|c| char::from_u32('a' as u32 + c).unwrap().to_string())
+    }
+
+    fn valid_fs(fs: &FileSystem) -> bool {
+        fs.files.iter().all(|(file_path, _)| {
+            !fs.dirs
+                .iter()
+                .any(|dir_path| dir_path.starts_with(file_path))
+                && !fs.files.iter().any(|(other_path, _)| {
+                    other_path != file_path && other_path.starts_with(file_path)
+                })
+        })
+    }
+
+    async fn convert_fs(
+        fs: FileSystem,
+        time: DateTime<Utc>,
+        store: &impl BlockStore,
+    ) -> Result<Arc<PublicDirectory>> {
+        let mut dir = PublicDirectory::new_rc(time);
+        let FileSystem { files, dirs } = fs;
+        for (path, content) in files.iter() {
+            dir.write(path, content.clone().into_bytes(), time, store)
+                .await?;
+        }
+
+        for path in dirs.iter() {
+            dir.mkdir(path, time, store).await?;
+        }
+
+        Ok(dir)
+    }
+
+    #[proptest]
+    fn test_merge_directory_preferred(#[strategy(vec(simple_string(), 1..10))] path: Vec<String>) {
+        async_std::task::block_on(async move {
+            let store = &MemoryBlockStore::new();
+            let time = Utc::now();
+
+            let root0 = &mut PublicDirectory::new_rc(time);
+            let root1 = &mut PublicDirectory::new_rc(time);
+
+            root0
+                .write(&path, b"Should be overwritten".into(), time, store)
+                .await
+                .unwrap();
+
+            root1.mkdir(&path, time, store).await.unwrap();
+
+            root0.merge(root1, store).await.unwrap();
+
+            let node = root0
+                .get_node(&path, store)
+                .await
+                .unwrap()
+                .expect("merged fs contains the node");
+
+            prop_assert!(node.is_dir());
+
+            Ok(())
+        })?;
+    }
+
+    #[proptest]
+    fn test_merge_commutativity(
+        #[strategy(file_system())] fs0: FileSystem,
+        #[strategy(file_system())] fs1: FileSystem,
+    ) {
+        async_std::task::block_on(async move {
+            let store = &MemoryBlockStore::new();
+            let time = Utc::now();
+
+            let root0 = convert_fs(fs0, time, store).await.unwrap();
+            let root1 = convert_fs(fs1, time, store).await.unwrap();
+
+            let mut merge_one_way = Arc::clone(&root0);
+            merge_one_way.merge(&root1, store).await.unwrap();
+            let mut merge_other_way = Arc::clone(&root1);
+            merge_other_way.merge(&root0, store).await.unwrap();
+
+            let cid_one_way = merge_one_way.store(store).await.unwrap();
+            let cid_other_way = merge_other_way.store(store).await.unwrap();
+
+            prop_assert_eq!(cid_one_way, cid_other_way);
+
+            Ok(())
+        })?;
+    }
+
+    #[proptest]
+    fn test_merge_associativity(
+        #[strategy(file_system())] fs0: FileSystem,
+        #[strategy(file_system())] fs1: FileSystem,
+        #[strategy(file_system())] fs2: FileSystem,
+    ) {
+        async_std::task::block_on(async move {
+            let store = &MemoryBlockStore::new();
+            let time = Utc::now();
+            let root0 = convert_fs(fs0, time, store).await.unwrap();
+            let root1 = convert_fs(fs1, time, store).await.unwrap();
+            let root2 = convert_fs(fs2, time, store).await.unwrap();
+
+            let mut merge_0_1_then_2 = Arc::clone(&root0);
+            merge_0_1_then_2.merge(&root1, store).await.unwrap();
+            merge_0_1_then_2.merge(&root2, store).await.unwrap();
+
+            let mut merge_1_2 = Arc::clone(&root1);
+            merge_1_2.merge(&root2, store).await.unwrap();
+            let mut merge_0_with_1_2 = Arc::clone(&root0);
+            merge_0_with_1_2.merge(&merge_1_2, store).await.unwrap();
+
+            let cid_one_way = merge_0_1_then_2.store(store).await.unwrap();
+            let cid_other_way = merge_0_with_1_2.store(store).await.unwrap();
+
+            prop_assert_eq!(cid_one_way, cid_other_way);
+
+            Ok(())
+        })?;
+    }
+
+    #[proptest]
+    fn test_merge_directories_preserved(
+        #[strategy(file_system())] fs0: FileSystem,
+        #[strategy(file_system())] fs1: FileSystem,
+    ) {
+        async_std::task::block_on(async move {
+            let store = &MemoryBlockStore::new();
+            let time = Utc::now();
+
+            let mut all_dirs = fs0.dirs.clone();
+            all_dirs.extend(fs1.dirs.iter().cloned());
+
+            let mut root = convert_fs(fs0, time, store).await.unwrap();
+            let root1 = convert_fs(fs1, time, store).await.unwrap();
+
+            root.merge(&root1, store).await.unwrap();
+
+            for dir in all_dirs {
+                let exists = root.get_node(&dir, store).await.unwrap().is_some();
+                prop_assert!(exists);
+            }
+
+            Ok(())
+        })?;
+    }
+}
+
 #[cfg(test)]
 mod snapshot_tests {
     use super::*;
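The proptests above pin down the algebra `merge` is designed to satisfy:
commutativity and associativity of the resulting CIDs, and directories winning
over files. A concrete sketch of calling `merge` directly (store setup and
contents are illustrative; `new_rc`, `write`, and `merge` are the APIs used in
the tests above):

    use anyhow::Result;
    use chrono::Utc;
    use wnfs::public::PublicDirectory;
    use wnfs_common::MemoryBlockStore;

    async fn merge_conflicting_file() -> Result<()> {
        let store = &MemoryBlockStore::new();
        let time = Utc::now();

        let mut ours = PublicDirectory::new_rc(time);
        let mut theirs = PublicDirectory::new_rc(time);

        // Both replicas write the same path with different content.
        ours.write(&["readme.md".into()], b"ours".to_vec(), time, store)
            .await?;
        theirs
            .write(&["readme.md".into()], b"theirs".to_vec(), time, store)
            .await?;

        // merge() ignores causal order. The conflicting file is tie-broken on
        // the hash of its content CID, so both replicas deterministically pick
        // the same winner; the losing version stays reachable via `previous`.
        let tie_breaks = ours.merge(&theirs, store).await?;
        assert!(tie_breaks.contains(&vec!["readme.md".to_string()]));
        Ok(())
    }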
diff --git a/wnfs/src/public/file.rs b/wnfs/src/public/file.rs
index ed4a3e6e..cdfcada4 100644
--- a/wnfs/src/public/file.rs
+++ b/wnfs/src/public/file.rs
@@ -405,6 +405,32 @@ impl PublicFile {
         cloned
     }

+    /// Call this function to prepare this file for conflict reconciliation merge changes.
+    /// Advances this node to the next revision, unless it's already a merge node.
+    /// Merge nodes preferably just grow in size. This allows them to combine more nicely
+    /// without causing further conflicts.
+    pub(crate) async fn prepare_next_merge<'a>(
+        self: &'a mut Arc<Self>,
+        store: &impl BlockStore,
+    ) -> Result<&'a mut Self> {
+        if self.previous.len() > 1 {
+            // This is a merge node
+            let cloned = Arc::make_mut(self);
+            cloned.persisted_as = OnceCell::new();
+            return Ok(cloned);
+        }
+
+        // This is not a merge node. We need to force a new revision.
+        // Otherwise we would turn a node that is possibly storing uncommitted
+        // new changes into a merge node, but merge nodes should have no changes
+        // besides the merge itself.
+        let previous_cid = self.store(store).await?;
+        let cloned = Arc::make_mut(self);
+        cloned.persisted_as = OnceCell::new();
+        cloned.previous = BTreeSet::from([previous_cid]);
+        Ok(cloned)
+    }
+
     /// Writes a new content cid to the file.
     /// This will create a new revision of the file.
     pub async fn set_content(
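The last file adds `PublicNode::causal_compare`, which `PublicDirectory::reconcile`
above delegates to. A sketch of how a caller might act on its result, mirroring
what `reconcile` does internally (the function and names are placeholders):

    use anyhow::Result;
    use std::cmp::Ordering;
    use wnfs::public::PublicNode;
    use wnfs_common::BlockStore;

    async fn plan(
        ours: &PublicNode,
        theirs: &PublicNode,
        store: &impl BlockStore,
    ) -> Result<&'static str> {
        Ok(match ours.causal_compare(theirs, store).await? {
            Some(Ordering::Equal) => "same revision; nothing to do",
            Some(Ordering::Greater) => "we are ahead; keep our state",
            Some(Ordering::Less) => "they are ahead; fast-forward to theirs",
            None => "concurrent versions; a merge is required",
        })
    }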
diff --git a/wnfs/src/public/node/node.rs b/wnfs/src/public/node/node.rs
index fb0bc4e1..9e98ed42 100644
--- a/wnfs/src/public/node/node.rs
+++ b/wnfs/src/public/node/node.rs
@@ -10,7 +10,7 @@ use anyhow::{bail, Result};
 use async_once_cell::OnceCell;
 use chrono::{DateTime, Utc};
 use libipld_core::cid::Cid;
-use std::collections::BTreeSet;
+use std::{cmp::Ordering, collections::BTreeSet};
 use wnfs_common::{utils::Arc, BlockStore, Storable};

@@ -229,6 +229,84 @@ impl PublicNode {
     pub fn is_file(&self) -> bool {
         matches!(self, Self::File(_))
     }
+
+    /// Compares the merkle clocks of this node and the other node.
+    ///
+    /// This gives you information about which node is "ahead" of which other node
+    /// (think similar to git).
+    /// This is what the return types indicate:
+    /// - `Ok(None)`: These two nodes don't share any history.
+    /// - `Ok(Some(Ordering::Equal))`: These nodes represent the same point in history/are the same exact node.
+    /// - `Ok(Some(Ordering::Less))`: The other node is "further ahead" in history than this node.
+    /// - `Ok(Some(Ordering::Greater))`: This node is "further ahead".
+    /// - `Err(_)`: Something went wrong during deserialization/in the blockstore.
+    pub async fn causal_compare(
+        &self,
+        other: &Self,
+        store: &impl BlockStore,
+    ) -> Result<Option<Ordering>> {
+        async fn next_previous_set(
+            previous_set: BTreeSet<Cid>,
+            visited_cids: &mut BTreeSet<Cid>,
+            store: &impl BlockStore,
+        ) -> Result<BTreeSet<Cid>> {
+            let mut previous = BTreeSet::new();
+
+            for cid in previous_set {
+                let node = PublicNode::load(&cid, store).await?;
+                previous.extend(
+                    node.get_previous()
+                        .iter()
+                        .filter(|cid| visited_cids.insert(**cid))
+                        .cloned(),
+                );
+            }
+
+            Ok(previous)
+        }
+
+        let our_root = self.store(store).await?;
+        let other_root = other.store(store).await?;
+
+        if our_root == other_root {
+            return Ok(Some(Ordering::Equal));
+        }
+
+        let mut our_previous_set = self.get_previous().clone();
+        let mut other_previous_set = other.get_previous().clone();
+
+        let mut our_visited = BTreeSet::new();
+        let mut other_visited = BTreeSet::new();
+
+        loop {
+            if other_previous_set.contains(&our_root) {
+                return Ok(Some(Ordering::Less));
+            }
+
+            if our_previous_set.contains(&other_root) {
+                return Ok(Some(Ordering::Greater));
+            }
+
+            // early return optimization:
+            // If one "previous CIDs frontier" is entirely within the other's visited set,
+            // then it for sure can't hit the other root, so we know they diverged.
+            let our_is_true_subset =
+                !our_previous_set.is_empty() && our_previous_set.is_subset(&other_visited);
+            let other_is_true_subset =
+                !other_previous_set.is_empty() && other_previous_set.is_subset(&our_visited);
+            if our_is_true_subset || other_is_true_subset {
+                return Ok(None);
+            }
+
+            our_previous_set = next_previous_set(our_previous_set, &mut our_visited, store).await?;
+            other_previous_set =
+                next_previous_set(other_previous_set, &mut other_visited, store).await?;
+
+            if our_previous_set.is_empty() && other_previous_set.is_empty() {
+                return Ok(None); // No common causal history
+            }
+        }
+    }
 }

 impl Id for PublicNode {
@@ -332,6 +410,261 @@ mod tests {
     }
 }

+#[cfg(test)]
+mod proptests {
+    use super::*;
+    use chrono::NaiveDateTime;
+    use futures::{stream, StreamExt, TryStreamExt};
+    use proptest::{collection::vec, prelude::*};
+    use test_strategy::proptest;
+    use wnfs_common::MemoryBlockStore;
+
+    #[derive(Debug, Clone, Copy)]
+    enum Operation {
+        Write(usize), // write to nth head
+        Merge,        // merge all heads
+        Fork(usize),  // fork the nth head
+    }
+
+    #[derive(Debug, Clone)]
+    struct State {
+        heads: Vec<Arc<PublicDirectory>>, // always nonempty
+        fork_num: i64,
+    }
+
+    impl State {
+        pub fn new(init_time: i64) -> Self {
+            Self {
+                heads: vec![Arc::new(PublicDirectory::new(Self::time(init_time)))],
+                fork_num: 0,
+            }
+        }
+
+        fn time(n: i64) -> DateTime<Utc> {
+            DateTime::from_naive_utc_and_offset(
+                // convert into seconds, otherwise 0 and 1 would both be mapped to "0 seconds"
+                NaiveDateTime::from_timestamp_millis(n * 1000).unwrap(),
+                Utc,
+            )
+        }
+
+        pub fn get_head(&self, n: usize) -> &Arc<PublicDirectory> {
+            let len = self.heads.len();
+            debug_assert!(len > 0);
+            &self.heads[n % len] // so we don't need to account for the current state (number of heads) when generating n
+        }
+
+        pub fn get_head_mut(&mut self, n: usize) -> &mut Arc<PublicDirectory> {
+            let len = self.heads.len();
+            debug_assert!(len > 0);
+            &mut self.heads[n % len] // so we don't need to account for the current state (number of heads) when generating n
+        }
+
+        pub async fn run(&mut self, op: &Operation, store: &impl BlockStore) -> Result<()> {
+            match op {
+                Operation::Write(n) => {
+                    let head = self.get_head_mut(*n);
+                    head.store(store).await?;
+                    head.prepare_next_revision();
+                }
+                Operation::Merge => {
+                    let head_cids = stream::iter(self.heads.iter())
+                        .then(|head| head.store(store))
+                        .try_collect::<BTreeSet<_>>()
+                        .await?;
+                    let mut dir = PublicDirectory::new(Self::time(0));
+                    dir.previous = head_cids;
+                    self.heads = vec![Arc::new(dir)];
+                }
+                Operation::Fork(n) => {
+                    let mut head = (**self.get_head(*n)).clone();
+                    self.fork_num += 1;
+                    // To make sure we don't accidentally recreate the same CIDs
+                    head.metadata.upsert_mtime(Self::time(self.fork_num));
+                    self.heads.push(Arc::new(head));
+                }
+            }
+            Ok(())
+        }
+
+        pub async fn run_all(
+            &mut self,
+            ops: impl IntoIterator<Item = Operation>,
+            store: &impl BlockStore,
+        ) -> Result<()> {
+            for op in ops {
+                self.run(&op, store).await?;
+            }
+            Ok(())
+        }
+
+        pub fn head_node(&self) -> PublicNode {
+            debug_assert!(!self.heads.is_empty());
+            PublicNode::Dir(Arc::clone(&self.heads[0]))
+        }
+    }
+
+    fn op() -> impl Strategy<Value = Operation> {
+        (0..=2, 0..16).prop_map(|(op, idx)| match op {
+            0 => Operation::Write(idx as usize),
+            1 => Operation::Merge,
+            2 => Operation::Fork(idx as usize),
+            _ => unreachable!(
+                "This case should be impossible. Values generated are only 0, 1, and 2"
+            ),
+        })
+    }
+
+    async fn run_ops(
+        init_time: i64,
+        operations: impl IntoIterator<Item = Operation>,
+        store: &impl BlockStore,
+    ) -> Result<PublicNode> {
+        let mut state = State::new(init_time);
+        state.run_all(operations, store).await?;
+        Ok(state.head_node())
+    }
+
+    #[proptest]
+    fn test_reflexivity(#[strategy(vec(op(), 0..100))] operations: Vec<Operation>) {
+        async_std::task::block_on(async move {
+            let mut state = State::new(0);
+            let store = &MemoryBlockStore::new();
+
+            state.run_all(operations, store).await.unwrap();
+            let head_one = state.head_node();
+            let head_two = state.head_node();
+
+            prop_assert_eq!(
+                head_one.causal_compare(&head_two, store).await.unwrap(),
+                Some(Ordering::Equal)
+            );
+
+            Ok(())
+        })?;
+    }
+
+    #[proptest(cases = 256, max_global_rejects = 10_000)]
+    fn test_asymmetry(
+        #[strategy(vec(op(), 0..30))] operations_one: Vec<Operation>,
+        #[strategy(vec(op(), 0..30))] operations_two: Vec<Operation>,
+    ) {
+        async_std::task::block_on(async move {
+            let store = &MemoryBlockStore::new();
+            let node_one = run_ops(0, operations_one, store).await.unwrap();
+            let node_two = run_ops(0, operations_two, store).await.unwrap();
+
+            let Some(cmp) = node_one.causal_compare(&node_two, store).await.unwrap() else {
+                return Err(TestCaseError::reject("not testing causally incomparable"));
+            };
+
+            let Some(cmp_rev) = node_two.causal_compare(&node_one, store).await.unwrap() else {
+                return Err(TestCaseError::fail(
+                    "causally comparable one way, but not the other",
+                ));
+            };
+
+            prop_assert_eq!(cmp.reverse(), cmp_rev);
+
+            Ok(())
+        })?;
+    }
+
+    #[proptest(cases = 100, max_global_rejects = 10_000)]
+    fn test_transitivity(
+        #[strategy(vec(op(), 0..20))] operations0: Vec<Operation>,
+        #[strategy(vec(op(), 0..20))] operations1: Vec<Operation>,
+        #[strategy(vec(op(), 0..20))] operations2: Vec<Operation>,
+    ) {
+        async_std::task::block_on(async move {
+            let store = &MemoryBlockStore::new();
+            let node0 = run_ops(0, operations0, store).await.unwrap();
+            let node1 = run_ops(0, operations1, store).await.unwrap();
+            let node2 = run_ops(0, operations2, store).await.unwrap();
+
+            let Some(cmp_0_1) = node0.causal_compare(&node1, store).await.unwrap() else {
+                return Err(TestCaseError::reject("not testing causally incomparable"));
+            };
+
+            let Some(cmp_1_2) = node1.causal_compare(&node2, store).await.unwrap() else {
+                return Err(TestCaseError::reject("not testing causally incomparable"));
+            };
+
+            let Some(cmp_0_2) = node0.causal_compare(&node2, store).await.unwrap() else {
+                return Err(TestCaseError::reject("not testing causally incomparable"));
+            };
+
+            match (cmp_0_1, cmp_1_2) {
+                (Ordering::Equal, Ordering::Equal) => prop_assert_eq!(cmp_0_2, Ordering::Equal),
+                (Ordering::Less, Ordering::Less) => prop_assert_eq!(cmp_0_2, Ordering::Less),
+                (Ordering::Less, Ordering::Equal) => prop_assert_eq!(cmp_0_2, Ordering::Less),
+                (Ordering::Equal, Ordering::Less) => prop_assert_eq!(cmp_0_2, Ordering::Less),
+                (Ordering::Equal, Ordering::Greater) => prop_assert_eq!(cmp_0_2, Ordering::Greater),
+                (Ordering::Greater, Ordering::Equal) => prop_assert_eq!(cmp_0_2, Ordering::Greater),
+                (Ordering::Greater, Ordering::Greater) => {
+                    prop_assert_eq!(cmp_0_2, Ordering::Greater)
+                }
+                (Ordering::Less, Ordering::Greater) => {
+                    return Err(TestCaseError::reject(
+                        "a < b and b > c, there's no transitivity to test here",
+                    ))
+                }
+                (Ordering::Greater, Ordering::Less) => {
+                    return Err(TestCaseError::reject(
+                        "a > b and b < c, there's no transitivity to test here",
+                    ))
+                }
+            }
+
+            Ok(())
+        })?;
+    }
+
+    #[proptest]
+    fn test_different_roots_incomparable(
+        #[strategy(vec(op(), 0..100))] operations0: Vec<Operation>,
+        #[strategy(vec(op(), 0..100))] operations1: Vec<Operation>,
+    ) {
+        async_std::task::block_on(async move {
+            let store = &MemoryBlockStore::new();
+            let node0 = run_ops(0, operations0, store).await.unwrap();
+            let node1 = run_ops(1, operations1, store).await.unwrap();
+
+            prop_assert_eq!(node0.causal_compare(&node1, store).await.unwrap(), None);
+            prop_assert_eq!(node1.causal_compare(&node0, store).await.unwrap(), None);
+            Ok(())
+        })?;
+    }
+
+    #[proptest]
+    fn test_ops_after_merge_makes_greater(
+        #[strategy(vec(op(), 0..100))] operations: Vec<Operation>,
+        #[strategy(vec(op(), 0..100))] more_ops: Vec<Operation>,
+    ) {
+        async_std::task::block_on(async move {
+            let mut state = State::new(0);
+            let store = &MemoryBlockStore::new();
+
+            state.run_all(operations, store).await.unwrap();
+            let head_one = state.head_node();
+            state.run(&Operation::Merge, store).await.unwrap();
+            state.run_all(more_ops, store).await.unwrap();
+            let head_two = state.head_node();
+
+            prop_assert_eq!(
+                head_one.causal_compare(&head_two, store).await.unwrap(),
+                Some(Ordering::Less)
+            );
+            prop_assert_eq!(
+                head_two.causal_compare(&head_one, store).await.unwrap(),
+                Some(Ordering::Greater)
+            );
+
+            Ok(())
+        })?;
+    }
+}
+
 #[cfg(test)]
 mod snapshot_tests {
     use super::*;