Skip to content

Commit

Permalink
feat: implement PublicDirectory conflict reconciliation (#426)
Browse files Browse the repository at this point in the history
This implements three functions:
- `PublicDirectory::reconcile` which takes two directories & reconciles all changes between them. Think of this as similar to `git pull`, but where conflicts in files are merged automatically via a simple tie-breaking mechanism on the file hash. The return value indicates whether and which files were affected by the automatic merge, if at all.
- `PublicDirectory::merge`, the underlying function for `reconcile`, but which doesn't take into account if one of the directories may have been "ahead" in history. Use only if you know what you're doing, otherwise opt for `reconcile`.
- `PublicNode::causal_compare`, the underlying function in `reconcile` that figures out whether one version of a node is "ahead" of another or behind, or if they're two conflicting versions and need to be merged.

---

* feat: Conflict reconciliation for PublicDirectory (first draft)

* fix: Use `async_recursion`

* refactor: Use more `prop_assert`

* chore: Write proptests for `causal_compare`

* chore: Write tests for `PublicDirectory::merge`

* chore: Write documentation

* fix: Consistently merge metadata between nodes

* fix: Test types

* chore: Remove unused imports

* fix: Bugs in merge implementation

Specifically:
- trivially merge exactly equal nodes without creating a history entry
- correctly reset `persisted_as` when creating a merge node
- always advance the history entry when creating a merge node

* fix: Don't merge equal files
  • Loading branch information
matheus23 authored Apr 18, 2024
1 parent 4201130 commit dc0785a
Show file tree
Hide file tree
Showing 15 changed files with 836 additions and 48 deletions.
21 changes: 20 additions & 1 deletion wnfs-common/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
use anyhow::{bail, Result};
use chrono::{DateTime, TimeZone, Utc};
use libipld::Ipld;
use libipld::{Ipld, Multihash};
use multihash::{Code, MultihashDigest};
use serde::{
de::{DeserializeOwned, Error as DeError},
Deserialize, Deserializer, Serialize, Serializer,
Expand Down Expand Up @@ -223,6 +224,24 @@ impl Metadata {
self.0.insert(key.clone(), value.clone());
}
}

/// Computes the Blake3-256 multihash of this metadata's DAG-CBOR encoding.
///
/// # Errors
///
/// Returns an error if the metadata cannot be serialized to DAG-CBOR.
pub(crate) fn hash(&self) -> Result<Multihash> {
    let encoded = serde_ipld_dagcbor::to_vec(self)?;
    Ok(Code::Blake3_256.digest(&encoded))
}

/// Resolves a conflict between this metadata and `other` by tie-breaking.
///
/// Used for conflict reconciliation. The two metadata maps are not merged
/// (yet); instead their hash digests are compared and the metadata with the
/// lower digest survives. If this metadata's digest is greater than
/// `other`'s, its contents are replaced by a copy of `other`'s; otherwise
/// (including when the digests are equal) it is left untouched.
///
/// # Errors
///
/// Returns an error if hashing either metadata fails.
pub fn tie_break_with(&mut self, other: &Self) -> Result<()> {
    let own_hash = self.hash()?;
    let other_hash = other.hash()?;

    // Nothing to do when we already hold the lower (or an equal) digest.
    if own_hash.digest() <= other_hash.digest() {
        return Ok(());
    }

    self.0 = other.0.clone();
    Ok(())
}
}

impl TryFrom<&Ipld> for NodeType {
Expand Down
34 changes: 18 additions & 16 deletions wnfs-hamt/src/diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ mod proptests {
ChangeType,
};
use async_std::task;
use proptest::{prop_assert, prop_assert_eq};
use std::collections::HashSet;
use test_strategy::proptest;
use wnfs_common::{utils::Arc, Link, MemoryBlockStore};
Expand Down Expand Up @@ -638,19 +639,17 @@ mod proptests {
.await
.unwrap();

assert_eq!(strategy_changes.len(), changes.len());
prop_assert_eq!(strategy_changes.len(), changes.len());
for strategy_change in strategy_changes {
assert!(changes.iter().any(|c| match &strategy_change {
let found_matching_change = changes.iter().any(|c| match &strategy_change {
Change::Add(k, _) => c.r#type == ChangeType::Add && &c.key == k,
Change::Modify(k, _) => {
c.r#type == ChangeType::Modify && &c.key == k
}
Change::Remove(k) => {
c.r#type == ChangeType::Remove && &c.key == k
}
}));
Change::Modify(k, _) => c.r#type == ChangeType::Modify && &c.key == k,
Change::Remove(k) => c.r#type == ChangeType::Remove && &c.key == k,
});
prop_assert!(found_matching_change);
}
});
Ok(())
})?;
}

