From 4a499bf5d857b9365f2bb43ee3c1e7a20cc5e46f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 17 Apr 2024 20:06:27 +0200 Subject: [PATCH] fix: Consistently merge metadata between nodes --- wnfs-common/src/metadata.rs | 21 +++++++++++++++++- wnfs/src/public/directory.rs | 41 +++++++++++++++++++++--------------- 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/wnfs-common/src/metadata.rs b/wnfs-common/src/metadata.rs index 02632cdf..bd12e05b 100644 --- a/wnfs-common/src/metadata.rs +++ b/wnfs-common/src/metadata.rs @@ -2,7 +2,8 @@ use anyhow::{bail, Result}; use chrono::{DateTime, TimeZone, Utc}; -use libipld::Ipld; +use libipld::{Ipld, Multihash}; +use multihash::{Code, MultihashDigest}; use serde::{ de::{DeserializeOwned, Error as DeError}, Deserialize, Deserializer, Serialize, Serializer, @@ -223,6 +224,24 @@ impl Metadata { self.0.insert(key.clone(), value.clone()); } } + + pub(crate) fn hash(&self) -> Result { + let vec = serde_ipld_dagcbor::to_vec(self)?; + let hash = Code::Blake3_256.digest(&vec); + Ok(hash) + } + + /// Tie break this node with another one. + /// Used for conflict reconciliation. We don't merge the two metadata maps + /// together (yet), instead we compare their hashes. The one with the lower hash + /// survives. + pub fn tie_break_with(&mut self, other: &Self) -> Result<()> { + if self.hash()?.digest() > other.hash()?.digest() { + self.0 = other.0.clone(); + } + + Ok(()) + } } impl TryFrom<&Ipld> for NodeType { diff --git a/wnfs/src/public/directory.rs b/wnfs/src/public/directory.rs index ecab1b50..07f62ea2 100644 --- a/wnfs/src/public/directory.rs +++ b/wnfs/src/public/directory.rs @@ -852,7 +852,6 @@ impl PublicDirectory { pub async fn reconcile( self: &mut Arc, other: Arc, - time: DateTime, store: &impl BlockStore, ) -> Result { let causal_order = self.clone().causal_compare(other.clone(), store).await?; @@ -865,7 +864,7 @@ impl PublicDirectory { Reconciliation::FastForward } None => { - let file_tie_breaks = self.merge(&other, time, store).await?; + let file_tie_breaks = self.merge(&other, store).await?; Reconciliation::Merged { file_tie_breaks } } }) @@ -886,11 +885,10 @@ impl PublicDirectory { pub async fn merge<'a>( self: &'a mut Arc, other: &'a Arc, - time: DateTime, store: &'a impl BlockStore, ) -> Result>> { let mut file_tie_breaks = BTreeSet::new(); - self.merge_helper(other, time, store, &[], &mut file_tie_breaks) + self.merge_helper(other, store, &[], &mut file_tie_breaks) .await?; Ok(file_tie_breaks) } @@ -900,13 +898,19 @@ impl PublicDirectory { async fn merge_helper<'a>( self: &'a mut Arc, other: &'a Arc, - time: DateTime, store: &'a impl BlockStore, current_path: &[String], file_tie_breaks: &mut BTreeSet>, ) -> Result<()> { let dir = self.prepare_next_merge(); - dir.metadata.upsert_mtime(time); + if other.previous.len() > 1 { + // The other node is a merge node, we should merge the merge nodes directly: + dir.previous.extend(other.previous.iter().cloned()); + } else { + // The other node is a 'normal' node - we need to merge it normally + dir.previous.insert(other.store(store).await?); + } + dir.metadata.tie_break_with(&other.metadata)?; for (name, link) in other.userland.iter() { let other_node = link.resolve_value(store).await?; @@ -915,25 +919,28 @@ impl PublicDirectory { vacant.insert(PublicLink::new(other_node.clone())); } Entry::Occupied(mut occupied) => { - match ( - occupied.get_mut().resolve_value_mut(store).await?, - other_node, - ) { + let our_node = occupied.get_mut().resolve_value_mut(store).await?; + match (our_node, other_node) { (PublicNode::File(our_file), PublicNode::File(other_file)) => { let mut path = current_path.to_vec(); path.push(name.clone()); file_tie_breaks.insert(path); - let file = our_file.perpare_next_merge(); - file.metadata.upsert_mtime(time); - - let our_cid = file.store(store).await?; - let other_cid = other_file.store(store).await?; + let our_cid = our_file.userland.resolve_cid(store).await?; + let other_cid = other_file.userland.resolve_cid(store).await?; - file.previous.insert(other_cid); + let file = our_file.perpare_next_merge(); + if other_file.previous.len() > 1 { + // The other node is a merge node, we should merge the merge nodes directly: + file.previous.extend(other_file.previous.iter().cloned()); + } else { + // The other node is a 'normal' node - we need to merge it normally + file.previous.insert(other_file.store(store).await?); + } if our_cid.hash().digest() > other_cid.hash().digest() { file.userland = other_file.userland.clone(); + file.metadata = other_file.metadata.clone(); } } (node @ PublicNode::File(_), PublicNode::Dir(other_dir)) => { @@ -947,7 +954,7 @@ impl PublicDirectory { (PublicNode::Dir(dir), PublicNode::Dir(other_dir)) => { let mut path = current_path.to_vec(); path.push(name.clone()); - dir.merge_helper(other_dir, time, store, &path, file_tie_breaks) + dir.merge_helper(other_dir, store, &path, file_tie_breaks) .await?; } }