Skip to content

Commit

Permalink
fix: Consistently merge metadata between nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Apr 17, 2024
1 parent d8e8c73 commit 4a499bf
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 18 deletions.
21 changes: 20 additions & 1 deletion wnfs-common/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
use anyhow::{bail, Result};
use chrono::{DateTime, TimeZone, Utc};
use libipld::Ipld;
use libipld::{Ipld, Multihash};
use multihash::{Code, MultihashDigest};
use serde::{
de::{DeserializeOwned, Error as DeError},
Deserialize, Deserializer, Serialize, Serializer,
Expand Down Expand Up @@ -223,6 +224,24 @@ impl Metadata {
self.0.insert(key.clone(), value.clone());
}
}

pub(crate) fn hash(&self) -> Result<Multihash> {
let vec = serde_ipld_dagcbor::to_vec(self)?;
let hash = Code::Blake3_256.digest(&vec);
Ok(hash)
}

/// Tie break this node with another one.
/// Used for conflict reconciliation. We don't merge the two metadata maps
/// together (yet), instead we compare their hashes. The one with the lower hash
/// survives.
pub fn tie_break_with(&mut self, other: &Self) -> Result<()> {
if self.hash()?.digest() > other.hash()?.digest() {
self.0 = other.0.clone();
}

Ok(())
}
}

impl TryFrom<&Ipld> for NodeType {
Expand Down
41 changes: 24 additions & 17 deletions wnfs/src/public/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,6 @@ impl PublicDirectory {
pub async fn reconcile(
self: &mut Arc<Self>,
other: Arc<Self>,
time: DateTime<Utc>,
store: &impl BlockStore,
) -> Result<Reconciliation> {
let causal_order = self.clone().causal_compare(other.clone(), store).await?;
Expand All @@ -865,7 +864,7 @@ impl PublicDirectory {
Reconciliation::FastForward
}
None => {
let file_tie_breaks = self.merge(&other, time, store).await?;
let file_tie_breaks = self.merge(&other, store).await?;
Reconciliation::Merged { file_tie_breaks }
}
})
Expand All @@ -886,11 +885,10 @@ impl PublicDirectory {
pub async fn merge<'a>(
self: &'a mut Arc<Self>,
other: &'a Arc<Self>,
time: DateTime<Utc>,
store: &'a impl BlockStore,
) -> Result<BTreeSet<Vec<String>>> {
let mut file_tie_breaks = BTreeSet::new();
self.merge_helper(other, time, store, &[], &mut file_tie_breaks)
self.merge_helper(other, store, &[], &mut file_tie_breaks)
.await?;
Ok(file_tie_breaks)
}
Expand All @@ -900,13 +898,19 @@ impl PublicDirectory {
async fn merge_helper<'a>(
self: &'a mut Arc<Self>,
other: &'a Arc<Self>,
time: DateTime<Utc>,
store: &'a impl BlockStore,
current_path: &[String],
file_tie_breaks: &mut BTreeSet<Vec<String>>,
) -> Result<()> {
let dir = self.prepare_next_merge();
dir.metadata.upsert_mtime(time);
if other.previous.len() > 1 {
// The other node is a merge node, we should merge the merge nodes directly:
dir.previous.extend(other.previous.iter().cloned());
} else {
// The other node is a 'normal' node - we need to merge it normally
dir.previous.insert(other.store(store).await?);
}
dir.metadata.tie_break_with(&other.metadata)?;

for (name, link) in other.userland.iter() {
let other_node = link.resolve_value(store).await?;
Expand All @@ -915,25 +919,28 @@ impl PublicDirectory {
vacant.insert(PublicLink::new(other_node.clone()));
}
Entry::Occupied(mut occupied) => {
match (
occupied.get_mut().resolve_value_mut(store).await?,
other_node,
) {
let our_node = occupied.get_mut().resolve_value_mut(store).await?;
match (our_node, other_node) {
(PublicNode::File(our_file), PublicNode::File(other_file)) => {
let mut path = current_path.to_vec();
path.push(name.clone());
file_tie_breaks.insert(path);

let file = our_file.perpare_next_merge();
file.metadata.upsert_mtime(time);

let our_cid = file.store(store).await?;
let other_cid = other_file.store(store).await?;
let our_cid = our_file.userland.resolve_cid(store).await?;
let other_cid = other_file.userland.resolve_cid(store).await?;

file.previous.insert(other_cid);
let file = our_file.perpare_next_merge();
if other_file.previous.len() > 1 {
// The other node is a merge node, we should merge the merge nodes directly:
file.previous.extend(other_file.previous.iter().cloned());
} else {
// The other node is a 'normal' node - we need to merge it normally
file.previous.insert(other_file.store(store).await?);
}

if our_cid.hash().digest() > other_cid.hash().digest() {
file.userland = other_file.userland.clone();
file.metadata = other_file.metadata.clone();
}
}
(node @ PublicNode::File(_), PublicNode::Dir(other_dir)) => {
Expand All @@ -947,7 +954,7 @@ impl PublicDirectory {
(PublicNode::Dir(dir), PublicNode::Dir(other_dir)) => {
let mut path = current_path.to_vec();
path.push(name.clone());
dir.merge_helper(other_dir, time, store, &path, file_tie_breaks)
dir.merge_helper(other_dir, store, &path, file_tie_breaks)
.await?;
}
}
Expand Down

0 comments on commit 4a499bf

Please sign in to comment.