#[proptest(cases = 1000, max_shrink_iters = 40000)]
Expand All @@ -673,8 +672,9 @@ mod proptests {
.map(|c| c.key.clone())
.collect::<HashSet<_>>();

assert_eq!(change_set.len(), changes.len());
});
prop_assert_eq!(change_set.len(), changes.len());
Ok(())
})?;
}

#[proptest(cases = 100)]
Expand All @@ -700,14 +700,16 @@ mod proptests {
.await
.unwrap();

assert_eq!(changes.len(), flipped_changes.len());
prop_assert_eq!(changes.len(), flipped_changes.len());
for change in changes {
assert!(flipped_changes.iter().any(|c| match change.r#type {
let found_matching_change = flipped_changes.iter().any(|c| match change.r#type {
ChangeType::Add => c.r#type == ChangeType::Remove && c.key == change.key,
ChangeType::Remove => c.r#type == ChangeType::Add && c.key == change.key,
ChangeType::Modify => c.r#type == ChangeType::Modify && c.key == change.key,
}));
});
prop_assert!(found_matching_change);
}
});
Ok(())
})?;
}
}
16 changes: 10 additions & 6 deletions wnfs-hamt/src/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ where
mod proptests {
use crate::strategies::{self, generate_kvs};
use async_std::task;
use proptest::prop_assert_eq;
use std::cmp;
use test_strategy::proptest;
use wnfs_common::{utils::Arc, Link, MemoryBlockStore};
Expand Down Expand Up @@ -126,8 +127,9 @@ mod proptests {
.unwrap()
};

assert_eq!(merge_node_left_assoc, merge_node_right_assoc);
});
prop_assert_eq!(merge_node_left_assoc, merge_node_right_assoc);
Ok(())
})?;
}

#[proptest(cases = 100)]
Expand Down Expand Up @@ -159,8 +161,9 @@ mod proptests {
.await
.unwrap();

assert_eq!(merge_node_1, merge_node_2);
})
prop_assert_eq!(merge_node_1, merge_node_2);
Ok(())
})?;
}

#[proptest(cases = 100)]
Expand Down Expand Up @@ -192,7 +195,8 @@ mod proptests {
.await
.unwrap();

assert_eq!(merge_node_1, merge_node_2);
})
prop_assert_eq!(merge_node_1, merge_node_2);
Ok(())
})?;
}
}
25 changes: 15 additions & 10 deletions wnfs-hamt/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1236,8 +1236,9 @@ mod proptests {
node.set(key, value, store).await.unwrap();
let cid2 = node.store(store).await.unwrap();

assert_eq!(cid1, cid2);
})
prop_assert_eq!(cid1, cid2);
Ok(())
})?;
}

#[proptest(cases = 50)]
Expand All @@ -1258,8 +1259,9 @@ mod proptests {
node.remove(&key, store).await.unwrap();
let cid2 = node.store(store).await.unwrap();

assert_eq!(cid1, cid2);
})
prop_assert_eq!(cid1, cid2);
Ok(())
})?;
}

#[proptest(cases = 100)]
Expand All @@ -1276,8 +1278,9 @@ mod proptests {
let node_cid = node.store(store).await.unwrap();
let decoded_node = Node::<String, u64>::load(&node_cid, store).await.unwrap();

assert_eq!(*node, decoded_node);
})
prop_assert_eq!(node.as_ref(), &decoded_node);
Ok(())
})?;
}

#[proptest(cases = 1000, max_shrink_iters = 10_000)]
Expand All @@ -1298,8 +1301,9 @@ mod proptests {
let cid1 = node1.store(store).await.unwrap();
let cid2 = node2.store(store).await.unwrap();

assert_eq!(cid1, cid2);
})
prop_assert_eq!(cid1, cid2);
Ok(())
})?;
}

// This is sort of a "control group" for making sure that operations_and_shuffled is correct.
Expand Down Expand Up @@ -1332,8 +1336,9 @@ mod proptests {
let map = HashMap::from(&operations);
let map_result = node.to_hashmap(store).await.unwrap();

assert_eq!(map, map_result);
})
prop_assert_eq!(map, map_result);
Ok(())
})?;
}
}

