From 14d8eb4e754df5c192850acb4c752d324a5edfa7 Mon Sep 17 00:00:00 2001 From: Stephen Akinyemi Date: Thu, 14 Apr 2022 13:16:18 +0100 Subject: [PATCH] Implement unix fs ops --- crates/fs/common/error.rs | 1 + crates/fs/lib.rs | 15 ++ crates/fs/public/directory.rs | 268 +++++++++++++++++++++++++--------- crates/fs/public/link.rs | 73 +++++++-- 4 files changed, 279 insertions(+), 78 deletions(-) diff --git a/crates/fs/common/error.rs b/crates/fs/common/error.rs index a5a6b9cf..bf73613f 100644 --- a/crates/fs/common/error.rs +++ b/crates/fs/common/error.rs @@ -16,6 +16,7 @@ pub enum FsError { NotADirectory, NotFound, FileAlreadyExists, + DirectoryAlreadyExists, UndecodableCborData(String), } diff --git a/crates/fs/lib.rs b/crates/fs/lib.rs index 8243e87c..9e007086 100644 --- a/crates/fs/lib.rs +++ b/crates/fs/lib.rs @@ -2,6 +2,7 @@ mod common; pub mod public; pub use common::*; +pub use utils::*; //-------------------------------------------------------------------------------------------------- // Re-exports @@ -13,3 +14,17 @@ pub use libipld::{ codec::{Decode, Encode}, Cid, IpldCodec, }; + +//-------------------------------------------------------------------------------------------------- +// Utils +//-------------------------------------------------------------------------------------------------- + +mod utils { + use std::{cell::RefCell, rc::Rc}; + + pub type Shared = Rc>; + + pub fn shared(t: T) -> Shared { + Rc::new(RefCell::new(t)) + } +} diff --git a/crates/fs/public/directory.rs b/crates/fs/public/directory.rs index d28fa21e..e44965f4 100644 --- a/crates/fs/public/directory.rs +++ b/crates/fs/public/directory.rs @@ -1,15 +1,14 @@ //! Public fs directory node. use std::{ - cell::RefCell, cmp::Ordering, collections::BTreeMap, + future::Future, io::{Read, Seek}, - mem, rc::Rc, }; -use crate::{error, BlockStore, FsError, Metadata, UnixFsNodeKind}; +use crate::{error, shared, BlockStore, FsError, Metadata, Shared, UnixFsNodeKind}; use anyhow::Result; use async_recursion::async_recursion; use chrono::{DateTime, Utc}; @@ -20,7 +19,11 @@ use libipld::{ Cid, IpldCodec, }; -use super::{Link, PublicNode}; +use super::{Link, PublicFile, PublicNode}; + +//-------------------------------------------------------------------------------------------------- +// Type Definitions +//-------------------------------------------------------------------------------------------------- /// A directory in a WNFS public file system. #[derive(Debug, Clone, PartialEq, Eq, FieldNames)] @@ -30,6 +33,16 @@ pub struct PublicDirectory { pub(crate) previous: Option, } +/// A fork of a directory. +pub struct Fork { + pub forked_dir: Shared, + pub working_node: T, +} + +//-------------------------------------------------------------------------------------------------- +// Implementations +//-------------------------------------------------------------------------------------------------- + impl PublicDirectory { /// Creates a new directory using the given metadata. pub fn new(time: DateTime) -> Self { @@ -41,26 +54,26 @@ impl PublicDirectory { } /// Follows a path and fetches the node at the end of the path. + /// + /// If path is empty, this returns a cloned directory based on `self`. pub async fn get_node( &self, path_segments: &[String], store: &B, - ) -> Result>>> { - if path_segments.is_empty() { - return error(FsError::InvalidPath); - } - - // TODO(appcypher): Any way to avoid this clone? - let mut working_node = Some(Rc::new(RefCell::new(PublicNode::Dir(self.clone())))); + ) -> Result>>> { + // TODO(appcypher): This does not need to be deep cloned like `mkdir`. + // Set working node to current directory. + let forked_dir = shared(PublicNode::Dir(self.clone())); + let mut working_node = Some(Rc::clone(&forked_dir)); - // Iterate over the path segments until we get the node of the last segment. + // Iterate over the path segments. for (index, segment) in path_segments.iter().enumerate() { // Cast working node to directory. let node_rc = working_node.unwrap(); let node_ref = node_rc.borrow(); let dir = node_ref.as_dir(); - // Fetch node representing path segment in working directory. + // Fetch node representing the path segment in the working directory. if let Some(found_node) = dir.lookup_node(segment, store).await? { match *found_node.borrow() { // If the node is a directory, set it as the working node. @@ -82,10 +95,14 @@ impl PublicDirectory { } // If the node is not found, we return an none. - return Ok(None); + working_node = None; + break; } - Ok(working_node) + Ok(Fork { + forked_dir, + working_node, + }) } /// Looks up a node by its path name in the current directory. @@ -95,7 +112,7 @@ impl PublicDirectory { &self, path_segment: &str, store: &B, - ) -> Result>>> { + ) -> Result>> { Ok(match self.userland.get(path_segment) { Some(link) => Some(link.resolve(store).await?), None => None, @@ -167,9 +184,11 @@ impl PublicDirectory { path_segments: &[String], store: &mut B, ) -> Result { - let node = self.get_node(path_segments, store).await?; - match node { - Some(node_rc) => match &*node_rc.borrow() { + match self.get_node(path_segments, store).await? { + Fork { + working_node: Some(node_rc), + .. + } => match &*node_rc.borrow() { PublicNode::File(file) => Ok(file.userland), _ => error(FsError::NotAFile), }, @@ -177,9 +196,30 @@ impl PublicDirectory { } } - pub async fn upsert() -> Result { - // TODO(appcypher): Implement this. - todo!() + /// Updates or inserts a new node at the specified path. + /// + /// NOTE(appcypher): This is meant for internal use only as it mutates the directory in place for performance. + /// Ideally, this method should be called with a newly forked directory. + pub(super) async fn upsert( + &mut self, + path_segment: &str, + update_fn: impl FnOnce(Option) -> Fut, + ) -> Result<()> + where + Fut: Future>>, + { + match update_fn(self.userland.get(path_segment).cloned()).await? { + // If the link is none, we remove the node from the userland. + None => { + self.userland.remove(path_segment); + } + // If the link is some, we insert the node into the userland. + Some(link) => { + self.userland.insert(path_segment.to_string(), link); + } + } + + Ok(()) } /// Writes a file to the directory. @@ -188,53 +228,78 @@ impl PublicDirectory { pub async fn write( &self, path_segments: &[String], - time: DateTime, content_cid: Cid, + time: DateTime, store: &B, - ) -> Result { - // Get the path segments for the file's parent directory. - let parent_path = match path_segments.split_last() { - Some((_, rest)) => rest, + ) -> Result> { + // If it does not already exist, create the file's parent directory. + let ( + Fork { + forked_dir, + working_node: parent_directory, + }, + tail, + ) = match path_segments.split_last() { None => return error(FsError::InvalidPath), + Some((tail, parent_path_segments)) => { + (self.mkdir(parent_path_segments, time, store).await?, tail) + } }; - let mut directory = self.mkdir(parent_path, time, store).await?; + // Insert or create file in parent directory. + parent_directory + .borrow_mut() + .as_mut_dir() + .upsert(tail, move |link| async move { + // If a link is provided, it is a cue to update it. + if let Some(link) = link { + let node = link.resolve(store).await?; + return match &mut *node.borrow_mut() { + PublicNode::File(file) => { + file.metadata = Metadata::new(time, UnixFsNodeKind::File); + file.userland = content_cid; + Ok(Some(link)) + } + _ => error(FsError::DirectoryAlreadyExists), + }; + } - todo!() - } + // If nothing is provided, it is a cue to return a new file node. + let link = Link::with_file(PublicFile::new(time, content_cid)); + Ok(Some(link)) + }) + .await?; - /// Returns the children links of a directory. - pub async fn ls(&self) -> Result> { - // TODO(appcypher): Implement this. - todo!() + Ok(forked_dir) } - /// Creates a new directory with the given path. + /// Creates a new directory at the specified path. + /// + /// If path is empty, this returns a cloned directory based on `self`. + /// + /// This method acts like `mkdir -p` in Unix because it creates intermediate directories if they do not exist. pub async fn mkdir( &self, path_segments: &[String], time: DateTime, store: &B, - ) -> Result { - if path_segments.is_empty() { - return error(FsError::InvalidPath); - } - - // Clone the directory and create a new root. - let new_root = self.clone(); - let mut working_node = Rc::new(RefCell::new(PublicNode::Dir(new_root))); + ) -> Result>> { + // TODO(appcypher): Investigate that deep cloning is actually done. + // Clone the directory to prevent mutation of the original directory. + let forked_dir = shared(PublicNode::Dir(self.clone())); + let mut working_node = Rc::clone(&forked_dir); - // Iterate over the path segments until the last segment. + // Iterate over path segments. for (index, segment) in path_segments.iter().enumerate() { let mut _next_node = None; - // This block helps us reduce the lifetime scope of the mutable borrow of working_node. + // This block helps us shorten the lifetime scope of the mutable borrow of working_node below. { // Cast working node to directory. let mut node_mut = working_node.borrow_mut(); let dir = node_mut.as_mut_dir(); - // Fetch node representing path segment in working directory. + // Fetch node representing the path segment in the working directory. if let Some(found_node) = dir.lookup_node(segment, store).await? { match *found_node.borrow() { // If the node is a directory, set it as the next working node. @@ -252,14 +317,13 @@ impl PublicDirectory { } } else { // If the node is not found, we create it. - let new_node_rc = - Rc::new(RefCell::new(PublicNode::Dir(PublicDirectory::new(time)))); + let new_node_rc = shared(PublicNode::Dir(PublicDirectory::new(time))); // Insert the new node into the working directory. dir.userland .insert(segment.clone(), Link::Node(Rc::clone(&new_node_rc))); - // And set that as the new working node. + // And set it as the new working node. _next_node = Some(new_node_rc); } } @@ -267,13 +331,82 @@ impl PublicDirectory { working_node = _next_node.unwrap(); } - // Get the PublicNode behind the working_node `Rc`. - let node = mem::replace( - &mut *working_node.borrow_mut(), - PublicNode::Dir(PublicDirectory::new(time)), - ); + Ok(Fork { + forked_dir, + working_node, + }) + } - Ok(node.into_dir()) + /// Returns the name and metadata of the direct children of a directory. + pub async fn ls( + &self, + path_segments: &[String], + store: &B, + ) -> Result> { + let node = self + .get_node(path_segments, store) + .await? + .working_node + .ok_or(FsError::NotFound)?; + + let node = node.borrow(); + match &*node { + PublicNode::Dir(dir) => { + // Save the directory's children info in a vector. + let mut result = vec![]; + for (name, link) in dir.userland.iter() { + match &*link.resolve(store).await?.borrow() { + PublicNode::File(file) => { + result.push((name.clone(), file.metadata.clone())); + } + PublicNode::Dir(dir) => { + result.push((name.clone(), dir.metadata.clone())); + } + } + } + Ok(result) + } + _ => error(FsError::NotADirectory), + } + } + + /// Removes a file or directory from the directory. + pub async fn rm( + &self, + path_segments: &[String], + store: &mut B, + ) -> Result> { + // TODO(appcypher): This should do a deep clone after. + // Get node's parent directory. + let ( + Fork { + working_node: parent_node, + forked_dir, + }, + tail, + ) = match path_segments.split_last() { + None => return error(FsError::InvalidPath), + Some((tail, parent_path_segments)) => { + (self.get_node(parent_path_segments, store).await?, tail) + } + }; + + let parent_node = parent_node.ok_or(FsError::NotFound)?; + match &mut *parent_node.borrow_mut() { + PublicNode::Dir(dir) => { + // Remove the file from the parent directory if present. + dir.upsert(tail, |link| async move { + match link { + Some(_) => Ok(None), + _ => error(FsError::NotFound), + } + }) + .await?; + } + _ => return error(FsError::NotADirectory), + }; + + Ok(forked_dir) } } @@ -352,22 +485,21 @@ mod public_directory_tests { let time = Utc::now(); - // let root = root - // .write(&[String::from("text.txt")], time, content_cid, &mut store) - // .await - // .unwrap(); + let root = root + .write(&[String::from("text.txt")], content_cid, time, &mut store) + .await + .unwrap(); + + let root = root.borrow(); - // let node = root.lookup_node("text.txt", &store).await; + let node = root.as_dir().lookup_node("text.txt", &store).await.unwrap(); - // assert!(node.is_ok()); + assert!(node.is_some()); - // assert_eq!( - // node.unwrap(), - // Some(Rc::new(PublicNode::File(PublicFile::new( - // time, - // content_cid - // )))) - // ); + assert_eq!( + node.unwrap(), + shared(PublicNode::File(PublicFile::new(time, content_cid))) + ); } #[async_std::test] diff --git a/crates/fs/public/link.rs b/crates/fs/public/link.rs index 7f544dcc..bf9f18a4 100644 --- a/crates/fs/public/link.rs +++ b/crates/fs/public/link.rs @@ -1,12 +1,12 @@ //! Node link. -use std::{cell::RefCell, rc::Rc}; +use std::rc::Rc; use anyhow::Result; use libipld::{cbor::DagCborCodec, Cid}; -use super::PublicNode; -use crate::BlockStore; +use super::{PublicDirectory, PublicFile, PublicNode}; +use crate::{shared, BlockStore, Shared}; /// A link to another node in the WNFS public file system. It can be held as a simple serialised CID or as a reference to the node itself. /// @@ -14,22 +14,32 @@ use crate::BlockStore; #[derive(Debug, Clone, PartialEq, Eq)] pub enum Link { Cid(Cid), - Node(Rc>), + Node(Shared), } impl Link { - // Resolves a CID link in the file system to a node. - pub async fn resolve(&self, store: &B) -> Result>> { + /// Creates a new directory node link. + pub fn with_dir(dir: PublicDirectory) -> Self { + Link::Node(shared(PublicNode::Dir(dir))) + } + + /// Creates a new file node link. + pub fn with_file(file: PublicFile) -> Self { + Link::Node(shared(PublicNode::File(file))) + } + + /// Resolves a CID linkin the file system to a node. + pub async fn resolve(&self, store: &B) -> Result> { Ok(match self { Link::Cid(cid) => { let node = store.load(cid, DagCborCodec).await?; - Rc::new(RefCell::new(node)) + shared(node) } Link::Node(node) => Rc::clone(node), }) } - // Stores the link in the block store and returns the CID. + /// Stores the link in the block store and returns the CID. pub async fn seal(&self, store: &mut B) -> Result { Ok(match self { Link::Cid(cid) => *cid, @@ -40,13 +50,56 @@ impl Link { #[cfg(test)] mod public_link_tests { + use std::mem; + + use chrono::Utc; + use libipld::Cid; + + use crate::{ + public::{PublicDirectory, PublicFile, PublicNode}, + shared, MemoryBlockStore, + }; + + use super::Link; + #[async_std::test] async fn node_link_sealed_successfully() { - // TODO(appcypher): Implement this. + let time = Utc::now(); + + let userland = Cid::default(); + + let file = PublicFile::new(time, userland); + + let mut store = MemoryBlockStore::default(); + + let file_cid = file.store(&mut store).await.unwrap(); + + let unsealed_link = Link::Node(shared(PublicNode::File(file))); + + let sealed_cid = unsealed_link.seal(&mut store).await.unwrap(); + + assert_eq!(file_cid, sealed_cid); } #[async_std::test] async fn cid_link_resolved_successfully() { - // TODO(appcypher): Implement this. + let time = Utc::now(); + + let dir = PublicDirectory::new(time); + + let mut store = MemoryBlockStore::default(); + + let dir_cid = dir.store(&mut store).await.unwrap(); + + let unresolved_link = Link::Cid(dir_cid); + + let resolved_node = unresolved_link.resolve(&store).await.unwrap(); + + let node = mem::replace( + &mut *resolved_node.borrow_mut(), + PublicNode::Dir(PublicDirectory::new(time)), + ); + + assert_eq!(dir, node.into_dir()) } }