Expand Down
2 changes: 1 addition & 1 deletion wnfs-hamt/src/pointer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ where
mod tests {
use super::*;
use testresult::TestResult;
use wnfs_common::{MemoryBlockStore, Storable};
use wnfs_common::MemoryBlockStore;

#[async_std::test]
async fn pointer_can_encode_decode_as_cbor() -> TestResult {
Expand Down
2 changes: 1 addition & 1 deletion wnfs-nameaccumulator/src/name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ mod tests {
#[cfg(test)]
mod snapshot_tests {
use super::*;
use crate::{BigNumDig, NameSegment};
use crate::BigNumDig;
use rand_chacha::ChaCha12Rng;
use rand_core::SeedableRng;
use wnfs_common::utils::SnapshotBlockStore;
Expand Down
1 change: 0 additions & 1 deletion wnfs-unixfs-file/src/balanced_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ impl TreeNode {
mod tests {
use super::*;
use bytes::BytesMut;
use futures::StreamExt;
use wnfs_common::MemoryBlockStore;

// chunks are just a single usize integer
Expand Down
1 change: 0 additions & 1 deletion wnfs-unixfs-file/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ pub struct Config {
mod tests {
use super::*;
use crate::chunker::DEFAULT_CHUNKS_SIZE;
use futures::TryStreamExt;
use wnfs_common::MemoryBlockStore;

#[tokio::test]
Expand Down
1 change: 0 additions & 1 deletion wnfs-unixfs-file/src/protobufs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ pub struct Metadata {
#[cfg(test)]
mod tests {
use super::*;
use prost::Message;
use testresult::TestResult;

#[test]
Expand Down
11 changes: 11 additions & 0 deletions wnfs/proptest-regressions/public/directory.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc 4ca9259264a7fb240270a7f3a3c9f702cc5f3f7ec8f8add28e774546901bb064 # shrinks to input = _TestMergeDirectoryPreferredArgs { path: [] }
cc 52d317f93f0d815bfd054e6147b32492abc79b100274458f3fc266d1d9f40083 # shrinks to input = _TestMergeCommutativityArgs { ops0: [Write(["a"], "a"), Mkdir(["a"])], ops1: [] }
cc 5d512e34a6b76473ff418d6cc7730003875ae30727a3155b2abc13d5f8313b58 # shrinks to input = _TestMergeCommutativityArgs { fs0: FileSystem { files: {}, dirs: {} }, fs1: FileSystem { files: {["a"]: "a"}, dirs: {["a"]} } }
cc d4c4529fd972a2a6af4dcecd28a289d11451203600ae18e001dbdd42fe19e245 # shrinks to input = _TestMergeCommutativityArgs { fs0: FileSystem { files: {["b"]: "a", ["b", "a"]: "a"}, dirs: {} }, fs1: FileSystem { files: {}, dirs: {} } }
cc e5c61f6ac3dec61974eedf0a7042fd1f801efa9f020e4b37473d5a11a7a7a7a4 # shrinks to input = _TestMergeAssociativityArgs { fs0: FileSystem { files: {}, dirs: {["e", "b"]} }, fs1: FileSystem { files: {}, dirs: {["e", "b"]} }, fs2: FileSystem { files: {}, dirs: {["e", "b"]} } }
8 changes: 8 additions & 0 deletions wnfs/proptest-regressions/public/node/node.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc 964d01e6f0fa5c2e45a8d245bb705007a50c24fb53348cb96a528ec52e27a49a # shrinks to input = _TestTransitivityArgs { operations0: [Write(0), Write(0), Write(0), Write(0), Write(0)], operations1: [Write(0), Write(0), Write(0), Write(0)], operations2: [Write(0), Write(0), Write(15671), Fork(2303347135), Write(593116438)] }
cc 60cd267331f207e164af9c65aaf9c1232633c97d9e258dcd01d1630292820569 # shrinks to input = _TestTransitivityArgs { operations0: [], operations1: [Write(0)], operations2: [Write(0)] }
21 changes: 13 additions & 8 deletions wnfs/src/private/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,7 @@ mod proptests {
use async_std::io::Cursor;
use chrono::Utc;
use futures::{future, StreamExt};
use proptest::{prop_assert, prop_assert_eq};
use rand_chacha::ChaCha12Rng;
use rand_core::SeedableRng;
use test_strategy::proptest;
Expand Down Expand Up @@ -1319,8 +1320,9 @@ mod proptests {

let collected_content = file.get_content(forest, store).await.unwrap();

assert_eq!(collected_content, content);
})
prop_assert_eq!(collected_content, content);
Ok(())
})?;
}

#[proptest(cases = 10)]
Expand Down Expand Up @@ -1352,8 +1354,9 @@ mod proptests {
})
.await;

assert_eq!(collected_content, content);
})
prop_assert_eq!(collected_content, content);
Ok(())
})?;
}

#[proptest(cases = 100)]
Expand Down Expand Up @@ -1384,8 +1387,9 @@ mod proptests {

let error = error.downcast_ref::<BlockStoreError>().unwrap();

assert!(matches!(error, BlockStoreError::CIDNotFound(_)));
})
prop_assert!(matches!(error, BlockStoreError::CIDNotFound(_)));
Ok(())
})?;
}

#[proptest(cases = 10)]
Expand Down Expand Up @@ -1425,7 +1429,8 @@ mod proptests {
.await
.unwrap();

assert_eq!(source_content, wnfs_content);
})
prop_assert_eq!(source_content, wnfs_content);
Ok(())
})?;
}
}
Loading

0 comments on commit dc0785a

Please sign in to comment.